EngineCoreClient是与EngineCore进行交互的基类:

  • API定义了同步和异步两个版本。
class EngineCoreClient(ABC):@abstractmethoddef shutdown(self):...def get_output(self) -> EngineCoreOutputs:raise NotImplementedErrordef add_request(self, request: EngineCoreRequest) -> None:raise NotImplementedError...def collective_rpc(self,method: Union[str, Callable[..., _R]],timeout: Optional[float] = None,args: tuple = (),kwargs: Optional[dict[str, Any]] = None) -> list[_R]:raise NotImplementedErrordef dp_engines_running(self) -> bool:raise NotImplementedErrorasync def scale_elastic_ep(self, new_data_parallel_size: int) -> None:raise NotImplementedErrorasync def get_output_async(self) -> EngineCoreOutputs:raise NotImplementedErrorasync def add_request_async(self, request: EngineCoreRequest) -> None:raise NotImplementedError...async def collective_rpc_async(self,method: Union[str, Callable[..., _R]],timeout: Optional[float] = None,args: tuple = (),kwargs: Optional[dict[str, Any]] = None) -> list[_R]:raise NotImplementedError

InprocClient

InprocClient是EngineCoreClient的单进程子类,主要用于V0版本:

  • 在同一个进程中直接调用 EngineCore 的方法,而不需要通过IPC。
  • 避免了通信开销,但会阻塞主线程。
class InprocClient(EngineCoreClient):def __init__(self, *args, **kwargs):self.engine_core = EngineCore(*args, **kwargs)def get_output(self) -> EngineCoreOutputs:outputs, _ = self.engine_core.step()return outputs.get(0) or EngineCoreOutputs()def add_request(self, request: EngineCoreRequest) -> None:self.engine_core.add_request(request)...

MPClient

MPClient是EngineCoreClient的多进程子类:

  • 用一个后台进程(Background Process)来执行EngineCore
  • 使用input_socket来Push EngineCoreRequests
  • 使用output_socket来Pull EngineCoreOutputs

MPClient.__init__

MPClient.__init__完成:

  • 创建encoder和decoder,用于序列化EngineCoreRequests和反序列化EngineCoreOutputs
  • 创建zmq.Context/zmq.asyncio.Context:EngineCoreClient和EngineCore之间的交互Context。
  • 创建BackgroundResources用于释放资源(如zmp.Context等)
  • 调用weakref.finalize将BackgroundResources绑定到self,这样在self对象销毁时,会自动调用BackgroundResources.__call__。
  • 如果配置了client_addresses,则使用外部创建好的EngineCore,否则则调用launch_core_engines创建EngineCore:engine_manager和coordinator。
  • 调用make_zmq_socket创建EngineCoreClient Push/Pull EngineCore的socket。
  • 根据engine_ranks为每一个RANK分配一个ZMQ identity。
  • 用input_socket接收engine_ranks发送的初始化消息,全部接收完成后说明所有的EngineCore都被正确初始化。
  • 创建pending_messages用于跟踪socket的消息发送情况,以确保数据发送完成后,再释放对应的资源(如Pytorch的Tensor内存)
