Commit 3afb5e5e authored by zengtianlai3's avatar zengtianlai3

运维模块修改以及代码整理

parent 9bbdf799
......@@ -5,7 +5,7 @@ import iot.sixiang.license.device.DeviceProtocol;
import lombok.Data;
@Data
public class OperateSAMStatusQueryEvent extends BaseEvent {
public class OperateSAMStatusRequestEvent extends BaseEvent {
private SocketChannel channel;
private DeviceProtocol protocol;
}
......@@ -9,18 +9,16 @@ import org.springframework.stereotype.Component;
@Component
@Slf4j
public class OperateSAMStatusQueryEventHandler {
public OperateSAMStatusQueryEventHandler() {
public class OperateSAMStatusRequestEventHandler {
public OperateSAMStatusRequestEventHandler() {
}
@EventListener
public void handlerEvent(OperateSAMStatusQueryEvent event) {
public void handlerEvent(OperateSAMStatusRequestEvent event) {
SocketChannel channel = event.getChannel();
DeviceProtocol protocol = event.getProtocol();
channel.writeAndFlush(protocol);
log.debug("OperateSAMStatusQueryEventHandler");
}
......
......@@ -7,7 +7,7 @@ import lombok.Data;
* Created by m33 on 2022/6/9 21:38
*/
@Data
public class OperateSAMStatusEvent extends BaseEvent {
public class OperateSAMStatusResponseEvent extends BaseEvent {
private String ip;
private DeviceProtocol protocol;
}
package iot.sixiang.license.event;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.model.SamInfo;
import iot.sixiang.license.model.SamMonitor;
import iot.sixiang.license.model.msg.SamInfoMsg;
import iot.sixiang.license.operate.OperateManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -19,20 +18,19 @@ import java.util.List;
@Component
@Slf4j
public class OperateSAMStatusEventHandler {
public class OperateSAMStatusResponseEventHandler {
@Autowired
public OperateManager operateManager;
public OperateSAMStatusEventHandler() {
public OperateSAMStatusResponseEventHandler() {
}
@EventListener
public void handlerEvent(OperateSAMStatusEvent event) {
public void handlerEvent(OperateSAMStatusResponseEvent event) {
DeviceProtocol protocol = event.getProtocol();
String serverIp = event.getIp();
String jsonOperateStatus = new String(protocol.getContent(), 0, protocol.getContent().length);
SamInfoMsg samInfoMsg = JSON.parseObject(jsonOperateStatus, SamInfoMsg.class);
List<SamInfo> samInfoList = samInfoMsg.getList();
List<SamInfo> samInfoList = JSONObject.parseArray(jsonOperateStatus, SamInfo.class);
int samCount = samInfoList.size();
int onlineCount = 0;
for (SamInfo samInfo : samInfoList) {
......@@ -40,11 +38,11 @@ public class OperateSAMStatusEventHandler {
onlineCount++;
}
}
SamMonitor samMonitor = new SamMonitor();
samMonitor.setServerIp(serverIp);
samMonitor.setOnlineCount(onlineCount);
samMonitor.setSamCount(samCount);
operateManager.putSamMonitorMap(serverIp, samMonitor);
log.debug("OperateSAMStatusEventHandler");
}
}
......@@ -8,6 +8,6 @@ import lombok.Data;
@Data
public class SamInfo {
private int index;
private int samid;
private String samid;
private int status;
}
......@@ -6,7 +6,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.event.EventPublisher;
import iot.sixiang.license.event.OperateSAMStatusEvent;
import iot.sixiang.license.event.OperateSAMStatusResponseEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -21,12 +21,8 @@ public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
@Autowired
EventPublisher eventPublisher;
private byte[] obj = new byte[0]; // 特殊的instance变量
private String nodeType = "rsretail";
public OperateClientHandler() {
super();
log.debug("ProxyNodeClientHandler ****");
}
@Override
......@@ -35,8 +31,7 @@ public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
String remoteIp = socketAddr.getHostString();
DeviceProtocol protocol = (DeviceProtocol) msg;
OperateSAMStatusEvent event = new OperateSAMStatusEvent();
OperateSAMStatusResponseEvent event = new OperateSAMStatusResponseEvent();
event.setProtocol(protocol);
event.setIp(remoteIp);
eventPublisher.publishEvent(event);
......@@ -46,10 +41,10 @@ public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
// TODO Auto-generated method stub
try {
super.channelRegistered(ctx);
log.debug("channelRegistered----------->");
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("channelRegistered "+ctx.channel().id().asLongText());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
......@@ -62,20 +57,7 @@ public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
super.channelActive(ctx);
try {
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("channelActive********"+ctx.channel().id().asLongText());
// InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
// String clientIp = socketAddr.getHostString();
// int port = socketAddr.getPort();
//
// SessionContext session = new SessionContext();
// session.setRemoteIp(clientIp);
// session.setRemotePort(port);
// session.setClientChannel(channel);
//
// ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
// log.debug("channelActive.getClientChannel:" + channel);
// forwardManager.setSessionContexts(session);
log.debug("channelActive "+ctx.channel().id().asLongText());
} catch (Exception e) {
// TODO Auto-generated catch block
......@@ -88,10 +70,7 @@ public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
// TODO Auto-generated method stub
super.channelInactive(ctx);
SocketChannel channel = (SocketChannel) ctx.channel();
String channelId = channel.id().asLongText();
// ForwardClientInactiveEvent forwardClientInactiveEvent = new ForwardClientInactiveEvent();
// forwardClientInactiveEvent.setChannelId(channelId);
// eventPublisher.publishEvent(forwardClientInactiveEvent);
log.debug("channelInactive "+ctx.channel().id().asLongText());
ctx.close();
}
......@@ -99,19 +78,18 @@ public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
log.debug("channelReadComplete");
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("channelReadComplete "+ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddr.getHostString();
int port = socketAddr.getPort();
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("exceptionCaught "+ctx.channel().id().asLongText());
cause.printStackTrace();
ctx.close();
log.error("client has error,ip:" + clientIp + ",port:" + port);
}
@Override
......
......@@ -22,15 +22,13 @@ public class OperateConnectionListener extends BaseConnectionListener {
//TODO 查询SAM状态
OperateSAMStatusQueryEvent operateSAMStatusQueryEvent = new OperateSAMStatusQueryEvent();
OperateSAMStatusRequestEvent operateSAMStatusQueryEvent = new OperateSAMStatusRequestEvent();
short stx = 21930;
int len = 2;
byte cmd = 0x5f;
// byte cmd = 0x5b;
byte ack = 0x00;
byte end = 0x5f;
// byte end = 0x5b;
SocketChannel channel = (SocketChannel) channelFuture.channel();
DeviceProtocol protocol = new DeviceProtocol(stx, len, cmd, ack, null, end);
......
......@@ -18,10 +18,6 @@ public class OperateDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
// 可读长度必须大于等于基本长度
try {
byte[] packet=new byte[buffer.readableBytes()];
buffer.readBytes(packet);
buffer.resetReaderIndex();
Util.DEBUG_HEX("SERVER -> IN",packet,packet.length);
if (buffer.readableBytes() < BASE_LENGTH) {
return;
......@@ -29,25 +25,21 @@ public class OperateDecoder extends ByteToMessageDecoder {
buffer.markReaderIndex();
short stx = buffer.readShort();//55AA->21930
short stx = buffer.readShort();
short len = buffer.readShortLE();
log.debug("msg rece len:"+packet.length+",decode len:"+len);
len = 2677;
byte cmd = buffer.readByte();
byte ack = buffer.readByte();////stx:21930,len:52,cmd:-112,ack:0
byte cmd = buffer.readByte();
byte ack = buffer.readByte();
log.debug("stx:" + stx + ",len:" + len);
int real_len = len;
int cmd_ack_len = 2;
if (buffer.readableBytes() < real_len - cmd_ack_len+1) {
if (buffer.readableBytes() < real_len - cmd_ack_len + 1) {
buffer.resetReaderIndex();
return;
}
// buffer.resetReaderIndex();//复位
// 读取data数据
byte[] content = new byte[real_len - cmd_ack_len];
......@@ -55,14 +47,13 @@ public class OperateDecoder extends ByteToMessageDecoder {
byte end = buffer.readByte();
log.debug("OperateDecoder.msg.getLen()...." + real_len);
DeviceProtocol protocol = new DeviceProtocol(stx, real_len, cmd, ack, content,end);
DeviceProtocol protocol = new DeviceProtocol(stx, real_len, cmd, ack, content, end);
out.add(protocol);
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("DeviceDecoder error!");
log.error(e.getMessage());
}
}
......
......@@ -11,23 +11,16 @@ import lombok.extern.slf4j.Slf4j;
public class OperateEncoder extends MessageToByteEncoder<DeviceProtocol> {
@Override
protected void encode(ChannelHandlerContext tcx, DeviceProtocol msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext tcx, DeviceProtocol msg, ByteBuf out) {
try {
// short stx = in.readShort();//55AA->21930
// short len = in.readShortLE();
out.writeShort(msg.getStx());
// out.writeShort(msg.getLen());
out.writeShortLE(msg.getLen());
out.writeByte(msg.getCmd());
out.writeByte(msg.getAck());
log.debug("OperateEncoder.msg.getLen()...." + msg.getLen());
if (msg.getContent() == null) {
// logger.debug("body数据为空");
// log.debug("body数据为空");
} else {
out.writeBytes(msg.getContent());
}
......@@ -35,7 +28,7 @@ public class OperateEncoder extends MessageToByteEncoder<DeviceProtocol> {
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error("DeviceEncoder error---->");
log.error(e.getMessage());
}
}
}
......@@ -46,8 +46,8 @@ public class OperateManager {
allServers.put(serverIp, server);
}
createProxyClient();
// createProxyClient();
// test();
}
public void createProxyClient() {
......@@ -57,19 +57,20 @@ public class OperateManager {
Server server = next.getValue();
//TODO 创建运维客户端去查询相关的运维消息
// for(int i =0;i<5;i++){
String serverIp = server.getServerIp();
Integer port = server.getPort();
serverIp = "192.168.1.56";
// serverIp = "172.17.115.81";
// port = 777;
this.startTcpClient(serverIp, port);
// }
String serverIp = server.getServerIp();
Integer port = server.getPort();
this.startTcpClient(serverIp, port);
}
}
private void test() {
String serverIp = "192.168.1.56";
int port = 18889;
// serverIp = "172.17.115.81";
// port = 777;
this.startTcpClient(serverIp, port);
}
public void startTcpClient(String serviceIP, int port) {
client.startTcp(serviceIP, port);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment