import os
import sys

from sglang.srt.entrypoints.http_server import launch_server
from sglang.srt.server_args import prepare_server_args
from sglang.srt.utils import kill_process_tree

if __name__ == "__main__":
    server_args = prepare_server_args(sys.argv[1:])

    try:
        launch_server(server_args)
    finally:
        kill_process_tree(os.getpid(), include_parent=False)
def launch_server(
    server_args: ServerArgs,
    pipe_finish_writer: Optional[multiprocessing.connection.Connection] = None,
    launch_callback: Optional[Callable[[], None]] = None,
):
    """
    Launch SRT (SGLang Runtime) Server.

    The SRT server consists of an HTTP server and an SRT engine.

    - HTTP server: A FastAPI server that routes requests to the engine.
    - The engine consists of three components:
        1. TokenizerManager: Tokenizes the requests and sends them to the scheduler.
        2. Scheduler (subprocess): Receives requests from the TokenizerManager, schedules batches,
           forwards them, and sends the output tokens to the DetokenizerManager.
        3. DetokenizerManager (subprocess): Detokenizes the output tokens and sends the result back
           to the TokenizerManager.

    Note:
    1. The HTTP server, Engine, and TokenizerManager all run in the main process.
    2. Inter-process communication is done through IPC (each process uses a different port) via the ZMQ library.
    """
    tokenizer_manager, scheduler_info = _launch_subprocesses(server_args=server_args)
    set_global_state(
        _GlobalState(
            tokenizer_manager=tokenizer_manager,
            scheduler_info=scheduler_info,
        )
    )
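The docstring above notes that the components communicate over ZMQ using IPC endpoints. For readers unfamiliar with that pattern, here is a bare-bones PUSH/PULL exchange between two processes over a ZMQ IPC socket; the endpoint path and the message payload are purely illustrative, not SGLang's actual wiring.

import multiprocessing

import zmq


def detokenizer_like(addr: str):
    # Receiver side: bind a PULL socket and wait for one message.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    sock.bind(addr)
    print("received:", sock.recv_pyobj())


if __name__ == "__main__":
    addr = "ipc:///tmp/example_sglang_like.ipc"  # hypothetical IPC endpoint
    proc = multiprocessing.Process(target=detokenizer_like, args=(addr,))
    proc.start()

    # Sender side: connect a PUSH socket and ship a Python object.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    sock.connect(addr)
    sock.send_pyobj({"output_token_ids": [1, 2, 3]})
    proc.join()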
# Add api key authorization
if server_args.api_key:
    add_api_key_middleware(app, server_args.api_key)
# Add prometheus middleware
if server_args.enable_metrics:
    add_prometheus_middleware(app)
    enable_func_timer()
# Send a warmup request - we create the thread here and launch it
# in the lifespan after all other warmups have fired.
warmup_thread = threading.Thread(
    target=_wait_and_warmup,
    args=(
        server_args,
        pipe_finish_writer,
        _global_state.tokenizer_manager.image_token_id,
        launch_callback,
    ),
)
app.warmup_thread = warmup_thread
def _launch_subprocesses(
    server_args: ServerArgs, port_args: Optional[PortArgs] = None
) -> Tuple[TokenizerManager, Dict]:
    """
    Launch the TokenizerManager in the main process, the Scheduler in a subprocess,
    and the DetokenizerManager in another subprocess.
    """
    # Configure global environment
    configure_logger(server_args)
    server_args.check_server_args()
    _set_envs_and_config(server_args)
# Allocate ports for inter-process communications
if port_args is None:
    port_args = PortArgs.init_new(server_args)
logger.info(f"{server_args=}")
# If using a model from www.modelscope.cn, first download the model.
server_args.model_path, server_args.tokenizer_path = prepare_model_and_tokenizer(
    server_args.model_path, server_args.tokenizer_path
)
for proc in scheduler_procs:
    proc.join()
    logger.error(
        f"Scheduler or DataParallelController {proc.pid} terminated with {proc.exitcode}"
    )
return None, None
# Launch tokenizer process
tokenizer_manager = TokenizerManager(server_args, port_args)
if server_args.chat_template:
    load_chat_template_for_openai_api(
        tokenizer_manager, server_args.chat_template, server_args.model_path
    )
else:
    guess_chat_template_name_from_model_path(server_args.model_path)
if server_args.completion_template:
    load_completion_template_for_openai_api(server_args.completion_template)
# Wait for the model to finish loading
scheduler_infos = []
for i in range(len(scheduler_pipe_readers)):
    try:
        data = scheduler_pipe_readers[i].recv()
    except EOFError:
        logger.error(
            f"Rank {i} scheduler is dead. Please check if there are relevant logs."
        )
        scheduler_procs[i].join()
        logger.error(f"Exit code: {scheduler_procs[i].exitcode}")
        raise
if data["status"] != "ready": raise RuntimeError( "Initialization failed. Please see the error messages above." ) scheduler_infos.append(data)
# Assume all schedulers have the same scheduler_info
scheduler_info = scheduler_infos[0]
tokenizer_manager.max_req_input_len = scheduler_info["max_req_input_len"]
return tokenizer_manager, scheduler_info
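The readiness handshake above is a standard multiprocessing pattern: each subprocess gets the write end of a multiprocessing.Pipe, sends a status dict once initialization finishes, and the parent blocks on recv(). A minimal, self-contained sketch of that pattern (the worker function and its payload are illustrative, not the real Scheduler):

import multiprocessing as mp


def scheduler_like(pipe_writer):
    # ... heavy initialization (model loading, etc.) would happen here ...
    pipe_writer.send({"status": "ready", "max_req_input_len": 4096})


if __name__ == "__main__":
    reader, writer = mp.Pipe(duplex=False)
    proc = mp.Process(target=scheduler_like, args=(writer,))
    proc.start()
    writer.close()  # close the parent's copy so a dead child surfaces as EOFError
    try:
        data = reader.recv()  # blocks until the child reports in
    except EOFError:
        proc.join()
        raise RuntimeError(f"Worker died with exit code {proc.exitcode}")
    assert data["status"] == "ready"
    proc.join()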
# Config the process
kill_itself_when_parent_died()
setproctitle.setproctitle(f"sglang::scheduler{prefix.replace(' ', '_')}")
faulthandler.enable()
parent_process = psutil.Process().parent()
# [For Router] if the env var "SGLANG_DP_RANK" exists, set dp_rank to its value
if dp_rank is None and "SGLANG_DP_RANK" in os.environ:
    dp_rank = int(os.environ["SGLANG_DP_RANK"])
# Configure the logger
configure_logger(server_args, prefix=prefix)
suppress_other_loggers()
# Set CPU affinity for this GPU process
if get_bool_env_var("SGLANG_SET_CPU_AFFINITY"):
    set_gpu_proc_affinity(server_args.tp_size, server_args.nnodes, gpu_id)
if batch:
    result = self.run_batch(batch)
    self.process_batch_result(batch, result)
else:
    # When the server is idle, do self-check and re-init some states
    self.check_memory()
    self.new_token_ratio = self.init_new_token_ratio
def get_next_batch_to_run(self) -> Optional[ScheduleBatch]:
    # Merge the prefill batch into the running batch
    chunked_req_to_exclude = set()
    if self.chunked_req:
        # Move the chunked request out of the batch so that we can merge
        # only finished requests to running_batch.
        chunked_req_to_exclude.add(self.chunked_req)
        self.tree_cache.cache_unfinished_req(self.chunked_req)
        # chunked request keeps its rid but will get a new req_pool_idx
        self.req_to_token_pool.free(self.chunked_req.req_pool_idx)
    if self.last_batch and self.last_batch.forward_mode.is_extend():
        if self.last_batch.chunked_req is not None:
            # In the context of pipeline parallelism, after the last chunk, the current
            # microbatch still tracks an outdated chunked_req. We need to discard it.
            chunked_req_to_exclude.add(self.last_batch.chunked_req)
        # Merge the new batch into the running batch
        if not self.last_batch.is_empty():
            if self.running_batch.is_empty():
                self.running_batch = self.last_batch
            else:
                # Merge running_batch with prefill batch
                self.running_batch.merge_batch(self.last_batch)
    new_batch = self.get_new_batch_prefill()
    if new_batch is not None:
        # Run prefill first if possible
        ret = new_batch
    else:
        # Run decode
        if not self.running_batch.is_empty():
            self.running_batch = self.update_running_batch(self.running_batch)
            ret = self.running_batch if not self.running_batch.is_empty() else None
        else:
            ret = None
# These 2 values are needed for processing the output, but the values can be
# modified by the overlap schedule. So we have to copy them here so that
# we can use the correct values in output processing.
if batch.return_logprob:
    extend_input_len_per_req = [req.extend_input_len for req in batch.reqs]
    extend_logprob_start_len_per_req = [
        req.extend_logprob_start_len for req in batch.reqs
    ]
else:
    extend_input_len_per_req = None
    extend_logprob_start_len_per_req = None
    ret = GenerationBatchResult(
        logits_output=logits_output if self.pp_group.is_last_rank else None,
        pp_hidden_states_proxy_tensors=(
            pp_hidden_states_proxy_tensors
            if not self.pp_group.is_last_rank
            else None
        ),
        next_token_ids=next_token_ids if self.pp_group.is_last_rank else None,
        extend_input_len_per_req=extend_input_len_per_req,
        extend_logprob_start_len_per_req=extend_logprob_start_len_per_req,
        bid=bid,
        can_run_cuda_graph=can_run_cuda_graph,
    )
else:  # embedding or reward model
    model_worker_batch = batch.get_model_worker_batch()
    embeddings = self.tp_worker.forward_batch_embedding(model_worker_batch)
    ret = EmbeddingBatchResult(
        embeddings=embeddings, bid=model_worker_batch.bid
    )
return ret
if server_args.enable_expert_distribution_metrics:
    logger.info(
        "ExpertDistributionRecorder auto start record since enable_expert_distribution_metrics"
    )
    self.start_record()
def _reset(self):
    """Reset the expert distribution recorder."""
    logger.info("Resetting ExpertDistributionRecorder...")
    assert (
        self._current_layer_idx.value is None
    ), f"{self._current_layer_idx.value=}"
    for gatherer in self._single_pass_gatherers.values():
        gatherer.reset()
    self._accumulator.reset()
def start_record(self):
    """Start recording the expert distribution."""
    if self._recording:
        logger.warning(
            "SGLang server is already recording expert ids. Did you forget to dump the expert ids recorded so far by sending requests to the `/stop_expert_distribution_record` and `/dump_expert_distribution_record` endpoints?"
        )
    self._reset()
    self._recording = True
def stop_record(self):
    """Stop recording the expert distribution."""
    if not self._recording:
        logger.warning(
            "SGLang server has not been recording expert ids. Did you forget to start recording by sending request to the `/start_expert_distribution_record` endpoint?"
        )
    self._recording = False
def dump_record(self, output_mode: _OutputMode = "file"):
    """Dump the expert distribution record and reset the recorder after dumping."""
    output = self._accumulator.dump(output_mode=output_mode)
    self._reset()
    return output
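The warnings above reference the HTTP endpoints that drive this recorder. A rough usage sketch is shown below; the server address, port, and the use of POST requests are assumptions, since the route definitions are not part of this excerpt.

import requests

base_url = "http://localhost:30000"  # assumed server address and port

requests.post(f"{base_url}/start_expert_distribution_record")
# ... send some generation traffic so the recorder observes real expert routing ...
requests.post(f"{base_url}/stop_expert_distribution_record")
resp = requests.post(f"{base_url}/dump_expert_distribution_record")
print(resp.status_code)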
class _StatAccumulator(_UtilizationRateAccumulatorMixin):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._global_physical_count_of_buffered_step = _Buffer.init_new(
            item_shape=(
                self._expert_location_metadata.num_layers,
                # Cannot use local_physical_count to support select_experts
                self._expert_location_metadata.num_physical_experts,
            ),
            buffer_size=self._server_args.expert_distribution_recorder_buffer_size,
            dtype=torch.int32,
            device=self._server_args.device,
        )
    def append(
        self,
        forward_pass_id: int,
        gatherer_key: str,
        single_pass_data: Dict,
    ):
        super().append(forward_pass_id, gatherer_key, single_pass_data)
        # Can optimize if overhead here is large
        self._global_physical_count_of_buffered_step.append(
            single_pass_data["global_physical_count"]
        )
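_Buffer itself is not shown in this excerpt; from its usage it behaves like a fixed-size rolling buffer of per-step tensors, sized by expert_distribution_recorder_buffer_size. A minimal sketch of such a buffer, purely to illustrate the idea (the class below is hypothetical, not the actual _Buffer):

import torch


class RollingTensorBuffer:
    """Keep the most recent `buffer_size` items, overwriting the oldest ones."""

    def __init__(self, item_shape, buffer_size, dtype, device):
        self._data = torch.zeros((buffer_size, *item_shape), dtype=dtype, device=device)
        self._next = 0
        self._filled = 0

    def append(self, item: torch.Tensor):
        # Write into the next slot and wrap around when the buffer is full.
        self._data[self._next] = item
        self._next = (self._next + 1) % self._data.shape[0]
        self._filled = min(self._filled + 1, self._data.shape[0])

    def get_all(self) -> torch.Tensor:
        return self._data[: self._filled]


buf = RollingTensorBuffer(item_shape=(2, 4), buffer_size=3, dtype=torch.int32, device="cpu")
for step in range(5):
    buf.append(torch.full((2, 4), step, dtype=torch.int32))
print(buf.get_all().shape)  # torch.Size([3, 2, 4])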
logger.info(
    f"[Expert Balancedness] "
    f"forward_pass_id={forward_pass_id} "
    f"current_pass_balancedness={utilization_rate:.03f} "
    f"{''.join(f'last_{size}_average_balancedness={value:.03f} ' for size, value in self._history.mean().items())} "
    f"gpu_physical_count_sum={gpu_physical_count_sum}"
    # f"current_pass_per_layer={[round(x, 2) for x in utilization_rate_tensor.cpu().tolist()]}"
)
# Otherwise, the circular buffer will contain stale data. If the case is needed, it can be implemented.
assert (
    self._server_args.eplb_rebalance_num_iterations
    >= self._server_args.expert_distribution_recorder_buffer_size
), "eplb_rebalance_num_iterations must be at least expert_distribution_recorder_buffer_size"
# case 1: unchanged
if old_physical_to_logical_map[dst_expert_location] == logical_expert_id:
    if debug:
        output_logs.append(
            f"handle_recv_of_dst_expert_location {dst_expert_location=} case=unchanged"
        )
    return
# case 2: same-gpu
for src_expert_location in range(*local_expert_location_range):
    if old_physical_to_logical_map[src_expert_location] == logical_expert_id:
        for i in range(num_tensors):
            _get_tensor(temp_buffers, i, dst_expert_location).copy_(
                _get_tensor(routed_experts_weights, i, src_expert_location)
            )
        buffer2weight_copy_infos.append(
            (dst_expert_location, dst_expert_location)
        )
        if debug:
            output_logs.append(
                f"handle_recv_of_dst_expert_location {dst_expert_location=} case=same-gpu {src_expert_location=}"
            )
        return
# case 3: free-rider
for src_expert_location in range(
    rank * num_local_physical_experts, dst_expert_location
):
    if new_physical_to_logical_map[src_expert_location] == logical_expert_id:
        buffer2weight_copy_infos.append(
            (src_expert_location, dst_expert_location)
        )
        if debug:
            output_logs.append(
                f"handle_recv_of_dst_expert_location {dst_expert_location=} case=free-rider {src_expert_location=}"
            )
        return
# case 4: same-node
if rank in need_comm_self_node_dst_ranks:
    chosen_src_rank = same_node_mapping.chunk_value_from_element_value(
        element_value=rank
    )
    _create_p2p_recv_and_buffer2weight_copy(
        buffer2weight_copy_infos,
        p2p_op_infos,
        src_rank=chosen_src_rank,
        logical_expert_id=logical_expert_id,
        dst_expert_location=dst_expert_location,
    )
    if debug:
        output_logs.append(
            f"handle_recv_of_dst_expert_location {dst_expert_location=} case=same-node {chosen_src_rank=}"
        )
    return
# case 5: cross-node
# Future work: can optimize when there are multiple ranks in the same dst node that use the same logical expert
chosen_src_rank = cross_node_mapping.chunk_value_from_element_value(
    element_value=rank
)
_create_p2p_recv_and_buffer2weight_copy(
    buffer2weight_copy_infos,
    p2p_op_infos,
    src_rank=chosen_src_rank,
    logical_expert_id=logical_expert_id,
    dst_expert_location=dst_expert_location,
)
if debug:
    output_logs.append(
        f"handle_recv_of_dst_expert_location {dst_expert_location=} case=cross-node {chosen_src_rank=}"
    )
return
def _create_p2p_recv_and_buffer2weight_copy(
    buffer2weight_copy_infos,
    p2p_op_infos,
    *,
    logical_expert_id: int,
    src_rank: int,
    dst_expert_location: int,
):
    p2p_op_infos.append(
        (
            logical_expert_id,
            [
                P2POp(
                    op=torch.distributed.irecv,
                    tensor=_get_tensor(temp_buffers, i, dst_expert_location),
                    peer=src_rank,
                )
                for i in range(num_tensors)
            ],
        )
    )
    buffer2weight_copy_infos.append((dst_expert_location, dst_expert_location))
def _create_isend_ops(p2p_op_infos):
    handled_logical_expert_ids = set()
    for src_expert_location in range(*local_expert_location_range):
        logical_expert_id = old_physical_to_logical_map[src_expert_location]
        if logical_expert_id in handled_logical_expert_ids:
            continue
        handled_logical_expert_ids.add(logical_expert_id)
        if debug:
            output_logs.append(
                f"create_isend_ops_of_logical_expert_id {logical_expert_id=} {src_expert_location=} {same_node_dst_ranks=} {cross_node_dst_ranks=}"
            )
        p2p_op_infos.append(
            (
                logical_expert_id,
                [
                    P2POp(
                        op=torch.distributed.isend,
                        tensor=_get_tensor(
                            routed_experts_weights, i, src_expert_location
                        ),
                        peer=dst_rank,
                    )
                    for dst_rank in all_dst_ranks
                    for i in range(num_tensors)
                ],
            )
        )
def _compute_comm_info(logical_expert_id: int):
    all_src_ranks = _deduplicate_ordered(
        [
            x // num_local_physical_experts
            for x in range(num_physical_experts)
            if old_physical_to_logical_map[x] == logical_expert_id
        ]
    )
    all_src_nodes = [x // num_gpu_per_node for x in all_src_ranks]
    self_node_src_ranks = [
        x for x in all_src_ranks if x // num_gpu_per_node == self_node_id
    ]
    need_comm_dst_ranks = _deduplicate_ordered(
        [
            x // num_local_physical_experts
            for x in range(num_physical_experts)
            if new_physical_to_logical_map[x] == logical_expert_id
            and x // num_local_physical_experts not in all_src_ranks
        ]
    )
    need_comm_self_node_dst_ranks = (
        [x for x in need_comm_dst_ranks if x // num_gpu_per_node == self_node_id]
        if len(self_node_src_ranks) > 0
        else []
    )
    need_comm_cross_node_dst_ranks = [
        x
        for x in need_comm_dst_ranks
        if (x // num_gpu_per_node) not in all_src_nodes
    ]
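_deduplicate_ordered is used here but not defined in the excerpt; from its name and usage it appears to drop duplicate ranks while preserving first-seen order. A plausible sketch (an assumption, not the actual helper):

from typing import List


def _deduplicate_ordered(items: List[int]) -> List[int]:
    # Keep the first occurrence of each value, preserving order.
    seen = set()
    output = []
    for item in items:
        if item not in seen:
            seen.add(item)
            output.append(item)
    return output


print(_deduplicate_ordered([3, 1, 3, 2, 1]))  # [3, 1, 2]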
def _execute_p2p_ops(p2p_op_infos):
    sorted_infos = sorted(p2p_op_infos, key=lambda info: info[0])
    p2p_ops = [op for _, ops in sorted_infos for op in ops]
    if len(p2p_ops) == 0:
        return
    reqs = torch.distributed.batch_isend_irecv(p2p_ops)
    for req in reqs:
        req.wait()
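For reference, torch.distributed.batch_isend_irecv takes a list of P2POp descriptors, launches them together, and returns request handles to wait on, which is how the send/recv plans built above are executed. A minimal two-rank sketch of the same pattern (the gloo backend and the environment-variable rendezvous are choices made only for this illustration):

import os

import torch
import torch.distributed as dist
from torch.distributed import P2POp


def run(rank: int, world_size: int = 2):
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    tensor = torch.zeros(4)
    if rank == 0:
        tensor += 1.0
        ops = [P2POp(op=dist.isend, tensor=tensor, peer=1)]
    else:
        ops = [P2POp(op=dist.irecv, tensor=tensor, peer=0)]

    # Launch all point-to-point ops as one batch, then wait for completion.
    reqs = dist.batch_isend_irecv(ops)
    for req in reqs:
        req.wait()

    dist.destroy_process_group()


if __name__ == "__main__":
    import torch.multiprocessing as mp

    mp.spawn(run, nprocs=2)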
def _execute_buffer2weight_copies(buffer2weight_copy_infos):
    for (
        temp_buffers_expert_location,
        routed_experts_weights_expert_location,
    ) in buffer2weight_copy_infos:
        for i in range(num_tensors):
            _get_tensor(
                routed_experts_weights, i, routed_experts_weights_expert_location
            ).copy_(_get_tensor(temp_buffers, i, temp_buffers_expert_location))