asyncio 的异步编程框架中,如果说 asyncio.StreamReader 是你异步应用的数据输入管道,那么 asyncio.StreamWriter 就是你异步应用的数据输出管道。它是一个至关重要的组件,让你能够方便、高效且非阻塞地向连接的另一端(如 TCP 套接字)发送字节数据

你可以把 StreamWriter 想象成一个异步的、非阻塞的“数据发送器”。当你调用它的方法来发送数据时,它不会立即把所有数据都推送到网络或底层I/O。相反,它会将数据放入自己的内部缓冲区,并通知事件循环有数据待发送。如果网络繁忙,数据暂时无法发送,StreamWriter 会让出控制权,让事件循环去执行其他任务,直到数据可以被真正写入后再继续。


StreamWriter 的核心职能与价值

asyncio.StreamWriter 主要用于向异步 I/O 源(通常是网络套接字或进程间的管道)高效地写入字节数据。它的核心价值体现在以下几个方面:

  1. 异步非阻塞写入:这是其最核心的特性。StreamWriter 提供的所有写入方法都是非阻塞的。当底层I/O(如网络)因缓冲区满而暂时无法接收更多数据时,写入操作不会阻塞你的整个程序。StreamWriter 会暂停当前的写入任务,允许事件循环处理其他协程,直到数据能够被写入。
  2. 智能内部数据缓冲StreamWriter 内部维护一个输出缓冲区。你写入的数据会先进入这个缓冲区。它会尝试批量发送数据到操作系统底层,减少系统调用次数,提高发送效率。
  3. 流量控制 (Flow Control):这是 StreamWriter 的一个高级特性。当底层传输层(如 TCP 缓冲区)变得拥塞,无法立即发送更多数据时,StreamWriter 会自动暂停数据的写入,直到拥塞缓解。这通过其 drain() 方法得到体现,它能确保数据被“排空”到网络中,而不是无限堆积在应用程序层缓冲区。
  4. 优雅处理连接关闭StreamWriter 提供了明确的方法来关闭连接(如 close()),并且能够让你等待所有待发送数据发送完毕,确保数据的完整传输。

如何获取 StreamWriter 实例?

