3 changes: 3 additions & 0 deletions docs/usage/environment_variables.md
@@ -162,6 +162,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Whether to enable caching decode requests to preallocate resources
"FD_ENABLE_CACHE_TASK": lambda: os.getenv("FD_ENABLE_CACHE_TASK", "0"),

# Batched token timeout in EP
"FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),

# Maximum number of prefetch requests in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),

3 changes: 3 additions & 0 deletions docs/zh/usage/environment_variables.md
@@ -162,6 +162,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Whether to enable caching decode requests to preallocate resources
"FD_ENABLE_CACHE_TASK": lambda: os.getenv("FD_ENABLE_CACHE_TASK", "0"),

# Batched token timeout in EP
"FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),

# Maximum number of prefetch requests in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),

19 changes: 19 additions & 0 deletions fastdeploy/config.py
@@ -2156,6 +2156,25 @@ def postprocess(self):
self.speculative_config.num_speculative_tokens = 1
self.speculative_config.num_model_steps = 1

# Auto-compute num_max_dispatch_tokens_per_rank from max_num_seqs and num_speculative_tokens
if self.speculative_config is not None and self.speculative_config.method is not None:
num_spec_tokens = self.speculative_config.num_speculative_tokens
auto_dispatch_tokens = self.scheduler_config.max_num_seqs * (num_spec_tokens + 1)
else:
auto_dispatch_tokens = self.scheduler_config.max_num_seqs
if (
getattr(self.model_config, "num_max_dispatch_tokens_per_rank", None)
and self.model_config.num_max_dispatch_tokens_per_rank != auto_dispatch_tokens
):
logger.info(
f"Auto-setting num_max_dispatch_tokens_per_rank from "
f"{self.model_config.num_max_dispatch_tokens_per_rank} to {auto_dispatch_tokens} "
f"(max_num_seqs={self.scheduler_config.max_num_seqs}"
f"{f', num_speculative_tokens={num_spec_tokens}' if self.speculative_config is not None and self.speculative_config.method is not None else ''})."
)

self.model_config.num_max_dispatch_tokens_per_rank = auto_dispatch_tokens

if self.scheduler_config.splitwise_role == "mixed":
self._disable_sequence_parallel_moe_if_needed("Mixed")
self.model_config.moe_phase = MoEPhase(phase="prefill")
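For reference, the sizing rule this hunk introduces can be stated standalone (the helper name below is illustrative, not a FastDeploy API): with speculative decoding enabled, every sequence needs one dispatch slot for its verified token plus one per speculative token, so the per-rank budget is `max_num_seqs * (num_speculative_tokens + 1)`; otherwise it is just `max_num_seqs`.

```python
from typing import Optional

def compute_dispatch_tokens(max_num_seqs: int, num_speculative_tokens: Optional[int] = None) -> int:
    """Mirrors the auto-computation added to postprocess(); passing None models
    the case where no speculative-decoding method is configured."""
    if num_speculative_tokens is not None:
        # one verified token plus num_speculative_tokens draft tokens per sequence
        return max_num_seqs * (num_speculative_tokens + 1)
    return max_num_seqs

# e.g. 256 sequences with 1 speculative token each -> 512 dispatch slots per rank
assert compute_dispatch_tokens(256, 1) == 512
assert compute_dispatch_tokens(256) == 256
```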
53 changes: 17 additions & 36 deletions fastdeploy/engine/common_engine.py
@@ -367,15 +367,6 @@ def _init_worker_monitor_signals(self): # exist_task_signal is used by each worker process
create=True,
)

engine_forward_signal_data = np.zeros([1], dtype=np.int32)
self.engine_forward_signal = IPCSignal(
name="engine_forward_signal",
array=engine_forward_signal_data,
dtype=np.int32,
suffix=current_suffix,
create=True,
)

# worker_live_signal lets the engine detect whether each worker process is alive; records each step's time
worker_healthy_live_recorded_time_array = np.zeros(
shape=[min(self.cfg.worker_num_per_node, self.cfg.parallel_config.tensor_parallel_size)], dtype=np.int32
@@ -1037,29 +1028,26 @@ def _fetch_request():
with self._pause_cond:
self._pause_cond.wait_for(lambda: not self.is_paused)
try:
if not is_fetching:
# Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
try:
if self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
continue
if self.cfg.scheduler_config.splitwise_role != "mixed":
if not is_fetching:
is_fetching = True
get_request_pool.submit(_fetch_request)
except RuntimeError as e:
if "shutdown" in str(e):
self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
break
else:
raise
if self.cfg.scheduler_config.splitwise_role != "mixed":
# Continue preprocessing incoming requests and accumulating them in the queue while the forward pass has not finished.
# Once the forward pass finishes, these accumulated requests can be scheduled in larger,
# more efficient batches.
if self.engine_worker_queue.exist_tasks() or self.engine_forward_signal.value[0] != 0:
time.sleep(0.001)
continue

else:
# In mixed mode. TODO: optimize cache swap to decouple swapping from the scheduler
if self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
continue
if len(self.resource_manager.waiting) == 0 and (not is_fetching):
# Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
try:
is_fetching = True
get_request_pool.submit(_fetch_request)
except RuntimeError as e:
if "shutdown" in str(e):
self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
break
else:
raise

if hasattr(self.resource_manager, "scheduler_unhandled_request_num"):
self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num()
@@ -1120,13 +1108,6 @@
elif not task.has_been_preempted_before:
task.metrics.inference_start_time = time.time()
self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
else:
# When there are no actual tasks to schedule, send an empty task batch to EP workers.
# This keeps the EP workers' barrier for syncing tasks from hanging.
if self.cfg.parallel_config.enable_expert_parallel:
self.engine_worker_queue.put_tasks(
([], self.resource_manager.real_bsz)
) # Empty (as idle tasks for ep)

# 4. Response error tasks
if error_tasks:
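A condensed sketch of the policy the rewritten loop implements (the queue, pool, and fetch callable are stand-ins, not the engine's real objects): while tasks are still queued the loop keeps sleeping so new requests accumulate into larger batches, and once the queue drains a non-"mixed" deployment kicks off a background fetch, treating a shut-down thread pool as a signal to exit rather than an error.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def scheduler_loop(queue, pool: ThreadPoolExecutor, fetch, splitwise_role: str) -> None:
    """Simplified stand-in for the rewritten scheduler loop in common_engine.py."""
    is_fetching = False
    while True:
        if queue.exist_tasks():
            # Earlier work is still pending: let incoming requests pile up so
            # the next schedule sees a larger, more efficient batch.
            time.sleep(0.001)
            continue
        if splitwise_role != "mixed" and not is_fetching:
            try:
                is_fetching = True
                pool.submit(fetch)  # fetch new requests in the background
            except RuntimeError as e:
                if "shutdown" in str(e):
                    break  # thread pool already shut down: exit cleanly
                raise
```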
8 changes: 7 additions & 1 deletion fastdeploy/engine/engine.py
@@ -841,8 +841,14 @@ def launch_components(self):
+ f" data parallel id {i}"
)
self.dp_processed[-1].start()

for i in range(

❓ Question: the new loop starts at i=1, skipping the wait for the initialization signal of DP process 0 (i=0).

Original logic: within the same loop, wait on the corresponding launched_expert_service_signal immediately after start(), so each DP process launches serially and is waited on until initialization completes.

New logic: first start() every process in the outer loop, then use range(1, N) to wait for the signals of i=1 through i=N-1, skipping the signal check for process i=0.

Please confirm:

  1. Is i=0 the current process (already initialized locally, so no wait is needed)?
  2. If i=0 is an independent child process, could skipping the wait let subsequent code (such as sending the first batch of tasks) run before i=0 has finished initializing, creating a race condition?

1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
):

while self.launched_expert_service_signal.value[i] == 0:
time.sleep(1)
time.sleep(0.1)

def check_worker_initialize_status(self):
"""
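To make the reviewer's ordering concern concrete, here is a minimal model of the revised launch sequence (the multiprocessing primitives are illustrative stand-ins for the engine's IPC signals; whether slot 0 actually needs waiting is exactly the open question above):

```python
import time
from multiprocessing import Array, Process

def launch_and_wait(num_dp: int, worker) -> None:
    """Start every DP worker first, then poll the shared ready flags.
    Beginning the wait at index 1 assumes slot 0 is the current process;
    if slot 0 were an independent child, its readiness check would be skipped."""
    ready = Array("i", [0] * num_dp)  # analogue of launched_expert_service_signal
    procs = [Process(target=worker, args=(ready, i)) for i in range(num_dp)]
    for p in procs:
        p.start()  # launch all processes before waiting on any of them
    for i in range(1, num_dp):  # mirrors range(1, data_parallel_size // nnode)
        while ready[i] == 0:
            time.sleep(0.1)
```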
13 changes: 0 additions & 13 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -32,7 +32,6 @@
EncoderCacheManager,
ProcessorCacheManager,
)
from fastdeploy.config import ErnieArchitectures
from fastdeploy.engine.request import (
ImagePosition,
Request,
@@ -757,13 +756,6 @@ def schedule(self):
Try to pull a batch of requests from the waiting queue and schedule them.
"""

def get_enough_request(request, scheduled_reqs):
return (
ErnieArchitectures.is_ernie5_arch(self.config.model_config.architectures)
and self._is_mm_request(request)
and self.exist_mm_prefill(scheduled_reqs)
)

with self.lock:
scheduled_reqs: list[Request] = []
preempted_reqs: list[Request] = []
@@ -898,9 +890,6 @@ def _allocate_decode_and_extend():
):
req_index += 1
continue
if get_enough_request(request, scheduled_reqs):
req_index += 1
continue
num_new_tokens = self._get_num_new_tokens(request, token_budget)
if num_new_tokens == 0:
req_index += 1
@@ -951,8 +940,6 @@
break

request = self.waiting[0]
if get_enough_request(request, scheduled_reqs):
break
if request.status == RequestStatus.WAITING:
result = self.waiting_async_process(request)
if result is None:
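For context on what was deleted, a minimal sketch of the gate (attribute names are simplified stand-ins for the real manager state): `get_enough_request` deferred a multimodal request whenever an ERNIE-5 deployment already had a multimodal prefill in the batch being scheduled, and both call sites that consulted it are removed along with the `ErnieArchitectures` import.

```python
def should_defer_mm_request(request, scheduled_reqs, is_ernie5_arch: bool) -> bool:
    """Simplified form of the removed get_enough_request() check: defer a
    multimodal request when an ERNIE-5 model already has a multimodal
    prefill scheduled in the current step."""
    has_mm_prefill = any(r.is_multimodal for r in scheduled_reqs)
    return is_ernie5_arch and request.is_multimodal and has_mm_prefill
```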
2 changes: 2 additions & 0 deletions fastdeploy/envs.py
@@ -147,6 +147,8 @@ def _validate_split_kv_size(value: int) -> int:
"FD_ZMQ_CONTROL_CMD_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8202"),
# Whether to enable caching decode requests to preallocate resources
"FD_ENABLE_CACHE_TASK": lambda: os.getenv("FD_ENABLE_CACHE_TASK", "0"),
# Batched token timeout in EP
"FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),
# Maximum number of prefetch requests in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),
# Enable or disable model caching.
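As a usage sketch (the setting below is illustrative, not part of this PR), the new variable can be exported before the server starts; the registered lambda parses it lazily as a float, with the units assumed to be seconds given the 0.1 default:

```python
import os

# Must be set before fastdeploy.envs reads it; "0.05" shortens the EP
# token-batching timeout from the 0.1 default (assumed seconds).
os.environ["FD_EP_BATCHED_TOKEN_TIMEOUT"] = "0.05"

# Equivalent to the lambda registered in fastdeploy/envs.py:
timeout = float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1"))
assert timeout == 0.05
```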