1. 概念解释
ZeroMQ Sockets提供了一种类标准套接字(socket-like)的 API,是消息导向的通信机制,基于 TCP/UDP 等传输层协议,但封装了底层细节(如连接管理、消息路由、缓冲区等),提供高级抽象。其核心是 消息队列 模型,支持多种通信模式(如请求-响应、发布-订阅等)。
1.1 不同模式:
- 消息导向 :与 TCP 的字节流不同,ZeroMQ sockets 传输的是离散的消息(message),类似 UDP,但可靠性更高。每条消息包含长度和二进制数据 。
- 异步抽象 :底层实现为异步消息队列,自动处理连接、重试、缓冲等复杂逻辑,用户无需手动管理 。
- 模式支持 :通过不同的 socket 类型定义通信模式(如请求-响应、发布-订阅等)和消息路由规则 。
提供了多种 socket 类型以支持不同通信模式,
1.2 不同 Socket 类型:
- REQ/REP :请求-响应模式,客户端(REQ)发送请求,服务端(REP)响应,严格同步,用于高可靠性通信 。
- PUB/SUB :发布-订阅模式,PUB 发送消息,SUB 过滤并接收特定主题的消息,用于实时数据流 。
- PUSH/PULL :用于任务分发(PUSH)和结果收集(PULL),常用于流水线架构 ,分布式系统及微服务间通信。
- ROUTER/DEALER :更灵活的异步模式,ROUTER 根据消息地址路由,DEALER 无状态转发,适合自定义协议 。
- 其他类型还包括 PAIR(点对点)、XPUB/XSUB(扩展发布/订阅)等,总计 16 种以上 。
1.3 与传统 Socket 对比
- 简化网络细节 :ZeroMQ 抽象了底层连接管理(如自动重连、多播)、序列化、线程安全等,用户仅需关注消息内容 。
- 内置模式 :传统 socket 需手动实现通信模式(如 RPC),而 ZeroMQ 直接通过 socket 类型支持常见模式 。
- 跨语言兼容 :ZeroMQ 支持多种语言绑定(C++, Python, Java 等),便于构建异构系统 。
1.4 与web Socket对比
- ZeroMQ :适用于 快速构建分布式系统,适合需要高性能、复杂通信模式(如发布-订阅)的分布式系统,隐藏底层细节,提升开发效率 。
- WebSocket :适用于面向 Web 实时应用,专为 Web 实时通信设计,基于 HTTP 握手,适合浏览器与服务器的双向交互,仅提供全双工通信通道,需上层协议(如自定义消息格式)定义交互规则。
2. 请求-响应模式【REQ/REP】示例
严格同步,一问一答,用于客户端发送请求,服务端同步响应,适用于远程过程调用(RPC)
2.1 服务端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")print("服务器已启动,等待客户端请求...")while True:message = socket.recv_string()print(f"收到客户端请求: {message}")response = f"服务器已收到请求: {message}"socket.send_string(response)print(f"发送响应: {response}")
2.2 客户端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")msg = "你好";
socket.send_string(msg)
print(f"发送消息: {msg}")response = socket.recv_string()
print(f"收到服务器消息: {response}")
2.3 执行效果
3. 发布-订阅模式(PUB/SUB)示例
一对多广播,客户端可过滤主题,用于服务端广播消息,客户端订阅特定主题,适用于实时数据推送(如股票行情)
3.1 服务端
import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")print("发布者已启动,开始广播...")while True:topic = "测试主题"data = f"你好,欢迎收听!" socket.send_string(f"{topic} {data}")time.sleep(1)
3.2 客户端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, "测试主题")print("订阅者已启动,等待消息...")
while True:msg = socket.recv_string()print(f"收到消息: {msg}")
3.3 执行效果
4. 管道模式(PUSH/PULL)示例
单向任务分发,支持负载均衡,用于服务端分发任务,客户端并行处理,适用于分布式任务队列
4.1 服务端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")print("任务分发器已启动,开始推送任务...")for i in range(10):task = f"任务{i}命令编码" socket.send_string(task)
4.2 客户端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")print("客户端已启动,等待指令...")
while True:msg = socket.recv_string()print(f"收到任务: {msg}")
4.3 执行效果
4.4 服务器(客户端绑定端口)
客户端和服务器对端口的使用可以呼唤,不过,防止服务器未连接到就结束程序,需要增加个延时等待连接成功。
import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://localhost:5555")time.sleep(1) # 等待连接建立
print("任务分发器已启动,开始推送任务...")for i in range(10):task = f"任务{i}命令编码" socket.send_string(task)
4.5 客户端(客户端绑定端口)
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5555")print("客户端已启动,等待指令...")
while True:msg = socket.recv_string()print(f"收到任务: {msg}")
4.6 执行效果(客户端绑定端口)
4.7 服务器(非阻塞模式监听)
使用 flags=zmq.NOBLOCK 时,若当前无消息可接收,recv_string() 会立即抛出 zmq.Again 异常,而非阻塞等待。
启用 zmq.CONFLATE 后,队列仅保留最新消息。若服务端发送频率过高或客户端处理延迟,可能导致消息被覆盖。
import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://localhost:5555")time.sleep(1) # 等待连接建立
print("任务分发器已启动,开始推送任务...")for i in range(10):task = f"任务{i}命令编码" socket.send_string(task)
4.8 客户端(非阻塞模式监听)
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind("tcp://*:5555")print("客户端已启动,等待指令...")
while True:try:msg = socket.recv_string(flags=zmq.NOBLOCK)print(f"收到任务: {msg}")except zmq.Again:pass
4.9 执行效果(非阻塞模式监听)
5. ZeroMQ 常见 setsockopt 选项
选项名称 | 作用 | 层级 |
---|---|---|
ZMQ_CONFLATE | 当队列满时,仅保留最新消息(覆盖旧消息)。适用于实时数据更新场景(如传感器数据、股票行情) | ZeroMQ 自定义层选项 |
ZMQ_SNDHWM | 设置发送队列的高水位线(High Water Mark),控制最大未发送消息数。超出后丢弃或阻塞 | 同上 |
ZMQ_RCVHWM | 设置接收队列的高水位线,控制最大未处理消息数 | 同上 |
ZMQ_LINGER | 控制关闭连接时是否等待未发送/接收的消息。值为 0 表示立即关闭,忽略未处理数据 | 同上 |
ZMQ_MAXMSGSIZE | 设置消息的最大大小(字节)。超出此限制的消息会被丢弃 | 同上 |
ZMQ_IMMEDIATE | 仅在绑定了地址时才允许连接(REQ/REP 等模式)。避免连接未就绪的服务端 | 同上 |
ZMQ_ROUTER_RAW | 启用原始模式,直接读写裸数据(如 HTTP 协议),不使用 ZeroMQ 的帧格式 | 同上 |
ZMQ_TCP_KEEPALIVE | 启用 TCP 保活机制,检测连接是否存活 | 同上 |
ZMQ_IDENTITY | 设置套接字的唯一标识符(字符串),用于 ROUTER/DEALER 模式下的路由 | 同上 |
SO_REUSEADDR | 允许绑定到同一地址和端口,即使该地址/端口处于 TIME_WAIT 状态或被其他套接字占用,避免服务器重启时因端口占用而失败 | 套接字层选项 |
SO_BROADCAST | 启用广播功能,允许套接字发送广播消息(UDP 场景),局域网内向所有设备发送数据 | 同上 |
SO_KEEPALIVE | 启用 TCP 保活机制,定期检测连接是否存活,自动断开无响应的空闲连接,防止资源泄漏。 | 同上 |
SO_LINGER | 控制调用 close() 时的行为,决定是否等待未发送的数据。 | 同上 |
SO_RCVBUF / SO_SNDBUF | 设置接收(SO_RCVBUF)和发送(SO_SNDBUF)缓冲区的大小,优化吞吐量或延迟,大缓冲区适合高带宽场景,小缓冲区适合低延迟场景。 | 同上 |
SO_TIMEOUT | 设置接收(SO_RCVTIMEO)或发送(SO_SNDTIMEO)操作的超时时间。防止阻塞操作无限期等待,适用于非阻塞模式。 | 同上 |
TCP_NODELAY | 禁用 Nagle 算法,强制立即发送小数据包。减少延迟,适用于实时通信(如游戏、聊天)。 | 传输层选项 |
IP_MULTICAST_TTL | 设置多播数据包的生存时间(TTL),控制多播范围(如局域网或跨网络)。 | IP 层选项 |
6. 环境安装
pip install zmq