若该文为原创文章,转载请注明原文出处。

目的是实现视频的传输,只是个demo.

程序分为两部分,视频接收端和视频发送端。

一、视频接收端流程分析

主要流程:

  1. 初始化配置

    • 设置UDP端口(5001)和缓冲区大小
    • 初始化帧缓冲队列(最大15帧)
    • 设置目标帧率(25fps)和显示窗口大小(640x480)
  2. 创建UDP套接字

    • 绑定到本地端口,设置8MB接收缓冲区
    • 设置10ms超时以避免阻塞
  3. 帧数据管理

    • 使用defaultdict(dict)存储帧分片数据
    • 使用frame_segment_counts跟踪每个帧的分片接收情况
  4. 数据包处理(process_packet函数):

    • 解析包头信息(帧ID、分片索引、总分片数)
    • 验证CRC32校验和
    • 重组完整帧数据
    • 清理过期帧数据
  5. 视频显示循环

    • 每次尝试接收多个数据包填充缓冲区
    • 按目标帧率从缓冲区取出帧显示
    • 缓冲区不足时重复显示上一帧
    • 定期输出统计信息(接收/显示帧率、缓冲区状态等)

二、视频发送端流程分析

主要流程:

  1. 初始化配置

    • 设置目标IP地址和端口(5001)
    • 设置发送帧率(20fps)和JPEG压缩质量(70)
    • 创建UDP套接字并设置4MB发送缓冲区
  2. 视频源设置

    • 从test.mp4文件读取视频(而非实际摄像头)
    • 设置帧率和分辨率(640x480)
  3. 帧发送函数(send_frame):

    • 将帧数据分片(每片8192字节)
    • 为每个分片添加包头信息(帧ID、分片索引、总分片数)
    • 添加CRC32校验和
    • 最后一个分片添加结束标记
  4. 主发送循环

    • 控制发送帧率
    • 读取视频帧并调整尺寸
    • JPEG编码并控制压缩质量
    • 调用send_frame发送帧数据
    • 定期输出统计信息

三、关键思路

数据传输协议:

  • 分片传输:大帧被分割成多个UDP数据包发送
  • 包结构:帧ID(4字节) + 分片索引(2字节) + 总分片数(2字节) + 数据 + CRC32校验(4字节) + 结束标记(最后分片)
  • 帧重组:接收端根据帧ID和分片索引重新组装完整帧

优化策略:

  1. 缓冲机制:发送端降低帧率和压缩质量,接收端使用帧缓冲减少卡顿
  2. 错误处理:CRC校验保证数据完整性,缓冲区管理避免内存泄漏
  3. 性能平衡:通过调整分片大小、发送速率和压缩质量平衡传输效率和网络压力

四、源码

1、rk3568_video.py