StreamReader 一样,你通常不会直接实例化 asyncio.StreamWriter。它总是与 StreamReader 成对出现,并通过 asyncio 库的以下两个核心函数获得:

  1. 作为客户端连接 (asyncio.open_connection())
    当你需要作为客户端连接到远程服务器时,asyncio.open_connection() 会建立 TCP 连接,并返回一个包含 (StreamReader, StreamWriter) 的元组。StreamWriter 用于向服务器发送数据。

    import asyncioasync def connect_and_write_client():host, port = 'localhost', 8888print(f"Trying to connect to {host}:{port}...")writer = None # Pre-define writer for finally blocktry:# open_connection returns (StreamReader, StreamWriter) tuplereader, writer = await asyncio.open_connection(host, port)print(f"Successfully connected to {host}:{port}")# --- Using StreamWriter to send data ---message_to_send = "Hello from async client!"print(f"Sending message: '{message_to_send}'")writer.write(message_to_send.encode()) # Write bytes to the bufferawait writer.drain() # Crucial: ensures data is pushed to the network# Optionally, read server's response using StreamReaderprint("Waiting for server response...")data = await reader.read(100)if data:response = data.decode()print(f"Received server response: '{response}'")else:print("Server did not send data or closed connection.")except ConnectionRefusedError:print(f"Connection refused. Make sure the server is running on {host}:{port}.")except Exception as e:print(f"Client encountered an unexpected error: {e}")finally:if writer and not writer.is_closing(): # Check if writer exists and is not already closingprint("Closing client connection...")writer.close() # Close the writer (and underlying socket)await writer.wait_closed() # Wait for the writer to fully closeprint("Connection fully closed.")if __name__ == "__main__":# To run this client example, first start a compatible server (e.g., the echo_server below).# asyncio.run(connect_and_write_client())pass # Not running directly to allow server to be started first
    
  2. 作为服务器处理连接 (asyncio.start_server())
    当你使用 asyncio.start_server() 启动 TCP 服务器时,每当有新的客户端连接到来,你提供的连接处理回调函数就会被调用。这个回调函数会接收到 (StreamReader, StreamWriter) 元组,其中 StreamWriter 代表了向该客户端发送数据的通道。

    import asyncioasync def echo_server_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):"""Coroutine to handle each new client connection."""addr = writer.get_extra_info('peername') # Get client address infoprint(f"Accepted new connection from {addr!r}.")try:while True:# Read data from the client using StreamReaderdata = await reader.read(1024) if not data: # If data is empty, client has disconnectedprint(f"Client {addr!r} disconnected.")break # Exit loop to close connectionmessage = data.decode().strip()print(f"Received from {addr!r}: '{message}'")# --- Using StreamWriter to send a response ---response_message = f"Server received your message: {message}"print(f"Sending response to {addr!r}: '{response_message}'")writer.write(response_message.encode()) # Write bytes to the bufferawait writer.drain() # Crucial: ensures data is sent over the networkexcept asyncio.IncompleteReadError:print(f"Client {addr!r} disconnected before full read.")except ConnectionResetError:print(f"Client {addr!r} forcibly closed the connection.")except Exception as e:print(f"Error handling {addr!r}: {e}")finally:print(f"Closing connection with {addr!r}.")writer.close() # Close the writer (and underlying socket)await writer.wait_closed() # Wait for the writer to fully closeprint(f"Connection with {addr!r} fully closed.")async def run_echo_server():host, port = 'localhost', 8888server = await asyncio.start_server(echo_server_handler, host, port)addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)print(f"Server listening on {addrs}...")# server.serve_forever() keeps the server running until the event loop stopsasync with server:await server.serve_forever()if __name__ == "__main__":# Run this script to start the server.# After starting, you can connect with the client example above or# using 'telnet localhost 8888' in another terminal.asyncio.run(run_echo_server())
    

StreamWriter 的常用方法详解

StreamWriter 提供了一系列用于发送数据和管理连接的异步方法。请注意,它主要操作字节数据

  1. writer.write(data)

    • 作用:将 data(必须是 bytesbytearray 对象)写入 StreamWriter内部缓冲区
    • 行为:这是一个非协程方法,它会立即返回。数据并不会立即发送到网络,而是被放置到写入器的内部缓冲区中等待发送。
    • 重要性:这个方法仅仅是“排队”数据,它不保证数据已经发送出去了。要确保数据被发送到网络,你通常需要紧接着调用 await writer.drain()
    • 示例writer.write(b"Hello, Server!")
  2. await writer.drain()

    • 作用关键的异步方法,用于实现流量控制。它等待内部写入缓冲区中的数据被完全“排空”(drain),即所有数据都已发送到操作系统底层的网络缓冲区。
    • 行为:如果内部缓冲区中有数据待发送,且底层网络缓冲区已满或数据发送速度跟不上生产速度,drain()挂起当前协程,直到所有数据发送完毕,或底层网络缓冲区有足够的空间接收更多数据。
    • 重要性:在写入大量数据或确保某个消息块被完整发送后才能继续执行后续逻辑时,drain() 至关重要。任何重要的写入操作后,都应考虑调用 await writer.drain()。否则,数据可能仍在 Python 应用程序的缓冲区中,而未真正发出去。
    • 示例writer.write(b"Large data chunk"); await writer.drain()
  3. writer.writelines(list_of_data)

    • 作用:将一个包含 bytesbytearray 对象的列表写入内部缓冲区。
    • 行为:与 write() 类似,这是一个非协程方法,仅将列表中的所有数据依次放入缓冲区。
    • 重要性:同样需要结合 await writer.drain() 来确保数据发送。
    • 示例writer.writelines([b"Line 1\n", b"Line 2\n"])
  4. writer.can_write_eof()

    • 作用:同步方法,检查传输层是否支持发送 EOF (End-Of-File) 标志。
    • 行为:返回 TrueFalse。对于 TCP 套接字,通常返回 True
    • 应用场景:在需要明确通知对端不再有数据发送,但又不立即关闭连接时(例如半关闭连接)。
  5. writer.write_eof()

    • 作用:发送一个 EOF 标志到对端。这表明此端将不再发送数据,但仍可以接收数据。
    • 行为:这是一个非协程方法,仅排队发送 EOF 标志。
    • 重要性:发送 EOF 后,通常应调用 await writer.drain() 来确保 EOF 标志被送出。
  6. writer.close()

    • 作用:请求关闭写入器及其关联的底层传输层(如套接字)。
    • 行为:这是一个非协程方法,它会立即返回。它将关闭操作放入队列。在调用 close() 之后,不应再进行写入操作。
    • 重要性:通常在使用完 StreamWriter 后调用,以释放资源。
  7. await writer.wait_closed()

    • 作用:等待写入器完全关闭。
    • 行为:这是一个协程方法。它会挂起当前协程,直到 writer.close() 操作完成,并且所有底层资源都被释放。
    • 重要性:在使用 writer.close() 后,强烈建议调用 await writer.wait_closed()。这能确保在程序继续执行之前,所有待发送的数据都已尝试发送,并且底层连接已正确关闭,避免资源泄露或数据丢失。这在 finally 块中尤其有用。
    • 示例writer.close(); await writer.wait_closed()
  8. writer.is_closing()

    • 作用:同步方法,检查写入器是否正在关闭中。
    • 行为:返回 True 如果 close() 已经被调用,但 wait_closed() 还没有完成。
    • 应用场景:在编写复杂的连接管理逻辑时,可以用于判断当前写入器的状态。
  9. writer.get_extra_info(name, default=None)

    • 作用:获取关于底层传输层的额外信息,例如远程地址、本地地址等。
    • 行为:返回指定名称的信息。
    • 常见参数'peername' (远程地址), 'sockname' (本地地址), 'compression', 'cipher' (SSL/TLS 信息)。
    • 示例peername = writer.get_extra_info('peername')

