通信库:DotNetty
封装实现:TcpServer、TcpClient、Udp
TCP协议特性:面向连接协议;每个新连接都会创建独立的ChannelHandler实例;TcpHandler构造函数在每次客户端连接时触发
UDP协议特性:无连接协议;整个服务端只创建单例UdpHandler;所有UDP数据包共享同一个处理器实例;UdpHandler构造函数仅在服务启动时触发一次
Server
1. 端口复用
2. 定时清理链路
TcpClient
1. 指定本地ip及端口
2. 自动重连
其他
1. 上线,离线,数据接收消息通知
2. 接收缓冲数据存储
using DotNetty.Buffers; using FusionDrive.DotNetty.Util; using FusionDrive.DotNetty.Util.decode; using FusionDrive.DotNetty.Util.socket; using System.Net; using System.Text;namespace FusionDrive.DotNetty {internal class Program{static void Main(string[] args){//Test_Server();Test_Client();Console.ReadLine();}static void Test_Server(){OHMDecode ohm = new OHMDecode();PipeManager.CheckLink();var server = new CommunicationServer();//server.Listen(8801);server.Listen(8801, isTcp: true, isUdp: true, udpTargetEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8802));server.OnPipeOnline += async (session) =>{Logger.Info($"-> {session.RemoteAddress?.ToString()} 上线"); };server.OnPipeOffline += async (session) =>{Logger.Info($"<- {session.RemoteAddress?.ToString()} 离线");};server.OnPipeReceived += async (session, buffer) =>{// dtu模式if (session.IsFirstConnect){// 心跳包session.IsFirstConnect = false;var sessionId = Encoding.UTF8.GetString(buffer);session.UpdateSessionId(sessionId);// 下发TCP指令SendData(session.SessionId);// 清理PipeManager.ClearByteBuffer(sessionId);}else{ohm.ProtocolFormat(session.SessionId, session.ByteBuffer);}// 下发UDP指令SendData(session.SessionId, session.TargetEndPoint);SendData(session.SessionId, session.RemoteAddress);};}static void Test_Client(){AirDecode air = new AirDecode();var tcpClient = new TcpClient();tcpClient.Connect("127.0.0.1", 8801, autoConect:true);tcpClient.OnPipeOnline += async (session) =>{Logger.Info($"-> {session.RemoteAddress?.ToString()} 上线");};tcpClient.OnPipeOffline += async (session) =>{Logger.Info($"<- {session.RemoteAddress?.ToString()} 离线");};tcpClient.OnPipeReceived += async (session, buffer) =>{air.ProtocolFormat(session.SessionId, session.ByteBuffer);////tcpClient.DisConnect();};}static void SendData(string sessionId, EndPoint endPoint = null){IByteBuffer message = Unpooled.Buffer();message.WriteString("*TRG\r\n", Encoding.UTF8);var p = PipeManager.CheckSession(sessionId, out string errmsg);p.Send(message, endPoint);} } }
using DotNetty.Buffers; using System.Text;namespace FusionDrive.DotNetty.Util.decode {public class AirDecode : NettyDecode{const byte HEAD = 02;public override void ProtocolFormat(string sessionId, IByteBuffer oldBuffer){ try{if (IsHeadLost(oldBuffer, 1)) return ;// 掉包bool headok = false;// 垃圾包处理oldBuffer.MarkReaderIndex();while (oldBuffer.IsReadable()){byte head1 = oldBuffer.ReadByte();// 1字节:标识头if (HEAD == head1){headok = true;break;}else{oldBuffer.MarkReaderIndex();}}if (!oldBuffer.IsReadable()){if (!headok) return ; // 垃圾包}// 3字节:ascii长度if (IsBagLost(oldBuffer, 3)) return ;// 掉包byte[] arrlen = new byte[3]; oldBuffer.ReadBytes(arrlen);int len = Convert.ToInt32(Encoding.ASCII.GetString(arrlen));// n字节:数据包len = len + 2;// 包含结束帧及校验位if (IsBagLost(oldBuffer, len)) return ;// 掉包byte[] data = new byte[len]; oldBuffer.ReadBytes(data);Logger.Info($"AirDecode sessionId:{sessionId},data:{Common.ToHexString(data, data.Length, true)}");// 压力值string pressureValue = "999";var pressureFlg = Encoding.ASCII.GetString(new byte[] { data[55] });var pressureNumber = Encoding.ASCII.GetString(new byte[] {data[56], data[57], data[58], data[59], data[60],data[61], data[62], data[63], data[64], data[65] });// 可能为----------- if (double.TryParse(pressureNumber, out double result1)){pressureNumber = result1.ToString();pressureValue = pressureFlg + pressureNumber;}Logger.Info($"AirDecode 压力值:{pressureValue}");if (oldBuffer.IsReadable()){ProtocolFormat(sessionId, oldBuffer);// 处理粘包}else{oldBuffer.Clear();// 包完整,清理} }catch (Exception ex){Logger.Error($"AirDecode 协议解析错误,sessionId:{sessionId}:{ex.Message}");} }} }