import cv2
import socket
import time
import struct
import zlib# 配置
SERVER_IP = '192.168.50.84'  # 上位机的IP地址
PORT = 5001  # 端口号
FPS = 20  # 降低帧率以减轻网络压力
COMPRESSION_QUALITY = 70  # 降低JPEG压缩质量,减小数据量# 创建一个socket对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4194304)  # 4MB发送缓冲区# 相机连接参数
CAMERA_CONFIG = {'ip': '192.168.50.84','port': 6501,'username': 'admin','password': 'a1234567'
}# RTSP流地址print("正在连接摄像头...")
# 打开摄像头
cap = cv2.VideoCapture("test.mp4")if not cap.isOpened():print("无法打开摄像头")exit()print("摄像头连接成功")# 尝试设置摄像头参数
cap.set(cv2.CAP_PROP_FPS, FPS)  # 设置帧率
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)  # 设置宽度
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)  # 设置高度frame_count = 0
segment_size = 8192  # 减小分片大小,降低单包丢失影响
last_frame_time = time.time()
start_time = time.time()def send_frame(data, frame_id):"""发送一帧数据,使用可靠的分片方法"""# 计算分片数量total_segments = (len(data) + segment_size - 1) // segment_size# 如果分片过多,跳过此帧if total_segments > 100:print(f"帧太大,分片数量: {total_segments},跳过")return False# 为每个分片添加头信息:帧ID(4字节) + 分片索引(2字节) + 总分片数(2字节) + CRC32(4字节)for i in range(total_segments):# 计算当前分片数据start_pos = i * segment_sizeend_pos = min(start_pos + segment_size, len(data))segment_data = data[start_pos:end_pos]# 计算数据校验和checksum = zlib.crc32(segment_data)# 创建头信息header = struct.pack(">IHH", frame_id, i, total_segments)# 如果是最后一个分片,添加结束标记if i == total_segments - 1:# 发送分片 (头信息 + 数据 + 校验和 + 结束标记)packet = header + segment_data + struct.pack(">I", checksum) + b'\xff\xff'else:# 发送分片 (头信息 + 数据 + 校验和)packet = header + segment_data + struct.pack(">I", checksum)# 发送数据client_socket.sendto(packet, (SERVER_IP, PORT))# 控制发送速率,避免网络拥塞if total_segments > 10:time.sleep(0.001)return Truetry:print("开始发送视频流...")while True:# 控制帧率current_time = time.time()if current_time - last_frame_time < 1.0/FPS:time.sleep(0.001)  # 短暂休眠避免CPU占用过高continue# 读取摄像头帧ret, frame = cap.read()if not ret:print("无法读取帧,尝试重新连接...")# 尝试重新连接摄像头cap.release()time.sleep(1)cap = cv2.VideoCapture("test.mp4")print("重新打开视频...")if not cap.isOpened():print("重新连接失败,退出程序")breakcontinuelast_frame_time = current_time# 调整图像尺寸,减小传输数据量frame = cv2.resize(frame, (640, 480))# 对帧进行编码(控制压缩质量)encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), COMPRESSION_QUALITY]encoded, buffer = cv2.imencode('.jpg', frame, encode_param)if not encoded:print("编码帧失败")continue# 获取编码后的数据data = buffer.tobytes()data_len = len(data)# 发送当前帧frame_id = frame_count % 10000  # 循环使用的帧IDsuccess = send_frame(data, frame_id)if success:frame_count += 1# 每5秒显示一次统计信息if frame_count % 100 == 0:elapsed = time.time() - start_timefps = frame_count / elapsedprint(f"已发送 {frame_count} 帧,平均发送速率: {fps:.1f} fps,平均帧大小: {data_len/1024:.1f} KB")except KeyboardInterrupt:print("程序被用户中断")
except Exception as e:print(f"发生错误: {e}")
finally:# 释放资源print(f"程序结束,共发送 {frame_count} 帧")cap.release()client_socket.close()

2、pc_display_video.py

import cv2
import socket
import numpy as np
import time
import struct
import zlib
from collections import deque, defaultdict# 配置
PORT = 5001  # 端口号
BUFFER_SIZE = 15  # 帧缓冲区大小
TARGET_FPS = 25  # 目标帧率
DISPLAY_WIDTH = 640  # 显示窗口宽度
DISPLAY_HEIGHT = 480  # 显示窗口高度# 创建一个socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('0.0.0.0', PORT))
# 设置socket更大的接收缓冲区
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8388608)  # 8MB缓冲区
# 设置socket超时,避免一直阻塞
server_socket.settimeout(0.01)# 创建帧缓冲队列
frame_buffer = deque(maxlen=BUFFER_SIZE)
last_display_time = time.time()
last_frame = None# 数据接收缓冲区,格式: {frame_id: {segment_idx: segment_data, ...}}
frame_segments = defaultdict(dict)
# 帧片段计数,格式: {frame_id: (received_segments, total_segments)}
frame_segment_counts = {}# 创建固定大小的显示窗口
cv2.namedWindow('Video Stream', cv2.WINDOW_NORMAL)
cv2.resizeWindow('Video Stream', DISPLAY_WIDTH, DISPLAY_HEIGHT)# 统计信息变量
frame_received = 0
frame_displayed = 0
corrupted_frames = 0
start_time = time.time()print("开始接收视频流...")def process_packet(packet):"""处理接收到的数据包,返回完整的帧或None"""# 数据包太小,无法包含头信息if len(packet) < 12:  # 4(frame_id) + 2(segment_idx) + 2(total_segments) + 4(crc32)return Nonetry:# 提取头信息:帧ID + 分片索引 + 总分片数header = packet[:8]frame_id, segment_idx, total_segments = struct.unpack(">IHH", header)# 检查是否是结束标记has_end_marker = Falseif packet[-2:] == b'\xff\xff':# 去掉结束标记data = packet[8:-6]  # 去掉头部8字节和尾部6字节(4字节CRC + 2字节结束标记)checksum_bytes = packet[-6:-2]  # 取出校验和has_end_marker = Trueelse:# 没有结束标记data = packet[8:-4]  # 去掉头部8字节和尾部4字节校验和checksum_bytes = packet[-4:]  # 取出校验和# 验证校验和received_checksum = struct.unpack(">I", checksum_bytes)[0]calculated_checksum = zlib.crc32(data)if received_checksum != calculated_checksum:print(f"校验和错误: 帧ID={frame_id}, 分片={segment_idx}, 收到={received_checksum}, 计算={calculated_checksum}")return None# 更新帧分片信息frame_segments[frame_id][segment_idx] = dataif frame_id not in frame_segment_counts:frame_segment_counts[frame_id] = [1, total_segments]else:frame_segment_counts[frame_id][0] += 1# 检查帧是否完整if frame_segment_counts[frame_id][0] == total_segments:# 按顺序拼接所有分片full_data = b''for i in range(total_segments):if i not in frame_segments[frame_id]:print(f"错误: 帧ID={frame_id}缺少分片{i}")return Nonefull_data += frame_segments[frame_id][i]# 清理缓存del frame_segments[frame_id]del frame_segment_counts[frame_id]# 清理过期的帧数据(超过100个不同的帧ID时)if len(frame_segments) > 100:oldest_frame_id = min(frame_segments.keys())del frame_segments[oldest_frame_id]if oldest_frame_id in frame_segment_counts:del frame_segment_counts[oldest_frame_id]return full_datareturn Noneexcept Exception as e:print(f"处理数据包错误: {e}")return Nonewhile True:try:# 接收和处理帧的循环for _ in range(30):  # 每次显示前尝试接收更多包以填充缓冲区try:# 接收数据包packet, _ = server_socket.recvfrom(65535)full_data = process_packet(packet)if full_data is not None:# 解码np_data = np.frombuffer(full_data, dtype=np.uint8)frame = cv2.imdecode(np_data, cv2.IMREAD_COLOR)if frame is not None:# 调整帧大小以确保一致性frame = cv2.resize(frame, (DISPLAY_WIDTH, DISPLAY_HEIGHT))# 将解码成功的帧添加到缓冲区frame_buffer.append(frame)frame_received += 1else:corrupted_frames += 1print("解码帧失败")except socket.timeout:# 超时继续循环breakexcept Exception as e:print(f"接收/解码错误: {e}")break# 控制显示帧率并从缓冲区取帧显示current_time = time.time()time_elapsed = current_time - last_display_time# 按照目标帧率显示if time_elapsed >= 1.0/TARGET_FPS:# 缓冲区有足够帧时才消耗if len(frame_buffer) > BUFFER_SIZE // 3:display_frame = frame_buffer.popleft()last_frame = display_frame.copy()  # 保存该帧用于后续可能的填充cv2.imshow('Video Stream', display_frame)frame_displayed += 1# 缓冲区不足但有上一帧时显示上一帧elif last_frame is not None:cv2.imshow('Video Stream', last_frame)last_display_time = current_time# 每5秒显示一次统计信息if current_time - start_time > 5:fps_received = frame_received / (current_time - start_time)fps_displayed = frame_displayed / (current_time - start_time)buffer_status = len(frame_buffer)print(f"接收帧率: {fps_received:.1f} fps, 显示帧率: {fps_displayed:.1f} fps, 缓冲区: {buffer_status}/{BUFFER_SIZE}, 损坏帧: {corrupted_frames}")# 重置统计frame_received = 0frame_displayed = 0corrupted_frames = 0start_time = current_timeif cv2.waitKey(1) & 0xFF == ord('q'):breakexcept Exception as e:print(f"主循环错误: {e}")continue# 释放资源
server_socket.close()
cv2.destroyAllWindows()

测试是正常的。后续想在rk3568上实现可视对讲功能。

如有侵权,或需要完整代码,请及时联系博主。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92524.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92524.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92524.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ArcGIS】分区统计中出现Null值且Nodata无法忽略的问题以及shp擦除(erase)的使用——以NDVI去水体为例

需求 已有某地NDVI栅格、行政区shp以及水体shp&#xff0c;计算每个行政区的平均NDVI 问题 1.如果不剔除水体 负值NDVI会把平均值拉低 且水体NDVI并不全为负 需要通过shp剔除&#xff0c;Mask掩膜是提取水体本身而不是剩余部分 2.使用分区统计工具&#xff08;Zonal statis…

Linux中的内核同步源码相关总结

什么是内核同步Linux 内核同步是指内核中用于解决并发执行单元&#xff08;如进程、中断、内核线程等&#xff09;对共享资源&#xff08;如全局数据结构、硬件寄存器、链表等&#xff09;的竞争访问的一系列机制和技术。其核心目标是保证多个并发单元在操作共享资源时的数据一…

WORD接受修订,并修改修订后文字的颜色

在 Word 中&#xff0c;接受修订之后默认会采用正文的默认字体格式&#xff0c;不会保留修订时设置的颜色&#xff0c;比如“插入内容是蓝色字体”的设置会被清除。 如果你想要做到&#xff1a;✅ 接受所有修订后仍然让“原插入的文字”变为蓝色字体保留下来你只能通过一些手动…

行业速览:中国新能源汽车市场格局与关键趋势

在全球汽车产业迈向绿色、低碳、智能化的变革浪潮中&#xff0c;新能源汽车已成为各国争夺的战略高地。中国&#xff0c;作为全球最大的汽车市场和新能源汽车制造国&#xff0c;正以强大的市场规模、完整的产业链体系以及快速提升的技术创新能力&#xff0c;在这场变革中不断加…

【51单片机2个按键控制流水灯转向】2022-10-25

缘由51单片机按键流水灯-嵌入式-CSDN问答 #include "REG52.h" sbit k1P3^0; sbit k2P3^1; void main() {unsigned char l0,xd0,ys10,ys20,z0;P1l;while(1){if(k10&&xd0){z0;while(k10);}if(k20&&xd0){z1;while(k20);}if(ys10)if(ys20){if(z0)if(l0)…

flutter开发(一)flutter命令行工具

安装 Linux下面的flutter安装比较简单&#xff0c;在flutter 中文战 上下载一个最新稳定的版本&#xff0c;解压到系统上就行了。 我下载的是Linux下的3.32.7版。 解压之后&#xff0c;flutter目录里会有bin、dev等目录&#xff0c;把bin目录加到系统的PATH环境变量里&#…

OpenCV 入门实战:从环境配置到图像 / 视频处理

OpenCV 是计算机视觉领域最常用的开源库之一&#xff0c;它提供了丰富的图像和视频处理功能。本文将从环境配置开始&#xff0c;带大家一步步解析基础操作代码&#xff0c;快速入门 OpenCV 的使用。 一、环境配置 在开始之前&#xff0c;我们需要先搭建好 OpenCV 的运行环境。…

2.2.1 饰面板材和陶瓷的特性和应用

1、饰面石材1&#xff09;天然花岗岩2&#xff09;天然大理石3&#xff09;人造石&#xff08;1&#xff09;人造石按主要原材料分包括人造石实体面材、人造石英石和人造石岗石等产品。2、建筑卫生陶瓷建筑卫生陶瓷包括建筑陶瓷和卫生陶瓷两大类。建筑陶瓷包括陶瓷砖、建筑琉璃…

C++的结构体数组

结构体数组的基础知识 结构体数组通过​​组合数据批量管理​​的特性&#xff0c;广泛应用于学生管理、游戏角色属性存储等场景。常见问题 ​​数组越界​​&#xff1a;静态数组长度固定&#xff0c;超过数组长度的访问&#xff0c;会导致未定义行为。​​未初始化成员​​&a…

小程序中使用echarts(2025/8/8)