class MPClient(EngineCoreClient):def __init__(self,asyncio_mode: bool,vllm_config: VllmConfig,executor_class: type[Executor],log_stats: bool,client_addresses: Optional[dict[str, str]] = None,):self.vllm_config = vllm_configself.encoder = MsgpackEncoder()self.decoder = MsgpackDecoder(EngineCoreOutputs)sync_ctx = zmq.Context(io_threads=2)self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctxself.resources = BackgroundResources(ctx=sync_ctx)self._finalizer = weakref.finalize(self, self.resources)success = Falsetry:self.engines_running = Falseself.stats_update_address: Optional[str] = Noneif client_addresses is not None:# Engines are managed externally to this client.input_address = client_addresses["input_address"]output_address = client_addresses["output_address"]self.stats_update_address = client_addresses.get("stats_update_address")else:with launch_core_engines(vllm_config, executor_class,log_stats) as (engine_manager,coordinator,addresses):self.resources.coordinator = coordinatorself.resources.engine_manager = engine_manager(input_address, ) = addresses.inputs(output_address, ) = addresses.outputsself.stats_update_address = (addresses.frontend_stats_publish_address)if coordinator is not None:assert self.stats_update_address == (coordinator.get_stats_publish_address())self.input_socket = self.resources.input_socket = make_zmq_socket(self.ctx, input_address, zmq.ROUTER, bind=True)self.resources.output_socket = make_zmq_socket(self.ctx, output_address, zmq.PULL)...engine_ranks = [dp_rank] if (offline_modeor external_dp_lb) else range(dp_size)self.core_engines: list[EngineIdentity] = [index.to_bytes(2, "little") for index in engine_ranks]identities = set(self.core_engines)sync_input_socket = zmq.Socket.shadow(self.input_socket)while identities:if not sync_input_socket.poll(timeout=600_000):raise TimeoutError("Timed out waiting for engines to send""initial message on input socket.")identity, _ = sync_input_socket.recv_multipart()identities.remove(identity)self.core_engine: EngineIdentity = self.core_engines[0]self.utility_results: dict[int, AnyFuture] = {}self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()success = Truefinally:if not success:self._finalizer()

launch_core_engines

launch_core_engines完成EnginCore和DPCoordinator进程的创建:

  • 创建EngineZmqAddresses:
@dataclass
class EngineZmqAddresses:inputs: list[str] # LLMEngine Push Request给EngineCoreClient的ZMQ Socket地址outputs: list[str] # LLMEngine Pull EngineCoreClient的Response的ZMQ Socket地址coordinator_input: Optional[str] = None # DPCoordinator发送EngineCoreRequest(START_DP_WAVE)给EngineCore的ZMQ Socket地址coordinator_output: Optional[str] = None # EngineCore发送EngineCoreOutputs给DPCoordinator的ZMQ Socket地址frontend_stats_publish_address: Optional[str] = None # DPCoordinator接收EngineCoreClient发布消息(如SCALE_ELASTIC_EP)的ZMQ Socket地址
  • 创建DPCoordinator:在DP(dp_size > 1)和Online模式下,在DP_RANK=0的机器上,会启动一个DPCoordinator进程,用于DP域多机资源管理与调度。
  • 根据data_parallel_backend配置来创建对应的EngineCoreManager:如果backend是ray,则创建CoreEngineActorManager,否则创建CoreEngineProcManager。
  • 调用wait_for_engine_startup等待EnginCoreClient和EnginCore完成握手,表明EnginCoreClient代理的所有EngineCore都启动完成。
@contextlib.contextmanager
def launch_core_engines(vllm_config: VllmConfig,executor_class: type[Executor],log_stats: bool,num_api_servers: int = 1,
) -> Iterator[tuple[Optional[Union[CoreEngineProcManager, CoreEngineActorManager]],Optional[DPCoordinator],EngineZmqAddresses,
]]:...# Set up input and output addresses.addresses = EngineZmqAddresses(inputs=[get_engine_client_zmq_addr(client_local_only, host)for _ in range(num_api_servers)],outputs=[get_engine_client_zmq_addr(client_local_only, host)for _ in range(num_api_servers)],)run_coordinator = dp_size > 1 and not offline_mode and dp_rank == 0if run_coordinator:coordinator = DPCoordinator(parallel_config)addresses.coordinator_input, addresses.coordinator_output = (coordinator.get_engine_socket_addresses())addresses.frontend_stats_publish_address = (coordinator.get_stats_publish_address())else:coordinator = Noneif parallel_config.data_parallel_backend == "ray":engine_actor_manager = CoreEngineActorManager(vllm_config=vllm_config,addresses=addresses,executor_class=executor_class,log_stats=log_stats,)yield engine_actor_manager, coordinator, addressesreturn...with zmq_socket_ctx(local_handshake_address, zmq.ROUTER,bind=True) as handshake_socket:from vllm.v1.engine.core import EngineCoreProcif local_engine_count:local_engine_manager = CoreEngineProcManager(EngineCoreProc.run_engine_core,vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats,handshake_address=handshake_address,client_handshake_address=client_handshake_address,local_client=True,local_engine_count=local_engine_count,start_index=dp_rank,local_start_index=local_start_index or 0)else:local_engine_manager = Noneyield local_engine_manager, coordinator, addresses# Now wait for engines to start.wait_for_engine_startup(handshake_socket,addresses,engines_to_handshake,parallel_config,vllm_config.cache_config,local_engine_manager,coordinator.proc if coordinator else None,)

SyncMPClient

SyncMPClient是MPClient的同步IO子类:

  • 实现了EngineCoreClient中定义的所有同步IO的API。
  • 使用queue.Queue队列,实现同步IO。

SyncMPClient.__init__:

  • 创建一个线程,接收EngineCore发送到output_socket的消息,并加入到self.outputs_queue。
class SyncMPClient(MPClient):def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],log_stats: bool):super().__init__(asyncio_mode=False,vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats,)...self.outputs_queue = queue.Queue[Union[EngineCoreOutputs, Exception]]()ctx = self.ctxout_socket = self.resources.output_socketdecoder = self.decoderutility_results = self.utility_resultsoutputs_queue = self.outputs_queueshutdown_path = get_open_zmq_inproc_path()resources = self.resourcesresources.shutdown_path = shutdown_pathdef process_outputs_socket():shutdown_socket = ctx.socket(zmq.PAIR)try:shutdown_socket.bind(shutdown_path)poller = zmq.Poller()poller.register(shutdown_socket, zmq.POLLIN)poller.register(out_socket, zmq.POLLIN)while True:socks = poller.poll()...frames = out_socket.recv_multipart(copy=False)resources.validate_alive(frames)outputs: EngineCoreOutputs = decoder.decode(frames)if outputs.utility_output:_process_utility_output(outputs.utility_output,utility_results)else:outputs_queue.put_nowait(outputs)except Exception as e:...self.output_queue_thread = Thread(target=process_outputs_socket,name="EngineCoreOutputQueueThread",daemon=True)self.output_queue_thread.start()

SyncMPClient和EngineCore的交互主要有3类:

  • add_request:添加EngineCoreRequest
  • call_utility:远程调用EngineCore方法
  • get_output:获取EngineCoreOutput

SyncMPClient.add_request

  • 序列化EngineCoreRequest,会同时序列化里面的Auxiliary Buffers(比如Pytorch Tensor等)。
  • 使用input_socke发送消息给EngineCore:(Identity, RequestType, EngineCoreRequest(Serialized), Auxiliary Buffers)
  • 如果存在Auxiliary Buffers,会将消息添加到self.pending_messages,用于确保消息发送完成后,再释放对应的内存(如Pytorch Tensor等)
class SyncMPClient(MPClient):def add_request(self, request: EngineCoreRequest) -> None:if self.is_dp:self.engines_running = Trueself._send_input(EngineCoreRequestType.ADD, request) def _send_input(self, request_type: EngineCoreRequestType, request: Any):self.ensure_alive()self.free_pending_messages()# (Identity, RequestType, SerializedRequest)msg = (self.core_engine, request_type.value,*self.encoder.encode(request))if len(msg) <= 3:# No auxiliary buffers => no tensor backing buffers in request.self.input_socket.send_multipart(msg, copy=False)returntracker = self.input_socket.send_multipart(msg, copy=False, track=True)self.add_pending_message(tracker, request)

SyncMPClient.call_utility

以profile为例:

  • 生成一个call_id
  • 使用input_socke发送消息给EngineCore:(Identity, RequestType, (0, call_id, method, args)(Serialized))
class SyncMPClient(MPClient):def profile(self, is_start: bool = True) -> None:self.call_utility("profile", is_start)def call_utility(self, method: str, *args) -> Any:call_id = uuid.uuid1().int >> 64future: Future[Any] = Future()self.utility_results[call_id] = futureself._send_input(EngineCoreRequestType.UTILITY,(0, call_id, method, args))return future.result()

SyncMPClient.get_output

在SyncMPClient.__init__中,已经独立创建了一个线程,用于接收并解码output_socket的消息,并放入self.outputs_queue(queue.Queue类型)。
所以SyncMPClient.get_output只需要调用self.outputs_queue.get()即可以实现同步IO:在未收到数据时,阻塞主线程。

AsyncMPClient

TODO:待补充

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/99146.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/99146.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/99146.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

几种排序算法(2)

几种排序算法&#xff08;2&#xff09;1冒泡排序2.快速排序2.1hoare版本找基准值2.2lomuto前后指针3.非递归版本快速排序4.递归排序5.排序算法复杂度及稳定性分析我们已经详解了插入排序和选择排序&#xff0c;不了解的可以翻看我上一篇博客。1冒泡排序 void BubbleSort(int*…

Excel甘特图

1. 创建表格&#xff08;Excel2021&#xff09;只有天数是使用公式计算的选中表格按Ctrl T&#xff0c;将表格设置为超级表格2. 创建堆积条形图3. 添加设置图例项3.1 添加开始时间3.2 修改图例项顺序 3.3 编辑轴标签3.4 Y轴逆序类别 3.5 添加开始时间数据标签选择 所用橘色图&…

基于OpenCV的答题卡自动识别与评分系统

引言 在教育考试场景中&#xff0c;手动批改答题卡效率低下且容易出错。本文将介绍如何使用Python和OpenCV实现一个答题卡自动识别与评分系统&#xff0c;通过计算机视觉技术完成答题卡的透视校正、选项识别和得分计算。该系统可广泛应用于学校考试、培训测评等场景&#xff0c…

LLaMA-MoE v2:基于后训练混合专家模型的稀疏性探索与技术突破

重新定义大型语言模型的效率边界在人工智能飞速发展的今天&#xff0c;大型语言模型&#xff08;LLMs&#xff09;已成为推动技术进步的核心力量。然而&#xff0c;模型规模的不断扩大带来了惊人的计算成本和高昂的部署门槛&#xff0c;使得众多研究机构和中小型企业难以承担。…

R geo 然后读取数据的时候 make.names(vnames, unique = TRUE): invalid multibyte string 9

setwd("K:/download/geo") # 替换为实际工作目录 # 修改get_geo_data_local函数中的读取部分 #file_path <- "K:/download/geo/raw_data/GEO/GSE32967_series_matrix_fixed.txt" file_path <- "K:/download/geo/data/GSE32967_series_matrix.t…

深入理解 Spring @Async 注解:原理、实现与实践

在现代 Java 应用开发中&#xff0c;异步编程是提升系统吞吐量和响应速度的关键技术之一。Spring 框架提供的Async注解极大简化了异步方法的实现&#xff0c;让开发者无需手动管理线程即可轻松实现异步操作。本文将从底层原理到实际应用&#xff0c;全面解析Async注解的工作机制…

linux C 语言开发 (七) 文件 IO 和标准 IO

文章的目的为了记录使用C语言进行linux 开发学习的经历。开发流程和要点有些记忆模糊&#xff0c;赶紧记录&#xff0c;防止忘记。 相关链接&#xff1a; linux C 语言开发 (一) Window下用gcc编译和gdb调试 linux C 语言开发 (二) VsCode远程开发 linux linux C 语言开发 (…

maven , mvn 运行 项目

提示&#xff1a;环境搭建 文章目录前言一、使用步骤1. 以构建含有 pom.xml 的项目2.mvn 运行具体项目3.mvn 指定模块>运行具体项目前言 提示&#xff1a;版本 spirngboot 3.2 jdk 21 mvn 3.9 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、使…

JVM垃圾回收的时机是什么时候(深入理解 JVM 垃圾回收时机:什么时候会触发 GC?)

深入理解 JVM 垃圾回收时机&#xff1a;什么时候会触发 GC&#xff1f;在 Java 开发中&#xff0c;我们常听说 “JVM 会自动进行垃圾回收”&#xff0c;但很少有人能说清&#xff1a;GC 究竟在什么情况下会被触发&#xff1f;是到固定时间就执行&#xff1f;还是内存满了才会启…

在Vue项目中Axios发起请求时的小知识

在Vue项目中Axios发起请求时的小知识 在Vue项目开发中&#xff0c;Axios作为基于Promise的HTTP客户端&#xff0c;凭借其简洁的API设计和强大的功能&#xff08;如请求/响应拦截、自动JSON转换、取消请求等&#xff09;&#xff0c;已成为前端与后端通信的主流选择。本文将深入…

GeoHash分级索引技术

GeoHash分级索引技术是一种将二维地理坐标转换为一维字符串的空间索引方法,其核心是通过分级网格划分和前缀编码实现高效的空间数据检索。以下从技术原理、实现细节到工程优化展开详细解析: 一、编码原理与分级结构 1. 经纬度二进制化 GeoHash通过递归二分地球表面生成网格…

HTML HTML基础(4)

1.列表 (1).有序列表 概念&#xff1a;有顺序或侧重顺序的列表。 <h2>要把大象放冰箱总共分几步</h2> <ol> <li>把冰箱门打开</li> <li>把大象放进去</li> <li>把冰箱门关上</li> </ol> (2).无序列表 概念&a…

MySQL中的回表操作

在数据库查询&#xff08;尤其是基于 B树索引 的关系型数据库&#xff0c;如MySQL、PostgreSQL&#xff09;中&#xff0c;“回表”是一个核心且高频出现的概念&#xff0c;直接影响查询性能。要理解回表&#xff0c;需先理清索引结构与数据存储的关联&#xff0c;再拆解其发生…

QT子线程与GUI线程安全交互

在Qt应用程序开发中&#xff0c;涉及到多线程处理时&#xff0c;如何安全地从子线程更新UI界面是一个常见的问题。Qt的UI界面并不是线程安全的&#xff0c;意味着你不能直接在子线程中操作UI组件&#xff08;比如按钮、标签等&#xff09;。如果不遵循线程安全的规则&#xff0…

RL【10-2】:Actor - Critic

系列文章目录 Fundamental Tools RL【1】&#xff1a;Basic Concepts RL【2】&#xff1a;Bellman Equation RL【3】&#xff1a;Bellman Optimality Equation Algorithm RL【4】&#xff1a;Value Iteration and Policy Iteration RL【5】&#xff1a;Monte Carlo Learnin…

开源大模型天花板?DeepSeek-V3 6710亿参数MoE架构深度拆解

文章目录认知解构&#xff1a;DeepSeek的定位与核心价值模型概述与发展历程创立初期与技术奠基&#xff08;2023年7月-2024年11月&#xff09;里程碑一&#xff1a;MoE架构规模化突破&#xff08;2024年12月&#xff09;里程碑二&#xff1a;推理成本革命性优化&#xff08;202…

10 训练中的一些问题

&#x1f31f; 大背景&#xff1a;训练神经网络 下山寻宝 训练神经网络就像你蒙着眼在一座大山里&#xff0c;想找最低点&#xff08;最小损失&#xff09;。你只能靠脚下的坡度&#xff08;梯度&#xff09;来决定往哪儿走。 你的位置 模型参数&#xff08;权重 www&#xf…

synchronized锁升级的过程(从无锁到偏向锁,再到轻量级锁,最后到重量级锁的一个过程)

锁升级是 Java 中 synchronized 锁 的核心优化机制&#xff08;基于 JVM 的 对象头 Mark Word 实现&#xff09;&#xff0c;指锁的状态从 无锁 → 偏向锁 → 轻量级锁 → 重量级锁 逐步升级的过程。其目的是通过 “按需升级”&#xff0c;在不同并发场景下选择最优的锁实现&am…

HOT100--Day25--84. 柱状图中最大的矩形,215. 数组中的第K个最大元素,347. 前 K 个高频元素

HOT100–Day25–84. 柱状图中最大的矩形&#xff0c;215. 数组中的第K个最大元素&#xff0c;347. 前 K 个高频元素 每日刷题系列。今天的题目是《力扣HOT100》题单。 题目类型&#xff1a;栈&#xff0c;堆。 84. 柱状图中最大的矩形 思路&#xff1a; class Solution {publ…

基于 Apache Doris 的用户画像数据模型设计方案

一、 需求分析与设计目标数据源&#xff1a;用户基本信息&#xff1a;用户ID、性别、出生日期、注册时间、常驻地域&#xff08;省、市、区&#xff09;、职业等。用户体检报告&#xff1a;每次体检的报告ID、体检时间、各项指标&#xff08;如血压、血糖、血脂、BMI等&#xf…