StreamWriter 的内部工作原理(简述)

当你调用 writer.write() 方法时:

  1. 数据首先被添加到 StreamWriter内部输出缓冲区
  2. StreamWriter 会通知事件循环,表示其内部缓冲区中有数据待发送。
  3. 事件循环会在合适的时机(通常是当底层网络套接字准备好接收数据时),将缓冲区中的数据异步地写入到底层套接字。
  4. 如果你调用 await writer.drain(),并且此时底层网络缓冲区已满或数据发送有延迟,drain()挂起当前协程。事件循环会继续处理其他任务,直到底层缓冲区有空间或数据发送完毕后,再恢复被挂起的协程。
  5. 当你调用 writer.close() 时,StreamWriter 会尝试发送所有剩余的缓冲数据,然后关闭底层套接字。await writer.wait_closed() 则会等待这一系列关闭操作完成。

这种设计确保了数据发送的高效性和非阻塞性,即使网络拥塞,你的应用程序也能保持响应。


StreamWriterStreamReader 的协同

StreamReaderStreamWriter 几乎总是成对出现,共同代表了一个完整的、双向的异步通信通道。它们通过底层的 asyncio 传输层(Transport)进行协作,传输层负责具体的网络 I/O。

理解如何同时使用 StreamReader 进行数据接收和 StreamWriter 进行数据发送,是构建任何基于 asyncio 的网络应用(无论是客户端还是服务器)的核心。它们提供了一个高层次的抽象,让你可以专注于应用逻辑,而不必陷入底层的套接字细节和非阻塞 I/O 的复杂性中。


总结

asyncio.StreamWriterasyncio 框架中用于非阻塞数据发送的核心组件。它通过智能的内部缓冲和流量控制机制,确保数据能够高效且可靠地从你的异步应用程序发送到网络或其他 I/O 目标。

掌握 StreamWriterwrite()drain()(尤其重要!)、close()wait_closed() 方法,是编写健壮、高性能、响应迅速的异步网络服务和客户端的关键。它与 StreamReader 共同构筑了 asyncio 异步 I/O 的强大基石。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/914454.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/914454.shtml
英文地址,请注明出处:http://en.pswp.cn/news/914454.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