这篇博文讲的很详细&#xff0c;也很简洁&#xff0c;这里补充一点东西 小程序中使用echarts(硬货&#xff0c;全网最详细教程&#xff01;)_小程序使用echarts-CSDN博客 简单来说就是去官网下载ec-canvas组件&#xff0c;将其中的echarts.js换成echarts.min.js&#xff08;原…

【SpringBoot】SpringBoot配置

根据自动配置原理 学习后&#xff0c;整理学习笔记 一定要耐心去看&#xff0c;耐着性子去学习&#xff0c;慢慢慢慢就明白了 配置深化学习 前提 通过 SpringBootApplication 找到 EnableAutoConfiguration&#xff1b;发现 Import({AutoConfigurationImportSelector.class})…

网络安全与软件定义汽车的发展

在许多汽车公司&#xff0c;同一个系统工程团队同时负责安全&#xff08;safety&#xff09;和安防&#xff08;security&#xff09;。因此&#xff0c;网络安全被视为安全&#xff08;safety&#xff09;的一个子集&#xff0c;其根源在于一个隐含的假设&#xff1a;“如果安…

字典列表依据数值键排序

要根据字典列表中的特定数值键进行排序&#xff0c;我们可以使用 Python 的 sorted() 函数配合自定义排序键。以下是操作方法&#xff1a; 1. 按升序排序&#xff08;从小到大&#xff09; sorted_list sorted(original_list, keylambda x: x[数值键名])2. 按降序排序&#xf…

五、SpringBoot工程打包与运行

SpringBoot工程打包与运行 1、SpringBoot项目快速启动&#xff08;Windows版&#xff09; (1)对SpringBoot项目打包&#xff08;执行Maven构建指令package&#xff09;&#xff1a; mvn package (2)运行项目&#xff08;执行启动指令&#xff09; java -jar springboot.jar 2、…

构建高可用架构:ZDNS GSLB 在多数据中心场景下的应用与 F5 替换实践

随着互联网的快速发展&#xff0c;金融机构、大型企业等组织单位&#xff0c;出于自身业务发展的需要和国家监管的要求&#xff0c;纷纷通过建设多数据中心来提升不同地区的用户体验&#xff0c;同时避免不可抗力因素带来的巨大损失。ZDNS GSLB 全局负载均衡技术&#xff0c;能…

【JMeter】压测脚本生成完善增强

JMeter 压测脚本生成完善增强0. 通过JMeter代理服务器录制脚本1. 设置客户端的代理2. JMeter GUI配置 以及录制脚本3. 调试脚本附录0. 通过JMeter代理服务器录制脚本 1. 设置客户端的代理 JMeter代理服务器默认端口号就是8888 2. JMeter GUI配置 以及录制脚本 新建线程组 …

Agent 开发进阶路线:从基础功能到自主决策

Agent 开发进阶路线&#xff1a;从基础功能到自主决策基础功能构建定义 Agent 的核心功能&#xff0c;如信息收集、简单任务执行和环境交互。 实现基本的感知-决策-执行循环&#xff0c;确保 Agent 能响应外部输入并完成预设任务。 集成 API 调用或传感器交互&#xff0c;扩展 …

使用 ECharts GL 实现 3D 中国地图点位飞线效果

前言在现代数据可视化领域&#xff0c;3D 地图飞线效果是一种非常吸引人的展示方式&#xff0c;特别适合展示地理空间关系和数据流动。本文将详细解析如何使用 ECharts GL 在 Vue 项目中实现一个 3D 中国地图飞线效果。技术栈Vue.js 2.x/3.xECharts 5.xECharts GL 2.x核心实现步…

Redis对象编码

前言 Redis中提供多种数据结构&#xff1a;string、list、map、set、zset等&#xff0c;关于上述多种数据类型的底层实现原理&#xff0c;Redis针对不同的数据类型对应的不同使用场景从时间和空间上进行平衡选择不同的对象编码方式。本文大致介绍一些Redis对象编码方式以及在上…

12-Django项目实战-登录短信验证

1.路由配置 2.对接第三方短信接口 详细内容请点击 3.视图函数 def sms_view(request):"""短信验证视图逻辑1.获取请求体的数据[phone]2.调用封装的短信发送接口&#xff0c;实现发送短信"""data json.loads(request.body)phone data.get(&q…