Commit dc3b192e authored by zengtianlai3's avatar zengtianlai3

新增运维模块

parent 67038584
......@@ -5,6 +5,8 @@ public class Consts {
public static final int CMD_LICENSE = 1;//授权消息的cmd,十进制
public static final int EXECUTOR_THREAD_NUM = 30;
public static final int FORWARD_THREAD_NUM = 30;
public static final int OPERATE_THREAD_NUM = 5;
public static final Integer DEVICE_STATE_ONLINE = 1;// 设备在线
public static final Integer DEVICE_STATE_OFFLINE = 0;// 设备离线
public static final Integer DEVICE_STATE_FAILURE = -1;// 设备故障
......
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import lombok.Data;
@Data
public class OperateSAMStatusQueryEvent extends BaseEvent {
private SocketChannel channel;
private DeviceProtocol protocol;
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.util.Util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class OperateSAMStatusQueryEventHandler {
public OperateSAMStatusQueryEventHandler() {
}
@EventListener
public void handlerEvent(OperateSAMStatusQueryEvent event) {
SocketChannel channel = event.getChannel();
DeviceProtocol protocol = event.getProtocol();
channel.writeAndFlush(protocol);
log.debug("OperateSAMStatusQueryEventHandler");
}
}
......@@ -10,7 +10,7 @@ public class ForwardChannelInitializer extends BaseChannelInitializer {
private ForwardClientHandler handler;
static final EventExecutorGroup workGroup = new DefaultEventExecutorGroup(Consts.EXECUTOR_THREAD_NUM);
static final EventExecutorGroup workGroup = new DefaultEventExecutorGroup(Consts.FORWARD_THREAD_NUM);
public ForwardChannelInitializer(ForwardClientHandler handler) {
this.handler = handler;
......
package iot.sixiang.license.operate;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import iot.sixiang.license.consts.Consts;
import iot.sixiang.license.forward.ForwardClientHandler;
import iot.sixiang.license.forward.ForwardDecoder;
import iot.sixiang.license.forward.ForwardEncoder;
import iot.sixiang.license.net.BaseChannelInitializer;
public class OperateChannelInitializer extends BaseChannelInitializer {
private OperateClientHandler handler;
static final EventExecutorGroup workGroup = new DefaultEventExecutorGroup(Consts.OPERATE_THREAD_NUM);
public OperateChannelInitializer(OperateClientHandler handler) {
this.handler = handler;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new OperateDecoder());
ch.pipeline().addLast("encoder", new OperateEncoder());
ch.pipeline().addLast(workGroup, "handler", handler);
}
}
package iot.sixiang.license.operate;
import iot.sixiang.license.forward.ForwardChannelInitializer;
import iot.sixiang.license.forward.ForwardConnectionListener;
import iot.sixiang.license.net.TcpClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class OperateClient {
private TcpClient client = null;
private OperateChannelInitializer channelInitializer;
@Autowired
OperateClientHandler handler;
public OperateClient() {
}
public void startTcp(String host, int port) {
OperateConnectionListener listener = new OperateConnectionListener();
channelInitializer = new OperateChannelInitializer(handler);
client = new TcpClient(host, port, channelInitializer, listener);
client.start();
}
}
package iot.sixiang.license.operate;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.event.EventPublisher;
import iot.sixiang.license.event.ForwardClientInactiveEvent;
import iot.sixiang.license.event.ForwardMessageResponseEvent;
import iot.sixiang.license.util.HexUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
@Component
@ChannelHandler.Sharable
@Slf4j
public class OperateClientHandler extends SimpleChannelInboundHandler<Object> {
@Autowired
EventPublisher eventPublisher;
private byte[] obj = new byte[0]; // 特殊的instance变量
private String nodeType = "rsretail";
public OperateClientHandler() {
super();
log.debug("ProxyNodeClientHandler ****");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// TODO Auto-generated method stub
// TODO 中转客户端收到消息后,将消息原封不动的发送给设备客户端
SocketChannel channel = (SocketChannel) ctx.channel();
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String serverIp = socketAddr.getHostString();
int serverPort = socketAddr.getPort();
log.debug("channelRead0...");
DeviceProtocol protocol = (DeviceProtocol) msg;
log.info("桥接服务器响应1"+protocol.toString());
log.info("桥接服务器响应2:"+ HexUtil.bytes2hex(protocol.getContent()));
String channelId = channel.id().asLongText();
// ForwardMessageResponseEvent forwardMessageResponseEvent = new ForwardMessageResponseEvent();
// forwardMessageResponseEvent.setChannelId(channelId);
// forwardMessageResponseEvent.setChannel(channel);
// forwardMessageResponseEvent.setProtocol(protocol);
// eventPublisher.publishEvent(forwardMessageResponseEvent);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
// TODO Auto-generated method stub
try {
super.channelRegistered(ctx);
log.debug("channelRegistered----------->");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelActive(ctx);
try {
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("channelActive********"+ctx.channel().id().asLongText());
// InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
// String clientIp = socketAddr.getHostString();
// int port = socketAddr.getPort();
//
// SessionContext session = new SessionContext();
// session.setRemoteIp(clientIp);
// session.setRemotePort(port);
// session.setClientChannel(channel);
//
// ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
// log.debug("channelActive.getClientChannel:" + channel);
// forwardManager.setSessionContexts(session);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
SocketChannel channel = (SocketChannel) ctx.channel();
String channelId = channel.id().asLongText();
// ForwardClientInactiveEvent forwardClientInactiveEvent = new ForwardClientInactiveEvent();
// forwardClientInactiveEvent.setChannelId(channelId);
// eventPublisher.publishEvent(forwardClientInactiveEvent);
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
log.debug("channelReadComplete");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddr.getHostString();
int port = socketAddr.getPort();
cause.printStackTrace();
ctx.close();
log.error("client has error,ip:" + clientIp + ",port:" + port);
}
@Override
public synchronized void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
try {
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("userEventTriggered");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package iot.sixiang.license.operate;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.event.*;
import iot.sixiang.license.net.BaseConnectionListener;
import iot.sixiang.license.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OperateConnectionListener extends BaseConnectionListener {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
//TODO 失败进行告警
} else {
//TODO 查询SAM状态
OperateSAMStatusQueryEvent operateSAMStatusQueryEvent = new OperateSAMStatusQueryEvent();
short stx = 21930;
int len = 2;
// byte cmd = 0x5f;
byte cmd = 0x5b;
byte ack = 0x00;
// byte end = 0x5f;
byte end = 0x5b;
SocketChannel channel = (SocketChannel) channelFuture.channel();
DeviceProtocol protocol = new DeviceProtocol(stx, len, cmd, ack, null, end);
operateSAMStatusQueryEvent.setChannel(channel);
operateSAMStatusQueryEvent.setProtocol(protocol);
EventPublisher eventPublisher = SpringUtil.getBean(EventPublisher.class);
eventPublisher.publishEvent(operateSAMStatusQueryEvent);
}
}
}
package iot.sixiang.license.operate;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.util.Util;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class OperateDecoder extends ByteToMessageDecoder {
public final int BASE_LENGTH = 2 + 2 + 1 + 1;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
// 可读长度必须大于等于基本长度
try {
byte[] packet=new byte[buffer.readableBytes()];
buffer.readBytes(packet);
buffer.resetReaderIndex();
Util.DEBUG_HEX("SERVER -> IN",packet,packet.length);
if (buffer.readableBytes() < BASE_LENGTH) {
return;
}
buffer.markReaderIndex();
short stx = buffer.readShort();//55AA->21930
short len = buffer.readShortLE();
byte cmd = buffer.readByte();
byte ack = buffer.readByte();////stx:21930,len:52,cmd:-112,ack:0
log.debug("stx:" + stx + ",len:" + len);
int real_len = len;//注意,透传前已经去掉了END一个字符
int cmd_ack_len = 2;
if (buffer.readableBytes() < real_len - cmd_ack_len+1) {
buffer.resetReaderIndex();
return;
}
// buffer.resetReaderIndex();//复位
// 读取data数据
byte[] content = new byte[real_len - cmd_ack_len];
buffer.readBytes(content);
byte end = buffer.readByte();
log.debug("OperateDecoder.msg.getLen()...." + real_len);
DeviceProtocol protocol = new DeviceProtocol(stx, real_len, cmd, ack, content,end);
out.add(protocol);
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("DeviceDecoder error!");
}
}
}
\ No newline at end of file
package iot.sixiang.license.operate;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.util.Util;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OperateEncoder extends MessageToByteEncoder<DeviceProtocol> {
@Override
protected void encode(ChannelHandlerContext tcx, DeviceProtocol msg, ByteBuf out) throws Exception {
try {
// short stx = in.readShort();//55AA->21930
// short len = in.readShortLE();
out.writeShort(msg.getStx());
// out.writeShort(msg.getLen());
out.writeShortLE(msg.getLen());
out.writeByte(msg.getCmd());
out.writeByte(msg.getAck());
log.debug("OperateEncoder.msg.getLen()...." + msg.getLen());
if (msg.getContent() == null) {
// logger.debug("body数据为空");
} else {
out.writeBytes(msg.getContent());
}
out.writeByte(msg.getEnd());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error("DeviceEncoder error---->");
}
}
}
package iot.sixiang.license.operate;
import iot.sixiang.license.entity.Server;
import iot.sixiang.license.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
public class OperateManager {
@Autowired
private ServerService serverService;
@Autowired
public OperateClient client;
private Map<String, Server> allServers = null;
public OperateManager() {
allServers = new HashMap<String, Server>();
}
@PostConstruct
public void init() {
List<Server> servers = serverService.getServerList(0, 20);
for (Server server : servers) {
String serverIp = server.getServerIp();
allServers.put(serverIp, server);
}
createProxyClient();
}
private void createProxyClient() {
Iterator<Map.Entry<String, Server>> iterator = allServers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Server> next = iterator.next();
Server server = next.getValue();
//TODO 创建运维客户端去查询相关的运维消息
// for(int i =0;i<5;i++){
String serverIp = server.getServerIp();
Integer port = server.getPort();
// serverIp = "172.17.115.81";
// port = 777;
this.startTcpClient(serverIp, port);
// }
}
}
public void startTcpClient(String serviceIP, int port) {
client.startTcp(serviceIP, port);
}
}
......@@ -2,7 +2,7 @@ server:
port: 8868
logging:
level:
root: info
root: debug
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
......
spring:
profiles:
active: prod
active: dev
application:
name: iot_license #当前服务的名称
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment