若该文为原创文章,转载请注明原文出处。
目的是实现视频的传输,只是个demo.
程序分为两部分,视频接收端和视频发送端。
一、视频接收端流程分析
主要流程:
-
初始化配置:
- 设置UDP端口(5001)和缓冲区大小
- 初始化帧缓冲队列(最大15帧)
- 设置目标帧率(25fps)和显示窗口大小(640x480)
-
创建UDP套接字:
- 绑定到本地端口,设置8MB接收缓冲区
- 设置10ms超时以避免阻塞
-
帧数据管理:
- 使用defaultdict(dict)存储帧分片数据
- 使用frame_segment_counts跟踪每个帧的分片接收情况
-
数据包处理(process_packet函数):
- 解析包头信息(帧ID、分片索引、总分片数)
- 验证CRC32校验和
- 重组完整帧数据
- 清理过期帧数据
-
视频显示循环:
- 每次尝试接收多个数据包填充缓冲区
- 按目标帧率从缓冲区取出帧显示
- 缓冲区不足时重复显示上一帧
- 定期输出统计信息(接收/显示帧率、缓冲区状态等)
二、视频发送端流程分析
主要流程:
-
初始化配置:
- 设置目标IP地址和端口(5001)
- 设置发送帧率(20fps)和JPEG压缩质量(70)
- 创建UDP套接字并设置4MB发送缓冲区
-
视频源设置:
- 从test.mp4文件读取视频(而非实际摄像头)
- 设置帧率和分辨率(640x480)
-
帧发送函数(send_frame):
- 将帧数据分片(每片8192字节)
- 为每个分片添加包头信息(帧ID、分片索引、总分片数)
- 添加CRC32校验和
- 最后一个分片添加结束标记
-
主发送循环:
- 控制发送帧率
- 读取视频帧并调整尺寸
- JPEG编码并控制压缩质量
- 调用send_frame发送帧数据
- 定期输出统计信息
三、关键思路
数据传输协议:
- 分片传输:大帧被分割成多个UDP数据包发送
- 包结构:帧ID(4字节) + 分片索引(2字节) + 总分片数(2字节) + 数据 + CRC32校验(4字节) + 结束标记(最后分片)
- 帧重组:接收端根据帧ID和分片索引重新组装完整帧
优化策略:
- 缓冲机制:发送端降低帧率和压缩质量,接收端使用帧缓冲减少卡顿
- 错误处理:CRC校验保证数据完整性,缓冲区管理避免内存泄漏
- 性能平衡:通过调整分片大小、发送速率和压缩质量平衡传输效率和网络压力
四、源码
1、rk3568_video.py
import cv2
import socket
import time
import struct
import zlib# 配置
SERVER_IP = '192.168.50.84' # 上位机的IP地址
PORT = 5001 # 端口号
FPS = 20 # 降低帧率以减轻网络压力
COMPRESSION_QUALITY = 70 # 降低JPEG压缩质量,减小数据量# 创建一个socket对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4194304) # 4MB发送缓冲区# 相机连接参数
CAMERA_CONFIG = {'ip': '192.168.50.84','port': 6501,'username': 'admin','password': 'a1234567'
}# RTSP流地址print("正在连接摄像头...")
# 打开摄像头
cap = cv2.VideoCapture("test.mp4")if not cap.isOpened():print("无法打开摄像头")exit()print("摄像头连接成功")# 尝试设置摄像头参数
cap.set(cv2.CAP_PROP_FPS, FPS) # 设置帧率
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # 设置宽度
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # 设置高度frame_count = 0
segment_size = 8192 # 减小分片大小,降低单包丢失影响
last_frame_time = time.time()
start_time = time.time()def send_frame(data, frame_id):"""发送一帧数据,使用可靠的分片方法"""# 计算分片数量total_segments = (len(data) + segment_size - 1) // segment_size# 如果分片过多,跳过此帧if total_segments > 100:print(f"帧太大,分片数量: {total_segments},跳过")return False# 为每个分片添加头信息:帧ID(4字节) + 分片索引(2字节) + 总分片数(2字节) + CRC32(4字节)for i in range(total_segments):# 计算当前分片数据start_pos = i * segment_sizeend_pos = min(start_pos + segment_size, len(data))segment_data = data[start_pos:end_pos]# 计算数据校验和checksum = zlib.crc32(segment_data)# 创建头信息header = struct.pack(">IHH", frame_id, i, total_segments)# 如果是最后一个分片,添加结束标记if i == total_segments - 1:# 发送分片 (头信息 + 数据 + 校验和 + 结束标记)packet = header + segment_data + struct.pack(">I", checksum) + b'\xff\xff'else:# 发送分片 (头信息 + 数据 + 校验和)packet = header + segment_data + struct.pack(">I", checksum)# 发送数据client_socket.sendto(packet, (SERVER_IP, PORT))# 控制发送速率,避免网络拥塞if total_segments > 10:time.sleep(0.001)return Truetry:print("开始发送视频流...")while True:# 控制帧率current_time = time.time()if current_time - last_frame_time < 1.0/FPS:time.sleep(0.001) # 短暂休眠避免CPU占用过高continue# 读取摄像头帧ret, frame = cap.read()if not ret:print("无法读取帧,尝试重新连接...")# 尝试重新连接摄像头cap.release()time.sleep(1)cap = cv2.VideoCapture("test.mp4")print("重新打开视频...")if not cap.isOpened():print("重新连接失败,退出程序")breakcontinuelast_frame_time = current_time# 调整图像尺寸,减小传输数据量frame = cv2.resize(frame, (640, 480))# 对帧进行编码(控制压缩质量)encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), COMPRESSION_QUALITY]encoded, buffer = cv2.imencode('.jpg', frame, encode_param)if not encoded:print("编码帧失败")continue# 获取编码后的数据data = buffer.tobytes()data_len = len(data)# 发送当前帧frame_id = frame_count % 10000 # 循环使用的帧IDsuccess = send_frame(data, frame_id)if success:frame_count += 1# 每5秒显示一次统计信息if frame_count % 100 == 0:elapsed = time.time() - start_timefps = frame_count / elapsedprint(f"已发送 {frame_count} 帧,平均发送速率: {fps:.1f} fps,平均帧大小: {data_len/1024:.1f} KB")except KeyboardInterrupt:print("程序被用户中断")
except Exception as e:print(f"发生错误: {e}")
finally:# 释放资源print(f"程序结束,共发送 {frame_count} 帧")cap.release()client_socket.close()
2、pc_display_video.py
import cv2
import socket
import numpy as np
import time
import struct
import zlib
from collections import deque, defaultdict# 配置
PORT = 5001 # 端口号
BUFFER_SIZE = 15 # 帧缓冲区大小
TARGET_FPS = 25 # 目标帧率
DISPLAY_WIDTH = 640 # 显示窗口宽度
DISPLAY_HEIGHT = 480 # 显示窗口高度# 创建一个socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('0.0.0.0', PORT))
# 设置socket更大的接收缓冲区
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8388608) # 8MB缓冲区
# 设置socket超时,避免一直阻塞
server_socket.settimeout(0.01)# 创建帧缓冲队列
frame_buffer = deque(maxlen=BUFFER_SIZE)
last_display_time = time.time()
last_frame = None# 数据接收缓冲区,格式: {frame_id: {segment_idx: segment_data, ...}}
frame_segments = defaultdict(dict)
# 帧片段计数,格式: {frame_id: (received_segments, total_segments)}
frame_segment_counts = {}# 创建固定大小的显示窗口
cv2.namedWindow('Video Stream', cv2.WINDOW_NORMAL)
cv2.resizeWindow('Video Stream', DISPLAY_WIDTH, DISPLAY_HEIGHT)# 统计信息变量
frame_received = 0
frame_displayed = 0
corrupted_frames = 0
start_time = time.time()print("开始接收视频流...")def process_packet(packet):"""处理接收到的数据包,返回完整的帧或None"""# 数据包太小,无法包含头信息if len(packet) < 12: # 4(frame_id) + 2(segment_idx) + 2(total_segments) + 4(crc32)return Nonetry:# 提取头信息:帧ID + 分片索引 + 总分片数header = packet[:8]frame_id, segment_idx, total_segments = struct.unpack(">IHH", header)# 检查是否是结束标记has_end_marker = Falseif packet[-2:] == b'\xff\xff':# 去掉结束标记data = packet[8:-6] # 去掉头部8字节和尾部6字节(4字节CRC + 2字节结束标记)checksum_bytes = packet[-6:-2] # 取出校验和has_end_marker = Trueelse:# 没有结束标记data = packet[8:-4] # 去掉头部8字节和尾部4字节校验和checksum_bytes = packet[-4:] # 取出校验和# 验证校验和received_checksum = struct.unpack(">I", checksum_bytes)[0]calculated_checksum = zlib.crc32(data)if received_checksum != calculated_checksum:print(f"校验和错误: 帧ID={frame_id}, 分片={segment_idx}, 收到={received_checksum}, 计算={calculated_checksum}")return None# 更新帧分片信息frame_segments[frame_id][segment_idx] = dataif frame_id not in frame_segment_counts:frame_segment_counts[frame_id] = [1, total_segments]else:frame_segment_counts[frame_id][0] += 1# 检查帧是否完整if frame_segment_counts[frame_id][0] == total_segments:# 按顺序拼接所有分片full_data = b''for i in range(total_segments):if i not in frame_segments[frame_id]:print(f"错误: 帧ID={frame_id}缺少分片{i}")return Nonefull_data += frame_segments[frame_id][i]# 清理缓存del frame_segments[frame_id]del frame_segment_counts[frame_id]# 清理过期的帧数据(超过100个不同的帧ID时)if len(frame_segments) > 100:oldest_frame_id = min(frame_segments.keys())del frame_segments[oldest_frame_id]if oldest_frame_id in frame_segment_counts:del frame_segment_counts[oldest_frame_id]return full_datareturn Noneexcept Exception as e:print(f"处理数据包错误: {e}")return Nonewhile True:try:# 接收和处理帧的循环for _ in range(30): # 每次显示前尝试接收更多包以填充缓冲区try:# 接收数据包packet, _ = server_socket.recvfrom(65535)full_data = process_packet(packet)if full_data is not None:# 解码np_data = np.frombuffer(full_data, dtype=np.uint8)frame = cv2.imdecode(np_data, cv2.IMREAD_COLOR)if frame is not None:# 调整帧大小以确保一致性frame = cv2.resize(frame, (DISPLAY_WIDTH, DISPLAY_HEIGHT))# 将解码成功的帧添加到缓冲区frame_buffer.append(frame)frame_received += 1else:corrupted_frames += 1print("解码帧失败")except socket.timeout:# 超时继续循环breakexcept Exception as e:print(f"接收/解码错误: {e}")break# 控制显示帧率并从缓冲区取帧显示current_time = time.time()time_elapsed = current_time - last_display_time# 按照目标帧率显示if time_elapsed >= 1.0/TARGET_FPS:# 缓冲区有足够帧时才消耗if len(frame_buffer) > BUFFER_SIZE // 3:display_frame = frame_buffer.popleft()last_frame = display_frame.copy() # 保存该帧用于后续可能的填充cv2.imshow('Video Stream', display_frame)frame_displayed += 1# 缓冲区不足但有上一帧时显示上一帧elif last_frame is not None:cv2.imshow('Video Stream', last_frame)last_display_time = current_time# 每5秒显示一次统计信息if current_time - start_time > 5:fps_received = frame_received / (current_time - start_time)fps_displayed = frame_displayed / (current_time - start_time)buffer_status = len(frame_buffer)print(f"接收帧率: {fps_received:.1f} fps, 显示帧率: {fps_displayed:.1f} fps, 缓冲区: {buffer_status}/{BUFFER_SIZE}, 损坏帧: {corrupted_frames}")# 重置统计frame_received = 0frame_displayed = 0corrupted_frames = 0start_time = current_timeif cv2.waitKey(1) & 0xFF == ord('q'):breakexcept Exception as e:print(f"主循环错误: {e}")continue# 释放资源
server_socket.close()
cv2.destroyAllWindows()
测试是正常的。后续想在rk3568上实现可视对讲功能。
如有侵权,或需要完整代码,请及时联系博主。