EngineCoreClient is the base class through which the front end interacts with EngineCore:
- The API is defined in both synchronous and asynchronous versions (a simplified client-selection sketch follows the abstract class below).
class EngineCoreClient(ABC):

    @abstractmethod
    def shutdown(self):
        ...

    def get_output(self) -> EngineCoreOutputs:
        raise NotImplementedError

    def add_request(self, request: EngineCoreRequest) -> None:
        raise NotImplementedError

    ...

    def collective_rpc(self,
                       method: Union[str, Callable[..., _R]],
                       timeout: Optional[float] = None,
                       args: tuple = (),
                       kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
        raise NotImplementedError

    def dp_engines_running(self) -> bool:
        raise NotImplementedError

    async def scale_elastic_ep(self, new_data_parallel_size: int) -> None:
        raise NotImplementedError

    async def get_output_async(self) -> EngineCoreOutputs:
        raise NotImplementedError

    async def add_request_async(self, request: EngineCoreRequest) -> None:
        raise NotImplementedError

    ...

    async def collective_rpc_async(
            self,
            method: Union[str, Callable[..., _R]],
            timeout: Optional[float] = None,
            args: tuple = (),
            kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
        raise NotImplementedError
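The concrete client is chosen along two axes: whether EngineCore runs in a separate process, and whether the front end is asyncio-based. The sketch below is illustrative only; make_engine_core_client and its two boolean flags are assumed names, not the exact vLLM factory signature.

# Simplified, assumed selection logic (not the real vLLM factory) showing how
# the three concrete clients map onto the two configuration axes.
def make_engine_core_client(multiprocess_mode: bool, asyncio_mode: bool,
                            *args, **kwargs) -> "EngineCoreClient":
    if not multiprocess_mode:
        # EngineCore lives in this process; only synchronous use makes sense.
        assert not asyncio_mode
        return InprocClient(*args, **kwargs)
    if asyncio_mode:
        # Background EngineCore process + asyncio front end.
        return AsyncMPClient(*args, **kwargs)
    # Background EngineCore process + blocking front end.
    return SyncMPClient(*args, **kwargs)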
InprocClient
InprocClient is the single-process subclass of EngineCoreClient, used mainly for V0-style in-process usage:
- It calls EngineCore methods directly in the same process, without going through IPC.
- This avoids communication overhead, but blocks the main thread (see the usage sketch after the code below).
class InprocClient(EngineCoreClient):

    def __init__(self, *args, **kwargs):
        self.engine_core = EngineCore(*args, **kwargs)

    def get_output(self) -> EngineCoreOutputs:
        outputs, _ = self.engine_core.step()
        return outputs.get(0) or EngineCoreOutputs()

    def add_request(self, request: EngineCoreRequest) -> None:
        self.engine_core.add_request(request)

    ...
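For context, all three clients are driven by the same add_request / get_output loop on the front-end side. The sketch below is a rough, hedged illustration: make_request is a hypothetical helper that builds an EngineCoreRequest, and vllm_config / executor_class are assumed to be configured already; the real LLMEngine does more bookkeeping per step.

# Hypothetical driving loop; make_request() stands in for the front-end
# processor that turns a prompt into an EngineCoreRequest.
client = InprocClient(vllm_config, executor_class, log_stats=False)
client.add_request(make_request("Hello, world"))

done = False
while not done:
    # With InprocClient this call runs EngineCore.step() inline and therefore
    # blocks the main thread; with SyncMPClient it blocks on a queue instead.
    outputs = client.get_output()
    done = any(o.finish_reason is not None for o in outputs.outputs)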
MPClient
MPClient is the multi-process subclass of EngineCoreClient:
- It runs EngineCore in a background process.
- It uses input_socket to push EngineCoreRequests.
- It uses output_socket to pull EngineCoreOutputs (a toy ZMQ sketch of this pattern follows the list).
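The essence of this IPC layer is ZMQ routing by identity on the request path plus a plain push/pull pipe on the output path. The toy sketch below shows the pattern only: the inproc addresses are made up, and the real wiring goes through make_zmq_socket, a startup handshake, and msgpack serialization.

import zmq

ctx = zmq.Context(io_threads=2)

# Client (frontend) side: a ROUTER socket for requests, a PULL socket for outputs.
input_socket = ctx.socket(zmq.ROUTER)
input_socket.bind("inproc://toy_engine_input")
output_socket = ctx.socket(zmq.PULL)
output_socket.bind("inproc://toy_engine_output")

# Engine side (in vLLM this lives in the background EngineCore process).
engine_in = ctx.socket(zmq.DEALER)
engine_in.setsockopt(zmq.IDENTITY, (0).to_bytes(2, "little"))  # engine rank 0
engine_in.connect("inproc://toy_engine_input")
engine_out = ctx.socket(zmq.PUSH)
engine_out.connect("inproc://toy_engine_output")

# Engine announces itself so the client learns its ZMQ identity.
engine_in.send(b"READY")
identity, _ = input_socket.recv_multipart()

# Client pushes a request to that specific engine by prefixing its identity.
input_socket.send_multipart([identity, b"ADD", b"<serialized request>"])
print(engine_in.recv_multipart())  # [b'ADD', b'<serialized request>']

# Engine pushes an output; the client pulls it.
engine_out.send(b"<serialized outputs>")
print(output_socket.recv())        # b'<serialized outputs>'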
MPClient.__init__
MPClient.__init__ does the following:
- Creates the encoder and decoder, used to serialize EngineCoreRequests and deserialize EngineCoreOutputs.
- Creates a zmq.Context / zmq.asyncio.Context: the ZMQ context for communication between EngineCoreClient and EngineCore.
- Creates BackgroundResources, which releases resources (such as the zmq.Context) on shutdown.
- Calls weakref.finalize to bind the BackgroundResources to self, so that when the self object is garbage-collected, BackgroundResources.__call__ is invoked automatically (a standalone weakref.finalize sketch follows the source code below).
- If client_addresses is configured, uses externally created EngineCores; otherwise calls launch_core_engines to create them, obtaining engine_manager and coordinator.
- Calls make_zmq_socket to create the sockets that the EngineCoreClient uses to push to / pull from EngineCore.
- Assigns a ZMQ identity to each rank according to engine_ranks.
- Receives the initial message from each engine rank on input_socket; once all of them have arrived, every EngineCore is known to be correctly initialized.
- Creates pending_messages to track in-flight socket sends, so that the resources backing a message (such as PyTorch tensor memory) are released only after the send has completed.
class MPClient(EngineCoreClient):

    def __init__(
        self,
        asyncio_mode: bool,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        client_addresses: Optional[dict[str, str]] = None,
    ):
        self.vllm_config = vllm_config
        self.encoder = MsgpackEncoder()
        self.decoder = MsgpackDecoder(EngineCoreOutputs)

        sync_ctx = zmq.Context(io_threads=2)
        self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx

        self.resources = BackgroundResources(ctx=sync_ctx)
        self._finalizer = weakref.finalize(self, self.resources)
        success = False
        try:
            self.engines_running = False
            self.stats_update_address: Optional[str] = None
            if client_addresses is not None:
                # Engines are managed externally to this client.
                input_address = client_addresses["input_address"]
                output_address = client_addresses["output_address"]
                self.stats_update_address = client_addresses.get(
                    "stats_update_address")
            else:
                with launch_core_engines(vllm_config, executor_class,
                                         log_stats) as (engine_manager,
                                                        coordinator,
                                                        addresses):
                    self.resources.coordinator = coordinator
                    self.resources.engine_manager = engine_manager

                (input_address, ) = addresses.inputs
                (output_address, ) = addresses.outputs
                self.stats_update_address = (
                    addresses.frontend_stats_publish_address)
                if coordinator is not None:
                    assert self.stats_update_address == (
                        coordinator.get_stats_publish_address())

            self.input_socket = self.resources.input_socket = make_zmq_socket(
                self.ctx, input_address, zmq.ROUTER, bind=True)
            self.resources.output_socket = make_zmq_socket(
                self.ctx, output_address, zmq.PULL)

            ...

            engine_ranks = [dp_rank] if (offline_mode
                                         or external_dp_lb) else range(dp_size)
            self.core_engines: list[EngineIdentity] = [
                index.to_bytes(2, "little") for index in engine_ranks
            ]

            identities = set(self.core_engines)
            sync_input_socket = zmq.Socket.shadow(self.input_socket)
            while identities:
                if not sync_input_socket.poll(timeout=600_000):
                    raise TimeoutError("Timed out waiting for engines to send"
                                       "initial message on input socket.")
                identity, _ = sync_input_socket.recv_multipart()
                identities.remove(identity)

            self.core_engine: EngineIdentity = self.core_engines[0]
            self.utility_results: dict[int, AnyFuture] = {}

            self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()

            success = True
        finally:
            if not success:
                self._finalizer()
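To make the cleanup step concrete: weakref.finalize registers a callable that runs at most once, either when invoked explicitly (the finally branch above) or when the owning object is garbage-collected. Below is a standalone toy version of the pattern; ToyBackgroundResources and ToyClient are simplified stand-ins, not the real vLLM classes.

import weakref

class ToyBackgroundResources:
    """Simplified stand-in for BackgroundResources: releases whatever it holds."""

    def __init__(self):
        self.closed = False

    def __call__(self):
        # Runs exactly once: either via an explicit finalizer call
        # (the `finally: self._finalizer()` path) or when the owning
        # client object is garbage-collected.
        self.closed = True
        print("background resources released")

class ToyClient:

    def __init__(self):
        self.resources = ToyBackgroundResources()
        # Note: the callback is `self.resources`, not a bound method of
        # `self`, so the finalizer does not keep the client itself alive.
        self._finalizer = weakref.finalize(self, self.resources)

client = ToyClient()
del client  # prints "background resources released"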
launch_core_engines
launch_core_engines creates the EngineCore and DPCoordinator processes:
- Creates the EngineZmqAddresses:
@dataclass
class EngineZmqAddresses:
    # ZMQ socket addresses through which the frontend (LLMEngine / EngineCoreClient) pushes EngineCoreRequests to the EngineCores
    inputs: list[str]
    # ZMQ socket addresses through which the frontend pulls EngineCoreOutputs from the EngineCores
    outputs: list[str]
    # ZMQ socket address on which the DPCoordinator sends requests (e.g. START_DP_WAVE) to the EngineCores
    coordinator_input: Optional[str] = None
    # ZMQ socket address on which the EngineCores send EngineCoreOutputs to the DPCoordinator
    coordinator_output: Optional[str] = None
    # ZMQ socket address on which the DPCoordinator receives messages published by the EngineCoreClient (e.g. SCALE_ELASTIC_EP)
    frontend_stats_publish_address: Optional[str] = None
- Creates the DPCoordinator: in DP mode (dp_size > 1) and online mode, a DPCoordinator process is started on the node with DP_RANK=0, responsible for resource management and scheduling across the DP group.
- Creates the engine manager according to the data_parallel_backend config: CoreEngineActorManager if the backend is ray, otherwise CoreEngineProcManager.
- Calls wait_for_engine_startup to wait for the handshake between EngineCoreClient and the EngineCores, confirming that all EngineCores proxied by this EngineCoreClient have started. Since launch_core_engines is a generator-based context manager, this wait runs when the caller's with block exits (see the small sketch after the source code below).
@contextlib.contextmanager
def launch_core_engines(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    num_api_servers: int = 1,
) -> Iterator[tuple[
        Optional[Union[CoreEngineProcManager, CoreEngineActorManager]],
        Optional[DPCoordinator],
        EngineZmqAddresses,
]]:
    ...

    # Set up input and output addresses.
    addresses = EngineZmqAddresses(
        inputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
        outputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
    )

    run_coordinator = dp_size > 1 and not offline_mode and dp_rank == 0

    if run_coordinator:
        coordinator = DPCoordinator(parallel_config)
        addresses.coordinator_input, addresses.coordinator_output = (
            coordinator.get_engine_socket_addresses())
        addresses.frontend_stats_publish_address = (
            coordinator.get_stats_publish_address())
    else:
        coordinator = None

    if parallel_config.data_parallel_backend == "ray":
        engine_actor_manager = CoreEngineActorManager(
            vllm_config=vllm_config,
            addresses=addresses,
            executor_class=executor_class,
            log_stats=log_stats,
        )
        yield engine_actor_manager, coordinator, addresses
        return

    ...

    with zmq_socket_ctx(local_handshake_address, zmq.ROUTER,
                        bind=True) as handshake_socket:
        from vllm.v1.engine.core import EngineCoreProc

        if local_engine_count:
            local_engine_manager = CoreEngineProcManager(
                EngineCoreProc.run_engine_core,
                vllm_config=vllm_config,
                executor_class=executor_class,
                log_stats=log_stats,
                handshake_address=handshake_address,
                client_handshake_address=client_handshake_address,
                local_client=True,
                local_engine_count=local_engine_count,
                start_index=dp_rank,
                local_start_index=local_start_index or 0)
        else:
            local_engine_manager = None

        yield local_engine_manager, coordinator, addresses

        # Now wait for engines to start.
        wait_for_engine_startup(
            handshake_socket,
            addresses,
            engines_to_handshake,
            parallel_config,
            vllm_config.cache_config,
            local_engine_manager,
            coordinator.proc if coordinator else None,
        )
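The key ordering detail: code before the yield runs when the with block in MPClient.__init__ is entered, code after the yield (including wait_for_engine_startup) runs when that block exits. A tiny sketch of this ordering, with illustrative names only:

import contextlib

@contextlib.contextmanager
def launch_demo():
    print("1. start engine processes")       # before yield: runs on __enter__
    yield "engine handles"
    print("3. wait_for_engine_startup")      # after yield: runs on __exit__

with launch_demo() as handles:
    print("2. caller wires up sockets with", handles)
# Printed order: 1 -> 2 -> 3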
SyncMPClient
SyncMPClient is the synchronous-IO subclass of MPClient:
- It implements all of the synchronous-IO APIs defined in EngineCoreClient.
- It uses a queue.Queue to implement the synchronous IO.
SyncMPClient.__init__:
- Creates a thread that receives the messages EngineCore sends to output_socket and puts them into self.outputs_queue.
class SyncMPClient(MPClient):

    def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],
                 log_stats: bool):
        super().__init__(
            asyncio_mode=False,
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=log_stats,
        )

        ...

        self.outputs_queue = queue.Queue[Union[EngineCoreOutputs, Exception]]()

        ctx = self.ctx
        out_socket = self.resources.output_socket
        decoder = self.decoder
        utility_results = self.utility_results
        outputs_queue = self.outputs_queue

        shutdown_path = get_open_zmq_inproc_path()
        resources = self.resources
        resources.shutdown_path = shutdown_path

        def process_outputs_socket():
            shutdown_socket = ctx.socket(zmq.PAIR)
            try:
                shutdown_socket.bind(shutdown_path)
                poller = zmq.Poller()
                poller.register(shutdown_socket, zmq.POLLIN)
                poller.register(out_socket, zmq.POLLIN)
                while True:
                    socks = poller.poll()
                    ...
                    frames = out_socket.recv_multipart(copy=False)
                    resources.validate_alive(frames)
                    outputs: EngineCoreOutputs = decoder.decode(frames)
                    if outputs.utility_output:
                        _process_utility_output(outputs.utility_output,
                                                utility_results)
                    else:
                        outputs_queue.put_nowait(outputs)
            except Exception as e:
                ...

        self.output_queue_thread = Thread(target=process_outputs_socket,
                                          name="EngineCoreOutputQueueThread",
                                          daemon=True)
        self.output_queue_thread.start()
The interaction between SyncMPClient and EngineCore falls into three main categories:
- add_request: submits an EngineCoreRequest
- call_utility: remotely invokes an EngineCore method
- get_output: retrieves EngineCoreOutputs
SyncMPClient.add_request
- Serializes the EngineCoreRequest, including any auxiliary buffers it contains (such as PyTorch tensors).
- Sends the message to EngineCore over input_socket: (Identity, RequestType, serialized EngineCoreRequest, auxiliary buffers).
- If auxiliary buffers are present, the message is added to self.pending_messages, ensuring that the backing memory (such as PyTorch tensors) is released only after the send has completed (see the MessageTracker sketch after the code below).
class SyncMPClient(MPClient):

    def add_request(self, request: EngineCoreRequest) -> None:
        if self.is_dp:
            self.engines_running = True
        self._send_input(EngineCoreRequestType.ADD, request)

    def _send_input(self, request_type: EngineCoreRequestType, request: Any):
        self.ensure_alive()
        self.free_pending_messages()

        # (Identity, RequestType, SerializedRequest)
        msg = (self.core_engine, request_type.value,
               *self.encoder.encode(request))

        if len(msg) <= 3:
            # No auxiliary buffers => no tensor backing buffers in request.
            self.input_socket.send_multipart(msg, copy=False)
            return

        tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
        self.add_pending_message(tracker, request)
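The pending-message bookkeeping exists because copy=False sends are zero-copy: zmq may still be reading the underlying buffer after send_multipart returns. Below is a toy sketch of the tracking idea with a made-up inproc address and payload; the real code wraps this in add_pending_message / free_pending_messages.

import zmq

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://track_demo")
push = ctx.socket(zmq.PUSH)
push.connect("inproc://track_demo")

payload = bytearray(b"x" * (1 << 20))   # stand-in for tensor-backed memory

# copy=False avoids copying the buffer; track=True returns a MessageTracker
# whose .done flag flips once zmq no longer needs the buffer.
tracker = push.send(payload, copy=False, track=True)

pending = [(tracker, payload)]

def free_pending(pending):
    # Keep a reference to the payload until the send has really completed,
    # so the memory is not reused or freed while zmq is still reading it.
    return [(t, buf) for t, buf in pending if not t.done]

pending = free_pending(pending)
msg = pull.recv()                       # consume the message on the other end
print(len(msg), len(pending))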
SyncMPClient.call_utility
Taking profile as an example:
- Generates a call_id.
- Sends the message to EngineCore over input_socket: (Identity, RequestType, serialized (0, call_id, method, args)).
- Registers a Future keyed by call_id in self.utility_results and blocks on future.result() until the output thread resolves it (a simplified sketch of this correlation follows the code below).
class SyncMPClient(MPClient):

    def profile(self, is_start: bool = True) -> None:
        self.call_utility("profile", is_start)

    def call_utility(self, method: str, *args) -> Any:
        call_id = uuid.uuid1().int >> 64
        future: Future[Any] = Future()
        self.utility_results[call_id] = future
        self._send_input(EngineCoreRequestType.UTILITY,
                         (0, call_id, method, args))

        return future.result()
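The synchronous behaviour comes from pairing each call_id with a concurrent.futures.Future: the caller blocks on future.result() while the output-socket thread (via _process_utility_output) resolves it. A simplified sketch of that correlation, with the actual socket send/receive elided; start_utility_call and on_utility_output are illustrative names.

import uuid
from concurrent.futures import Future
from typing import Any, Optional

utility_results: dict[int, Future] = {}

def start_utility_call(method: str, *args) -> Future:
    call_id = uuid.uuid1().int >> 64       # same id scheme as call_utility
    future: Future[Any] = Future()
    utility_results[call_id] = future
    # ... the real client serializes (0, call_id, method, args) here and
    # sends it to the engine via _send_input(EngineCoreRequestType.UTILITY, ...)
    return future

def on_utility_output(call_id: int, result: Any = None,
                      failure: Optional[Exception] = None) -> None:
    # Roughly what the output-socket thread does when a utility output arrives.
    future = utility_results.pop(call_id)
    if failure is not None:
        future.set_exception(failure)
    else:
        future.set_result(result)

# Caller side blocks until the background thread resolves the future:
# result = start_utility_call("profile", True).result()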
SyncMPClient.get_output
In SyncMPClient.__init__, a dedicated thread has already been created to receive and decode the messages from output_socket and put them into self.outputs_queue (a queue.Queue).
So SyncMPClient.get_output only needs to call self.outputs_queue.get() to achieve synchronous IO: it blocks the calling thread until data arrives.
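The same producer/consumer pattern in isolation, as a runnable toy: a daemon thread plays the role of process_outputs_socket, fake_recv stands in for the output socket plus decoder, and get_output is just a blocking queue read.

import queue
import threading
import time

outputs_queue: "queue.Queue[str]" = queue.Queue()

def fake_recv() -> str:
    # Stand-in for out_socket.recv_multipart() plus decoder.decode(frames).
    time.sleep(0.1)
    return "EngineCoreOutputs"

def process_outputs_socket():
    while True:
        outputs_queue.put_nowait(fake_recv())

threading.Thread(target=process_outputs_socket, daemon=True).start()

def get_output() -> str:
    # Blocks the calling thread until the reader thread produces an item.
    return outputs_queue.get()

print(get_output())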
AsyncMPClient
TODO: to be filled in.