控制台打开mysql服务报错解决办法

控制台打开mysql服务报错解决办法这个MySQL错误表示访问被拒绝,通常是因为没有提供正确的用户名和密码。以下是几种解决方法: 方法1:指定用户名和密码连接 mysql -u root -p然后输入root用户的密码。 方法2:如果忘记了root密码&am…

Unsloth 实战:DeepSeek-R1 模型高效微调指南(下篇)

食用指南 本系列因篇幅原因拆分为上下两篇: 上篇以基础环境搭建为主,介绍了 Unsloth 框架、基座模型下载、导入基座模型、数据集下载/加载/清洗、SwanLab 平台账号注册。 下篇(本文)以实战微调为主,介绍预训练、全量…

Ubuntu安装Jenkins

Ubuntu安装Jenkins方法1:使用官方的Jenkins仓库1. 添加Jenkins仓库2. 更新软件包列表3. 安装Jenkins4. 启动Jenkins服务5. 设置Jenkins开机启动6. 查找初始管理员密码7. 访问Jenkins方法2:使用Snap包(适用于较新的Ubuntu版本)1. 安…

ubuntu22.04下配置qt5.15.17开发环境

自从qt5.15版本开始,不再提供免费的离线安装包,只能通过源码自行编译。刚好最近需要在ubuntu22.04下配置qt开发环境,于是写篇文章记录配置的过程。 其实一开始是想配置qt5.15.2的,但是在编译配置参数这一步骤中出现如下报错 em…

S7-1200 与 S7-300 CPS7-400 CP UDP 通信 Step7 项目编程

S7-1200 CPU 与S7-300 CP STEP7 UDP通信S7-1200 与 S7-300 CP 之间的以太网通信可以通过 UDP 协议来实现,使用的通信指令是在S7-1200 CPU 侧调用通信-开放式用户通信TSEND_C,TRCV_C指令或TCON,TDISCON,TUSEND,TURCV 指…

基于YOLOv11的无人机目标检测实战(Windows环境)

1. 环境搭建 1.1 硬件与操作系统 操作系统:Windows 11 CPU:Intel i7-9700 GPU:NVIDIA RTX 2080(8GB显存) 1.2 安装CUDA和cuDNN 由于YOLOv11依赖PyTorch的GPU加速,需要安装CUDA和cuDNN: 安…

Spring Cloud分布式配置中心:架构设计与技术实践

从单体到微服务:Spring Cloud 开篇与微服务设计 Spring Cloud服务注册与发现:架构设计与技术实践深度分析 在以往分享中,码友们已经掌握了微服务的设计和注册中心的设计,部分聪明的码友已经察觉了,已经到了需要设计一个…

15.2 Common Criteria合规

目录1. Common Criteria简介1.1 CC评估要素1.2 CC与TF-A的关系2. TF-A的CC合规要求2.1 安全功能需求2.2 开发过程要求3. TF-A的CC合规实现3.1 关键安全机制3.2 开发流程控制4. CC认证实践指南4.1 认证准备步骤4.2 典型挑战与解决方案4.3 已认证案例参考5. 持续合规建议1. Commo…

【前端:Typst】--let关键字的用法

在 Typst 中,#let 命令是用于定义变量和函数的核心指令,其用法非常灵活。以下是详细的用法说明和示例。 目录 1.基础变量定义 2.函数定义 3.默认参数 4.内容块参数(Content Blocks) 5.递归函数 1.基础变量定义 // 定义简单…

Qt轮廓分析设计+算法+避坑

轮廓分析拟合方面我现在只考虑矩形拟合和圆形拟合细分的话,椭圆拟合,矩形拟合,最小外接矩形,最小外接圆。对于一张图像可能有不同的图形,不同的圆,不同的矩形,我需要对其进行筛选,也…

C++中STL六大组件List的简单介绍

一、前言C非常重视效率&#xff0c;对效率有损失的代码常常是能省则省。使用list要包含的头文件是<list>&#xff0c;要包含头文件就是#iinclude <list>&#xff0c;List肯定是一种链表&#xff0c;我们不妨回忆一下那种链表插入删除效率最快也就是最简单&#xff…

第十五节:Vben Admin 最新 v5.0 (vben5) + Python Flask 快速入门 - vue前端 生产部署

Vben Admin vben5 系列文章目录 💻 基础篇 ✅ 第一节:Vben Admin 最新 v5.0 (vben5) + Python Flask 快速入门 ✅ 第二节:Vben Admin 最新 v5.0 (vben5) + Python Flask 快速入门 - Python Flask 后端开发详解(附源码) ✅ 第三节:Vben Admin 最新 v5.0 (vben5) + Python …

背包初步(0-1背包、完全背包)

当月光洒在我的脸上 我想我就快变了模样 有一种叫做撕心裂肺的汤 喝了它有神奇的力量 动态规划初步&#xff08;完全背包&#xff09; 目录动态规划初步&#xff08;完全背包&#xff09;0-1背包简介完全背包检查数组是否存在有效划分&#xff08;前缀划分DP&#xff09;单词拆…

Linux驱动06 --- UDP

目录 一、UDP 1.1 介绍 1.2 UDP 的通信方式 1.3 单播 发送函数 接收函数 1.4 广播 1.5 组播/多播 一、UDP 1.1 介绍 传输层的另外一个协议 面向无连接&#xff0c;不稳定&#xff0c;速度快&#xff0c;可以一对多 UDP&#xff08;User Datagram Protocol&…

AJAX 投票:技术解析与应用场景

AJAX 投票:技术解析与应用场景 引言 随着互联网技术的不断发展,Web应用的用户体验越来越受到重视。AJAX(Asynchronous JavaScript and XML)作为一种重要的技术,在实现异步数据交互方面发挥着关键作用。本文将深入探讨AJAX投票系统的技术原理、应用场景以及优化策略。 A…

【字节跳动】数据挖掘面试题0017:推荐算法:双塔模型,怎么把内容精准地推送给用户

文章大纲 双塔模型:推荐算法中的“高效匹配引擎一、双塔模型的核心思想:“分而治之” 的匹配逻辑二、双塔模型的结构:从特征输入到相似度输出1. 输入层:特征的 “原材料处理”2. 塔网络层:用户与物品的“个性化编码”3. 交互层:向量相似度的“偏好打分”三、双塔模型的优…

7月14日日记

数学类今天考完最后一科英语放假回家了。有点羡慕他们。今天英语成绩出来了&#xff0c;我是89分&#xff0c;一开始有点失望&#xff0c;感觉没有上90&#xff0c;这是一个很好的冲击4.0 的机会。但是后来一想好像也没什么可惜的&#xff0c;这个分数还是很高的。舍友小林是90…

js的局部变量和全局变量

全局变量常常定义在函数外&#xff0c;具有全局定义域&#xff0c;在整个js代码的任何地方都可以使用&#xff0c;这个就叫全局变量局部变量定义在函数内部&#xff0c;只在当前函数的定义域可以被使用&#xff0c;而且不同的函数可以定义相同的局部变量&#xff0c;他们之间相…

C++ 多态详解:从概念到实现原理----《Hello C++ Wrold!》(14)--(C/C++)

文章目录前言多态的概念多态的定义和实现虚函数虚函数的重写(覆盖)多态的构成条件override 和 final&#xff08;C11提出&#xff09;finaloverride重载、覆盖(重写)、隐藏(重定义)的对比抽象类接口继承和实现继承多态的原理虚函数表(也叫做虚表)引申:虚表的打印多态的原理静态…

Node.js + Express的数据库AB View切换方案设计

方案总览数据导入过程&#xff1a; - 根据控制表判断当前活跃组&#xff08;假设当前活跃的是a&#xff0c;那么接下来要导入到b&#xff09;。 - 清空非活跃表&#xff08;即b表&#xff09;的数据&#xff0c;然后将新数据导入到b表。 - 切换控制表&#xff0c;将活…