Commit 66742c60 authored by zengtianlai3's avatar zengtianlai3

完善透传功能

parent 200b89e1
......@@ -49,40 +49,40 @@ public class DeviceManager {
server.start();
}
public SessionContext getSessionByRemoteIp(String remoteIp, int remotePort) {
public synchronized void putSession(String appId, SessionContext session) {
sessionContexts.put(appId, session);
}
public SessionContext getSessionContextByAppId(String appId) {
return sessionContexts.get(appId);
}
public SessionContext getSessionByChannelId(String channelId) {
SessionContext session = null;
Iterator<Map.Entry<String, SessionContext>> it = sessionContexts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, SessionContext> entry = it.next();
SessionContext targetSession = entry.getValue();
if (targetSession.getRemoteIp().equals(remoteIp) && targetSession.getRemotePort() == remotePort) {
if (targetSession.getChannelId().equals(channelId)) {
return targetSession;
}
}
return session;
}
public SessionContext getSessionByClientIp(String remoteIp, int remotePort) {
SessionContext session = null;
public synchronized boolean removeSessionByChannelId(String channelId) {
Iterator<Map.Entry<String, SessionContext>> it = sessionContexts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, SessionContext> entry = it.next();
SessionContext targetSession = entry.getValue();
if (targetSession.getRemoteIp().equals(remoteIp) && targetSession.getRemotePort() == remotePort) {
return targetSession;
if (targetSession.getChannelId().equals(channelId)) {
it.remove();
return true;
}
}
return session;
}
public synchronized void putSession(String appId, SessionContext session) {
sessionContexts.put(appId, session);
}
public SessionContext getSessionContextByAppId(String appId) {
return sessionContexts.get(appId);
return false;
}
public synchronized boolean removeSession(String appId, int offPort) {
......
......@@ -4,12 +4,11 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import iot.sixiang.license.consts.Consts;
import iot.sixiang.license.event.CreateForwarClientEvent;
import iot.sixiang.license.event.DeviceClientInactiveEvent;
import iot.sixiang.license.event.EventPublisher;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.event.ForwardClientRequestEvent;
import iot.sixiang.license.model.SessionContext;
import iot.sixiang.license.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -43,30 +42,6 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
int remotePort = socketAddr.getPort();
DeviceProtocol protocol = (DeviceProtocol) msg;
log.debug("channel.id().asLongText():" + channel.id().asLongText());
// SessionContext session = new SessionContext();
// session.setClientIp(clientIp);
// session.setPort(clientPort);
// session.setClientChannel(channel);
// DeviceManager deviceManager= SpringUtil.getBean(DeviceManager.class);
// deviceManager.setSessionContext(session);
//
// //TODO:透传
// ForwardManager forwardManager= SpringUtil.getBean(ForwardManager.class);
// log.debug("channelRead0.forwardManager:"+forwardManager);
// log.debug("channelRead0.forwardManager.getClientChannel:"+forwardManager.getSessionContexts().getClientChannel());
// forwardManager.getSessionContexts().getClientChannel().writeAndFlush(protocol);
// short stx = buf.readShort();//55AA->21930
// short len = buf.readShortLE();
// log.debug("stx:"+stx+",len:"+len);
// log.debug("stx:"+stx+",len:"+len);
// channel.writeAndFlush(protocol);
/*
TODO:
1.透传前先进行鉴权
......@@ -75,9 +50,10 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
*/
byte cmd = protocol.getCmd();
int cmdInt = cmd & 0xFF;
log.info("real cmd:"+cmdInt);
boolean license = false;
cmdInt = Consts.CMD_LICENSE;
// cmdInt = Consts.CMD_LICENSE;
if (cmdInt == Consts.CMD_LICENSE) {
license = handlerLicense(channel, remoteIp, remotePort, protocol);
......@@ -88,6 +64,7 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
}
}
//TODO 以下为正式代码
// if (license == false) {
// channel.close();
// }
......@@ -110,28 +87,45 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
int port = socketAddr.getPort();
log.debug("client connected,ip:" + clientIp + ",port:" + port);
log.debug("channel.id().asLongText():" + ctx.channel().id().asLongText());
// TODO 以下为模拟的测试代码
String appId = "123456";
String appKey = "123456";
String token = "123456";
SocketChannel channel = (SocketChannel) ctx.channel();
String channelId = channel.id().asLongText();
boolean license = true;
if (license) {
SessionContext session = new SessionContext();
session.setAppId(appId);
session.setAppKey(appKey);
session.setToken(token);
session.setChannelId(channelId);
session.setClientChannel(channel);
DeviceManager deviceManager = SpringUtil.getBean(DeviceManager.class);
deviceManager.putSession(appId, session);
//TODO 创建透传的客户端
log.info("forward client begin start ..." + appId);
CreateForwarClientEvent event = new CreateForwarClientEvent();
event.setAppId(appId);
eventPublisher.publishEvent(event);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddr.getHostString();
int port = socketAddr.getPort();
DeviceManager deviceManager = SpringUtil.getBean(DeviceManager.class);
SessionContext session = deviceManager.getSessionByClientIp(clientIp, port);
if (session == null) {
return;
}
boolean result = deviceManager.removeSession(session.getAppId(), port);
if (result) {
// TODO 设备离线需要中断该设备对应的中转客户端
}
log.debug("channel.id().asLongText():" + ctx.channel().id().asLongText());
SocketChannel channel = (SocketChannel) ctx.channel();
String channelId = channel.id().asLongText();
DeviceClientInactiveEvent deviceClientInactiveEvent = new DeviceClientInactiveEvent();
deviceClientInactiveEvent.setChannelId(channelId);
eventPublisher.publishEvent(deviceClientInactiveEvent);
ctx.close();
}
......@@ -155,51 +149,6 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
log.error("client has error,ip:" + clientIp + ",port:" + port);
}
@Override
public synchronized void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddr.getHostString();
int port = socketAddr.getPort();
// synchronized (obj) {
// SessionContext session = deviceManager.getSessionByClientIp(clientIp, port);
// if (session == null) {
// log.warn("这是非法的连接");
// ctx.channel().close();
// } else {
//
//
// String deviceCode = session.getDeviceCode();
//
// int idleCount = session.getIdleCount();
// idleCount += 1;
// if (idleCount >= 1) {
// ctx.channel().close();
// session.setIdleCount(0);
// boolean result = deviceManager.removeSession(deviceCode, port, false);
// if (result) {
//// // TODO 设备离线需要上报设备状态给业务平台
//// // TODO 使用心跳检测判断掉线,可能存在多个线程监控一台设备是否重连上来,导致重复发送多次告警
////
//
//
// log.info(deviceCode + "," + "," + session.getClientIp() + ","
// + session.getPort() + ",心跳检测失败,异常下线");
// }
// }
// }
// }
}
} else {
super.userEventTriggered(ctx, obj);
}
}
private boolean handlerLicense(SocketChannel channel, String remoteIp, int remotePort, DeviceProtocol protocol) {
String jsonLicense = new String(protocol.getContent(), 0, protocol.getContent().length);
......@@ -208,9 +157,6 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
String token = "123456";
String channelId = channel.id().asLongText();
// LicenseManager licenseManager = SpringUtil.getBean(LicenseManager.class);
//
// boolean license = licenseManager.checkLicense(deviceCode,deviceKey);
boolean license = true;
if (license) {
SessionContext session = new SessionContext();
......@@ -226,10 +172,10 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
deviceManager.putSession(appId, session);
//TODO 创建透传的客户端
CreateForwarClientEvent event = new CreateForwarClientEvent();
log.info("forward client begin start ..." + appId);
CreateForwarClientEvent event = new CreateForwarClientEvent();
event.setAppId(appId);
eventPublisher.publishEvent(event);
}
return license;
......@@ -249,8 +195,13 @@ public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
//
// forwardManager.getSessionContexts().getClientChannel().writeAndFlush(protocol);
CreateForwarClientEvent event = new CreateForwarClientEvent();
eventPublisher.publishEvent(event);
// Thread.sleep(1000);
String channelId = channel.id().asLongText();
ForwardClientRequestEvent forwardClientRequestEvent = new ForwardClientRequestEvent();
forwardClientRequestEvent.setDeviceChannelId(channelId);
forwardClientRequestEvent.setProtocol(protocol);
eventPublisher.publishEvent(forwardClientRequestEvent);
}
......
......@@ -13,6 +13,8 @@ public class CreateForwarClientEventHandler {
@Autowired
ForwardManager forwardManager;
@Autowired
EventPublisher eventPublisher;
public CreateForwarClientEventHandler() {
......@@ -24,11 +26,7 @@ public class CreateForwarClientEventHandler {
public void handlerEvent(CreateForwarClientEvent event) {
String appId = event.getAppId();
forwardManager.startTcpClient(appId);
log.debug("创建客户端、、、");
}
......
package iot.sixiang.license.event;
import lombok.Data;
@Data
public class DeviceClientBeForcedOfflineEvent extends BaseEvent {
private String appId;
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceManager;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.model.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DeviceClientBeForcedOfflineEventHandler {
@Autowired
DeviceManager deviceManager;
public DeviceClientBeForcedOfflineEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(DeviceClientBeForcedOfflineEvent event) {
String appId = event.getAppId();
SessionContext deviceSessionContext = deviceManager.getSessionContextByAppId(appId);
if (deviceSessionContext != null) {
SocketChannel deviceClientChannel = deviceSessionContext.getClientChannel();
if (deviceClientChannel != null) {
deviceClientChannel.close();
log.info("device client be forced offline success ..." + appId);
}
} else {
log.info("device client be forced offline undo ..." + appId);
}
}
}
package iot.sixiang.license.event;
import lombok.Data;
@Data
public class DeviceClientInactiveEvent extends BaseEvent {
String channelId;
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceManager;
import iot.sixiang.license.model.SessionContext;
import iot.sixiang.license.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DeviceClientInactiveEventHandler {
@Autowired
DeviceManager deviceManager;
@Autowired
EventPublisher eventPublisher;
public DeviceClientInactiveEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(DeviceClientInactiveEvent event) {
String channelId = event.getChannelId();
SessionContext session = deviceManager.getSessionByChannelId(channelId);
if (session == null) {
log.debug("device client inactive undo ..." );
return;
}else{
String appId = session.getAppId();
boolean result = deviceManager.removeSessionByChannelId(channelId);
if (result) {
// TODO device client 离线需要强制中断该设备对应的forward client
ForwardClientBeForcedOfflineEvent forwardClientBeForcedOfflineEvent = new ForwardClientBeForcedOfflineEvent();
forwardClientBeForcedOfflineEvent.setAppId(appId);
eventPublisher.publishEvent(forwardClientBeForcedOfflineEvent);
log.debug("device client inactive success ..." );
}
}
}
}
package iot.sixiang.license.event;
import lombok.Data;
@Data
public class ForwardClientBeForcedOfflineEvent extends BaseEvent {
private String appId;
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.model.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ForwardClientBeForcedOfflineEventHandler {
@Autowired
ForwardManager forwardManager;
public ForwardClientBeForcedOfflineEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(ForwardClientBeForcedOfflineEvent event) {
String appId = event.getAppId();
SessionContext forwardSessionContext = forwardManager.getSessionContextByAppId(appId);
if (forwardSessionContext != null) {
SocketChannel forwardClientChannel = forwardSessionContext.getClientChannel();
if (forwardClientChannel != null) {
forwardClientChannel.close();
log.info("forward client be forced offline success ..." + appId);
}
}else {
log.info("forward client be forced offline undo ..." + appId);
}
}
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import lombok.Data;
@Data
public class ForwardClientConnectEvent extends BaseEvent {
private String appId;
private String channelId;
private SocketChannel channel;
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.model.SessionContext;
import iot.sixiang.license.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ForwardClientConnectEventHandler {
@Autowired
ForwardManager forwardManager;
@Autowired
EventPublisher eventPublisher;
public ForwardClientConnectEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(ForwardClientConnectEvent event) {
String appId = event.getAppId();
String channelId = event.getChannelId();
SocketChannel channel = event.getChannel();
SessionContext session = new SessionContext();
// session.setRemoteIp(remoteIp);
// session.setRemotePort(remotePort);
session.setAppId(appId);
// session.setAppKey(appKey);
// session.setToken(token);
session.setChannelId(channelId);
session.setClientChannel(channel);
forwardManager.putSession(appId,session);
log.debug("forward client connect:"+event);
}
}
package iot.sixiang.license.event;
import lombok.Data;
@Data
public class ForwardClientInactiveEvent extends BaseEvent {
String channelId;
}
package iot.sixiang.license.event;
import iot.sixiang.license.device.DeviceManager;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.model.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ForwardClientInactiveEventHandler {
@Autowired
DeviceManager deviceManager;
@Autowired
ForwardManager forwardManager;
@Autowired
EventPublisher eventPublisher;
public ForwardClientInactiveEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(ForwardClientInactiveEvent event) {
String channelId = event.getChannelId();
SessionContext session = forwardManager.getSessionByChannelId(channelId);
if (session == null) {
log.debug("forward client inactive undo ..." );
return;
}else{
String appId = session.getAppId();
boolean result = forwardManager.removeSessionByChannelId(channelId);
if (result) {
// TODO forward client 离线需要强制中断该设备对应的 device client
DeviceClientBeForcedOfflineEvent deviceClientBeForcedOfflineEvent = new DeviceClientBeForcedOfflineEvent();
deviceClientBeForcedOfflineEvent.setAppId(appId);
eventPublisher.publishEvent(deviceClientBeForcedOfflineEvent);
log.debug("forward client inactive success ..." );
}
}
}
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import lombok.Data;
@Data
public class ForwardClientRequestEvent extends BaseEvent {
private String appId;
private String deviceChannelId;
private DeviceProtocol protocol;
}
\ No newline at end of file
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceManager;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.model.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ForwardClientRequestEventHandler {
@Autowired
DeviceManager deviceManager;
@Autowired
ForwardManager forwardManager;
@Autowired
EventPublisher eventPublisher;
public ForwardClientRequestEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(ForwardClientRequestEvent event) {
String deviceChannelId = event.getDeviceChannelId();
DeviceProtocol protocol = event.getProtocol();
SessionContext deviceSessionContext = deviceManager.getSessionByChannelId(deviceChannelId);
String appId = deviceSessionContext.getAppId();
SessionContext forwardSessionContext = forwardManager.getSessionContextByAppId(appId);
log.info("forward client request:" + appId+","+forwardSessionContext);
SocketChannel clientChannel = forwardSessionContext.getClientChannel();
clientChannel.writeAndFlush(protocol);
}
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceProtocol;
import lombok.Data;
@Data
public class ForwardMessageResponseEvent extends BaseEvent {
// private String appId;
private String channelId;
private SocketChannel channel;
private DeviceProtocol protocol;
}
package iot.sixiang.license.event;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceManager;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.forward.ForwardManager;
import iot.sixiang.license.model.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ForwardMessageResponseEventHandler {
@Autowired
DeviceManager deviceManager;
@Autowired
ForwardManager forwardManager;
@Autowired
EventPublisher eventPublisher;
public ForwardMessageResponseEventHandler() {
}
// @Async("asyncExecutor")
@EventListener
public void handlerEvent(ForwardMessageResponseEvent event) {
String channelId = event.getChannelId();
SocketChannel channel = event.getChannel();
DeviceProtocol protocol = event.getProtocol();
SessionContext forwardSessionContext = forwardManager.getSessionByChannelId(channelId);
String appId = forwardSessionContext.getAppId();
log.info("forward client response..." + appId+",forward session:"+forwardSessionContext);
SessionContext deviceSessionContext = deviceManager.getSessionContextByAppId(appId);
SocketChannel deviceClientChannel = deviceSessionContext.getClientChannel();
log.info("forward client response..." + appId+",forward session:"+deviceSessionContext);
deviceClientChannel.writeAndFlush(protocol);
}
}
......@@ -5,12 +5,9 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import iot.sixiang.license.consts.Consts;
import iot.sixiang.license.net.BaseChannelInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ForwardChannelInitializer extends BaseChannelInitializer {
static Logger logger = LoggerFactory.getLogger(ForwardChannelInitializer.class.getName());
private ForwardClientHandler handler;
static final EventExecutorGroup workGroup = new DefaultEventExecutorGroup(Consts.EXECUTOR_THREAD_NUM);
......@@ -22,9 +19,6 @@ public class ForwardChannelInitializer extends BaseChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 服务端心跳检测
// ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast("decoder", new ForwardDecoder());
ch.pipeline().addLast("encoder", new ForwardEncoder());
ch.pipeline().addLast(workGroup, "handler", handler);
......
......@@ -6,9 +6,14 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.device.DeviceManager;
import iot.sixiang.license.device.DeviceProtocol;
import iot.sixiang.license.event.DeviceClientInactiveEvent;
import iot.sixiang.license.event.EventPublisher;
import iot.sixiang.license.event.ForwardClientInactiveEvent;
import iot.sixiang.license.event.ForwardMessageResponseEvent;
import iot.sixiang.license.model.SessionContext;
import iot.sixiang.license.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
......@@ -18,8 +23,8 @@ import java.net.InetSocketAddress;
@Slf4j
public class ForwardClientHandler extends SimpleChannelInboundHandler<Object> {
// @Autowired
// ForwardManager forwardManager;
@Autowired
EventPublisher eventPublisher;
private byte[] obj = new byte[0]; // 特殊的instance变量
private String nodeType = "rsretail";
......@@ -33,18 +38,22 @@ public class ForwardClientHandler extends SimpleChannelInboundHandler<Object> {
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// TODO Auto-generated method stub
// 用于获取客户端发来的数据信息
// TODO 中转客户端收到消息后,将消息原封不动的发送给设备客户端
SocketChannel channel = (SocketChannel) ctx.channel();
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String serverIp = socketAddr.getHostString();
int serverPort = socketAddr.getPort();
log.debug("channelRead0...");
DeviceProtocol protocol = (DeviceProtocol) msg;
log.debug("准备发回给设备:" + protocol.toString());
log.info("准备发回给设备:" + protocol.toString());
DeviceManager deviceManager = SpringUtil.getBean(DeviceManager.class);
deviceManager.getSessionContext().getClientChannel().writeAndFlush(protocol);
String channelId = channel.id().asLongText();
ForwardMessageResponseEvent forwardMessageResponseEvent = new ForwardMessageResponseEvent();
forwardMessageResponseEvent.setChannelId(channelId);
forwardMessageResponseEvent.setChannel(channel);
forwardMessageResponseEvent.setProtocol(protocol);
eventPublisher.publishEvent(forwardMessageResponseEvent);
}
......@@ -68,19 +77,19 @@ public class ForwardClientHandler extends SimpleChannelInboundHandler<Object> {
super.channelActive(ctx);
try {
SocketChannel channel = (SocketChannel) ctx.channel();
log.debug("channelActive********");
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddr.getHostString();
int port = socketAddr.getPort();
SessionContext session = new SessionContext();
session.setRemoteIp(clientIp);
session.setRemotePort(port);
session.setClientChannel(channel);
ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
log.debug("channelActive.getClientChannel:" + channel);
forwardManager.setSessionContexts(session);
log.debug("channelActive********"+ctx.channel().id().asLongText());
// InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
// String clientIp = socketAddr.getHostString();
// int port = socketAddr.getPort();
//
// SessionContext session = new SessionContext();
// session.setRemoteIp(clientIp);
// session.setRemotePort(port);
// session.setClientChannel(channel);
//
// ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
// log.debug("channelActive.getClientChannel:" + channel);
// forwardManager.setSessionContexts(session);
} catch (Exception e) {
......@@ -93,13 +102,12 @@ public class ForwardClientHandler extends SimpleChannelInboundHandler<Object> {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
InetSocketAddress socketAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = socketAddr.getHostString();
int port = socketAddr.getPort();
log.debug("client disconnected--->,ip:" + clientIp + ",port:" + port);
SocketChannel channel = (SocketChannel) ctx.channel();
String channelId = channel.id().asLongText();
ForwardClientInactiveEvent forwardClientInactiveEvent = new ForwardClientInactiveEvent();
forwardClientInactiveEvent.setChannelId(channelId);
eventPublisher.publishEvent(forwardClientInactiveEvent);
ctx.close();
// ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
// forwardManager.startTcpClient();
}
@Override
......
......@@ -2,6 +2,10 @@ package iot.sixiang.license.forward;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;
import iot.sixiang.license.event.DeviceClientBeForcedOfflineEvent;
import iot.sixiang.license.event.EventPublisher;
import iot.sixiang.license.event.ForwardClientConnectEvent;
import iot.sixiang.license.net.BaseConnectionListener;
import iot.sixiang.license.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -9,28 +13,47 @@ import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
//@Component
@Slf4j
public class ForwardConnectionListener extends BaseConnectionListener {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("listen ..."+appId);
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
log.debug("ForwardConnectionListener ...operationComplete");
ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
log.debug("channelActive.forwardManager:" + forwardManager);
log.warn("The client is not connected and starts to reconnect..." + forwardManager);
forwardManager.startTcpClient(appId);
}
}, 10L, TimeUnit.SECONDS);
// final EventLoop loop = channelFuture.channel().eventLoop();
// loop.schedule(new Runnable() {
// @Override
// public void run() {
// log.debug("ForwardConnectionListener ...operationComplete");
// log.info("forward client connect fail ..." + appId);
//// ForwardManager forwardManager = SpringUtil.getBean(ForwardManager.class);
//// log.debug("channelActive.forwardManager:" + forwardManager);
//
//// log.warn("The client is not connected and starts to reconnect..." + forwardManager);
//// forwardManager.startTcpClient(appId);
// }
// }, 10L, TimeUnit.SECONDS);
//TODO forward client连接失败,则强制踢掉设备客户端
DeviceClientBeForcedOfflineEvent deviceClientBeForcedOfflineEvent = new DeviceClientBeForcedOfflineEvent();
deviceClientBeForcedOfflineEvent.setAppId(appId);
EventPublisher eventPublisher = SpringUtil.getBean(EventPublisher.class);
eventPublisher.publishEvent(deviceClientBeForcedOfflineEvent);
} else {
SocketChannel channel = (SocketChannel) channelFuture.channel();
String channelId = channel.id().asLongText();
ForwardClientConnectEvent forwardClientConnectEvent = new ForwardClientConnectEvent();
forwardClientConnectEvent.setAppId(appId);
forwardClientConnectEvent.setChannelId(channelId);
forwardClientConnectEvent.setChannel(channel);
EventPublisher eventPublisher = SpringUtil.getBean(EventPublisher.class);
eventPublisher.publishEvent(forwardClientConnectEvent);
log.debug("Successful connection to the server");
log.info("forward client connect success ..." + appId);
}
}
......
......@@ -33,7 +33,7 @@ public class ForwardDecoder extends ByteToMessageDecoder {
log.debug("stx:" + stx + ",len:" + len);
int real_len = len;//注意,透传前已经去掉了END一个字符
int real_len = len+1;//注意,透传前已经去掉了END一个字符
int cmd_ack_len = 2;
if (buffer.readableBytes() < real_len - cmd_ack_len) {
......
......@@ -9,35 +9,65 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@Component(value = "forwardManager")
@Slf4j
public class ForwardManager {
static Logger logger = LoggerFactory.getLogger(ForwardManager.class.getName());
String serviceIP = "121.46.25.14";
int port = 18889;
@Autowired
public ForwardClient client;
@Getter
@Setter
SessionContext sessionContexts = null;
String serviceIP = "121.46.25.14";
int port = 18889;
private Map<String, SessionContext> sessionContexts = null;
public ForwardManager() {
sessionContexts = new HashMap<String, SessionContext>();
}
// @PostConstruct
// public void init() {
// startTcpClient();
// }
public void startTcpClient(String appId) {
client.startTcp(serviceIP, port,appId);
}
public synchronized void putSession(String appId, SessionContext session) {
sessionContexts.put(appId, session);
}
public SessionContext getSessionContextByAppId(String appId) {
return sessionContexts.get(appId);
}
public SessionContext getSessionByChannelId(String channelId) {
SessionContext session = null;
Iterator<Map.Entry<String, SessionContext>> it = sessionContexts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, SessionContext> entry = it.next();
SessionContext targetSession = entry.getValue();
if (targetSession.getChannelId().equals(channelId)) {
return targetSession;
}
}
return session;
}
public synchronized boolean removeSessionByChannelId(String channelId) {
Iterator<Map.Entry<String, SessionContext>> it = sessionContexts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, SessionContext> entry = it.next();
SessionContext targetSession = entry.getValue();
if (targetSession.getChannelId().equals(channelId)) {
it.remove();
return true;
}
}
return false;
}
}
\ No newline at end of file
package iot.sixiang.license.util;
public class HexUtil {
public static String bytes2hex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
String tmp;
sb.append("[");
for (byte b : bytes) {
// 将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制
tmp = Integer.toHexString(0xFF & b);
if (tmp.length() == 1) {
tmp = "0" + tmp;//只有一位的前面补个0
}
sb.append(tmp).append(" ");//每个字节用空格断开
}
sb.delete(sb.length() - 1, sb.length());//删除最后一个字节后面对于的空格
sb.append("]");
return sb.toString();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment