文章目录
- 🌴通讯支持
- 🌴 功能完成情况
- 服务端架构设计
- 一、核心模块划分
- 二、数据层定义
- 三、协议解析层
- 四、通信业务层(以DLT645服务端为例)
- 五、通信层(以TCP为例)
- 使用例子
🌴通讯支持
功能 | 状态 |
---|---|
TCP客户端(方便通讯测试) 🐾 | ✅ |
TCP服务端(方便通讯测试) 🐾 | ✅ |
RTU主站 🐾 | ✅ |
RTU从站 🐾 | ✅ |
🌴 功能完成情况
功能 | 状态 |
---|---|
读、写通讯地址 🐾 | ✅ |
广播校时 🐾 | ✅ |
电能量 🐾 | ✅ |
最大需量及发生时间 🐾 | ✅ |
变量 🐾 | ✅ |
事件记录 🐾 | ❌ |
参变量 🐾 | ❌ |
冻结量 🐾 | ❌ |
负荷纪录 🐾 | ❌ |
项目地址:https://gitee.com/chen-dongyu123/dlt645
服务端架构设计
一、核心模块划分
- 数据层:模拟电表数据,存储数据标识与值的映射关系。
- 协议解析层:负责帧的组装、拆卸、校验和转义。
- 业务逻辑层:根据解析出的控制码和DI,执行相应操作并组织回复数据。
- 通信层:负责底层的字节流收发(串口/TCP)。
代码结构
├── config # 测点json,用于初始化导入
├── src # 源文件夹
│ ├── common # 存放通用函数
│ ├── model # 数据模型
│ │ ├── data
│ │ │ └── define
│ │ └── types # dlt645数据类型
│ ├── protocol # 协议解析层
│ ├── service
│ │ ├── clientsvc # dlt645客户端api及实现
│ │ └── serversvc # dlt645服务端api及实现
│ └── transport
│ ├── client # 客户端通讯接口,支持TCP客户端、RTU主站
│ └── server # 服务端通讯接口,支持TCP服务端、RTU从站
└── test # 测试文件
流程图:启动 -> 监听连接 -> 接收数据 -> 解析帧 -> 处理请求 -> 组织回复帧 -> 发送数据
二、数据层定义
-
通过读取config文件夹里的json导入测点,json示例如下
[{"Di": "02010100","Name": "A相电压","Unit": "V","DataFormat": "XXX.X"},{"Di": "02010200","Name": "B相电压","Unit": "V","DataFormat": "XXX.X"},... ]
-
定义数据类型和映射map
class DataItem:def __init__(self, di: int, name: str, data_format: str, value: float = 0, unit: str = '', timestamp: int = 0):self.di = diself.name = nameself.data_format = data_formatself.value = valueself.unit = unitself.timestamp = timestampdef __repr__(self):return (f"DataItem(name={self.name}, di={format(self.di, '#x')}, value={self.value}, "f"unit={self.unit},data_format={self.data_format}, timestamp={self.timestamp})")DIMap: Dict[int, DataItem] = {}
-
定义设置DataItem值接口和获取DataItem接口
def get_data_item(di: int) -> Optional[DataItem]:"""根据 di 获取数据项"""item = DIMap.get(di)if item is None:log.info(f"未通过di {hex(di)} 找到映射")return Nonereturn itemdef set_data_item(di: int, data: Any) -> bool:"""设置指定 di 的数据项"""if data is None:log.info("data is nil")return Falseif di in DIMap:DIMap[di].value = datalog.info(f"设置数据项 {hex(di)} 成功, 值 {DIMap[di]}")return Truereturn False
-
初始化数据标识map
def init_variable_def(VariableTypes: List[DataItem]):for date_type in VariableTypes:DIMap[date_type.di] = DataItem(di=date_type.di,name=date_type.name,data_format=date_type.data_format,unit=date_type.unit)
-
初始化DLT645相关测点数据
EnergyTypes = [] DemandTypes = [] VariableTypes = []def init():global EnergyTypesEnergyTypes = initDataTypeFromJson(os.path.join(conf_path, 'energy_types.json'))DemandTypes = initDataTypeFromJson(os.path.join(conf_path, 'demand_types.json'))VariableTypes = initDataTypeFromJson(os.path.join(conf_path, 'variable_types.json'))init_energy_def(EnergyTypes)init_demand_def(DemandTypes)init_variable_def(VariableTypes)# 执行初始化 init()
三、协议解析层
-
帧类型
# 常量定义 FRAME_START_BYTE = 0x68 FRAME_END_BYTE = 0x16 BROADCAST_ADDR = 0xAAclass Frame:def __init__(self, preamble: List[int] = None, start_flag: int = 0, addr: List[int] = None,ctrl_code: int = 0, data_len: int = 0, data: List[int] = None,check_sum: int = 0, end_flag: int = 0):self.preamble = preamble if preamble is not None else [] # 前导字节self.start_flag = start_flag # 起始字节self.addr = addr if addr is not None else [0] * 6 # 地址字节self.ctrl_code = ctrl_code # 控制字节self.data_len = data_len # 数据长度字节self.data = data if data is not None else [] # 数据字节self.check_sum = check_sum # 校验字节self.end_flag = end_flag # 结束字节
-
协议解析
-
数据域解码
@classmethoddef decode_data(cls, data: bytes) -> bytes:"""数据域解码(±33H转换)"""return bytes([b - 0x33 for b in data])
-
数据域编码
@classmethoddef encode_data(cls, data: bytes) -> bytes:"""数据域编码"""return bytes([b + 0x33 for b in data])
-
计算校验和
@classmethoddef calculate_checksum(cls, data: bytes) -> int:"""校验和计算(模256求和)"""return sum(data) % 256
-
构建帧
@classmethoddef build_frame(cls, addr: bytes, ctrl_code: int, data: bytes) -> bytearray:"""帧构建(支持广播和单播)"""if len(addr) != 6:raise ValueError("地址长度必须为6字节")buf = []buf.append(FRAME_START_BYTE)buf.extend(addr)buf.append(FRAME_START_BYTE)buf.append(ctrl_code)# 数据域编码encoded_data = DLT645Protocol.encode_data(data)buf.append(len(encoded_data))buf.extend(encoded_data)# 计算校验和check_sum = DLT645Protocol.calculate_checksum(bytes(buf))buf.append(check_sum)buf.append(FRAME_END_BYTE)return bytearray(buf)
-
字节数组反序列化Frame结构体
@classmethoddef deserialize(cls, raw: bytes) -> Optional[Frame]:"""将字节切片反序列化为 Frame 结构体"""# 基础校验if len(raw) < 12:raise Exception(f"frame too short: {raw}")# 帧边界检查(需考虑前导FE)try:start_idx = raw.index(FRAME_START_BYTE)except ValueError:log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx == -1 or start_idx + 10 >= len(raw):log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx + 7 >= len(raw) or raw[start_idx + 7] != FRAME_START_BYTE:log.error(f"missing second start flag: {raw}")raise Exception("missing second start flag")# 构建帧结构frame = Frame()frame.start_flag = raw[start_idx]frame.addr = raw[start_idx + 1:start_idx + 7]frame.ctrl_code = raw[start_idx + 8]frame.data_len = raw[start_idx + 9]# 数据域提取(严格按协议1.2.5节处理)data_start = start_idx + 10data_end = data_start + frame.data_lenif data_end > len(raw) - 2:log.error(f"invalid data length {frame.data_len}")raise Exception(f"invalid data length {frame.data_len}")# 数据域解码(需处理加33H/减33H)frame.data = DLT645Protocol.decode_data(raw[data_start:data_end])# 校验和验证(从第一个68H到校验码前)checksum_start = start_idxchecksum_end = data_endif checksum_end >= len(raw):log.error(f"frame truncated: {raw}")raise Exception(f"frame truncated: {raw}")calculated_sum = DLT645Protocol.calculate_checksum(raw[checksum_start:checksum_end])if calculated_sum != raw[checksum_end]:log.error(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")raise Exception(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")# 结束符验证if checksum_end + 1 >= len(raw) or raw[checksum_end + 1] != FRAME_END_BYTE:log.error(f"invalid end flag: {raw[checksum_end + 1]}")raise Exception(f"invalid end flag: {raw[checksum_end + 1]}")# 转换为带缩进的JSONlog.info(f"frame: {frame}")return frame
-
Frame 结构体序列化为字节数组
@classmethoddef serialize(cls, frame: Frame) -> Optional[bytes]:"""将 Frame 结构体序列化为字节切片"""if frame.start_flag != FRAME_START_BYTE or frame.end_flag != FRAME_END_BYTE:log.error(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")raise Exception(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")buf = []# 写入前导字节buf.extend(frame.preamble)# 写入起始符buf.append(frame.start_flag)# 写入地址buf.extend(frame.addr)# 写入第二个起始符buf.append(frame.start_flag)# 写入控制码buf.append(frame.ctrl_code)# 数据域编码encoded_data = DLT645Protocol.encode_data(frame.data)# 写入数据长度buf.append(len(encoded_data))# 写入编码后的数据buf.extend(encoded_data)# 计算并写入校验和check_sum = DLT645Protocol.calculate_checksum(bytearray(buf))buf.append(check_sum)# 写入结束符buf.append(frame.end_flag)return bytearray(buf)
四、通信业务层(以DLT645服务端为例)
-
定义电表Service
class MeterServerService:def __init__(self,server: Union[TcpServer, RtuServer],address: bytearray = bytearray([0x00] * 6),password: bytearray = bytearray([0x00] * 4)):self.server = server self.address = addressself.password = password
-
设置值接口,以电能为例
# 写电能量def set_00(self, di: int, value: float) -> bool:ok = set_data_item(di, value)if not ok:log.error(f"写电能量失败")return ok
-
处理数据函数
# 处理读数据请求(协议与业务分离)def handle_request(self, frame):# 1. 验证设备if not self.validate_device(frame.addr):log.info(f"验证设备地址: {bytes_to_spaced_hex(frame.addr)} 失败")raise Exception("unauthorized device")# 2. 根据控制码判断请求类型if frame.ctrl_code == CtrlCode.BroadcastTimeSync: # 广播校时log.info(f"广播校时: {frame.Data.hex(' ')}")self.set_time(frame.Data)return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, frame.data)elif frame.ctrl_code == CtrlCode.CtrlReadData:# 解析数据标识di = frame.datadi3 = di[3]if di3 == 0x00: # 读取电能# 构建响应帧res_data = bytearray(8)# 解析数据标识为 32 位无符号整数data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4] # 仅复制前 4 字节数据标识value = data_item.value# 转换为 BCD 码bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x01: # 读取最大需量及发生时间res_data = bytearray(12)data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4] # 返回数据标识value = data_item.value# 转换为 BCD 码bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:7] = bcd_value[:3]# 需量发生时间res_data[7:12] = time_to_bcd(time.time())log.info(f"读取最大需量及发生时间: {res_data}")return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x02: # 读变量data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")# 变量数据长度data_len = 4data_len += (len(data_item.data_format) - 1) // 2 # (数据格式长度 - 1 位小数点)/2# 构建响应帧res_data = bytearray(data_len)res_data[:4] = frame.data[:4] # 仅复制前 4 字节value = data_item.value# 转换为 BCD 码(小端序)bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:data_len] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))else:log.info(f"unknown: {hex(di3)}")return Exception("unknown di3")elif frame.ctrl_code == CtrlCode.ReadAddress:# 构建响应帧res_data = self.address[:6]return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)elif frame.ctrl_code == CtrlCode.WriteAddress:res_data = b'' # 写通讯地址不需要返回数据# 解析数据addr = frame.data[:6]self.set_address(addr) # 设置通讯地址return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)else:log.info(f"unknown control code: {hex(frame.ctrl_code)}")raise Exception("unknown control code")
-
注入通讯协议到Service
def new_tcp_server(ip: str, port: int, timeout: int) -> MeterServerService:# 1. 先创建 TcpServer(不依赖 Service)tcp_server = TcpServer(ip, port, timeout, None)# 2. 创建 MeterServerService,注入 TcpServer(作为 Server 接口)meter_service = MeterServerService(tcp_server)# 3. 将 MeterServerService 注入回 TcpServertcp_server.service = meter_servicereturn meter_servicedef new_rtu_server(port: str, dataBits: int, stopBits: int, baudRate: int, parity: str,timeout: float) -> MeterServerService:rtu_server = RtuServer(port, dataBits, stopBits, baudRate, parity, timeout)# 2. 创建 MeterServerService,注入 RtuServer(作为 Server 接口)meter_service = MeterServerService(rtu_server)# 3. 将 MeterServerService 注入回 RtuServerrtu_server.service = meter_servicereturn meter_service
五、通信层(以TCP为例)
-
定义TCP服务端
class TcpServer:def __init__(self, ip: str, port: int, timeout: float, service):self.ip = ipself.port = portself.timeout = timeoutself.ln = Noneself.service = service # Dlt645业务
-
处理数据
def handle_connection(self, conn):try:while True:try:# 接收数据buf = conn.recv(256)if not buf:breaklog.info(f"Received data: {bytes_to_spaced_hex(buf)}")# 协议解析try:frame = DLT645Protocol.deserialize(buf)except Exception as e:log.error(f"Error parsing frame: {e}")continue# 业务处理try:resp = self.service.handle_request(frame)except Exception as e:log.error(f"Error handling request: {e}")continue# 响应if resp:try:conn.sendall(resp)log.info(f"Sent response: {bytes_to_spaced_hex(resp)}")except Exception as e:log.error(f"Error writing response: {e}")except socket.timeout:breakexcept Exception as e:log.error(f"Error handling connection: {e}")finally:try:conn.close()except Exception as e:log.error(f"Error closing connection: {e}")
-
启动服务端
def start(self):try:# 创建 TCP 套接字self.ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置地址可重用self.ln.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 绑定地址和端口self.ln.bind((self.ip, self.port))# 开始监听self.ln.listen(5)log.info(f"TCP server started on port {self.port}")while True:try:# 接受连接conn, addr = self.ln.accept()log.info(f"Accepted connection from {addr}")# 设置超时时间conn.settimeout(self.timeout)# 启动新线程处理连接import threadingthreading.Thread(target=self.handle_connection, args=(conn,)).start()except socket.error as e:if isinstance(e, socket.timeout):continuelog.error(f"Failed to accept connection: {e}")if not e.errno == 10038: # 非套接字关闭错误return eexcept Exception as e:log.error(f"Failed to start TCP server: {e}")return e
-
关闭服务端
def stop(self):if self.ln:log.info("Shutting down TCP server...")try:self.ln.close()except Exception as e:log.error(f"Error closing server: {e}")return ereturn None
使用例子
-
启动服务端
if __name__ == '__main__':server_svc = new_tcp_server("127.0.0.1", 8021, 3000)# server_svc = new_rtu_server("COM4", 8, 1, 9600, "N", 1000)server_svc.set_00(0x00000000, 100.0)server_svc.set_02(0x02010100, 86.0)server_svc.server.start()
-
使用模拟DLT645客户端软件测试
-