27 changes: 27 additions & 0 deletions fastdeploy/engine/common_engine.py
@@ -955,6 +955,9 @@ def _fetch_request():
self.llm_logger.debug(
f"P has allocated resources and then ask D resource for request: {task.request_id}"
)
trace_print(
LoggingEventName.ASK_DECODE_RESOURCE_START, task.request_id, getattr(task, "user", "")
)
task.metrics.ask_decode_resource_start_time = time.time()
while True:
self.split_connector.send_splitwise_tasks([task], task.idx)
@@ -966,6 +969,11 @@ def _fetch_request():
time.sleep(0.05)
else:
task.metrics.ask_decode_resource_finish_time = time.time()
trace_print(
LoggingEventName.ASK_DECODE_RESOURCE_END,
task.request_id,
getattr(task, "user", ""),
)
break
self.llm_logger.debug(f"D has allocated resource for request: {task.request_id}")
else:
@@ -977,13 +985,19 @@ def _fetch_request():
self.llm_logger.debug(
f"P has allocated resources and then ask D resource for req_id: {task.request_id}"
)
trace_print(
LoggingEventName.ASK_DECODE_RESOURCE_START, task.request_id, getattr(task, "user", "")
)
task.metrics.ask_decode_resource_start_time = time.time()
self.split_connector.send_splitwise_tasks([task], task.idx)

for task in tasks:
# assure fetch block ids from D
status, msg = self.split_connector.check_decode_allocated(task)
task.metrics.ask_decode_resource_finish_time = time.time()
trace_print(
LoggingEventName.ASK_DECODE_RESOURCE_END, task.request_id, getattr(task, "user", "")
)
if not status:
error_msg = (
f"PD Error: prefill failed to apply for resource from decode, "
Expand All @@ -1000,6 +1014,7 @@ def _fetch_request():
)
]
)
main_process_metrics.reschedule_req_num.inc()
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
@@ -1110,6 +1125,7 @@ def _fetch_request():
f"preallocated request. req:{task.request_id} "
)
self.llm_logger.error(msg)
main_process_metrics.reschedule_req_num.inc()
self.scheduler.put_results(
[
RequestOutput(
@@ -2109,6 +2125,7 @@ def _process_allocate_resource_requests():
processed_indices = []
for idx, task in enumerate(allocate_resource_requests):
is_success = False
trace_print(LoggingEventName.DECODE_PROCESS_PREALLOCATE_REQUEST_START, task.request_id, task.user)

if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if self.resource_manager.preallocate_resource_in_d(task):
@@ -2118,6 +2135,7 @@ def _process_allocate_resource_requests():
self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
processed_indices.append(idx)
is_success = True
main_process_metrics.decode_preallocated_req_num.inc()
else:
if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
self.llm_logger.debug(f"D Resource available, processing task {task.request_id}")
Expand All @@ -2137,6 +2155,11 @@ def _process_allocate_resource_requests():
break

for idx in sorted(processed_indices, reverse=True):

trace_print(
LoggingEventName.DECODE_PROCESS_PREALLOCAT_REQUEST_END,
allocate_resource_requests[idx].request_id,
allocate_resource_requests[idx].user,
)
allocate_resource_requests.pop(idx)

🟡 Suggestion: This call references the misspelled LoggingEventName.DECODE_PROCESS_PREALLOCAT_REQUEST_END (missing the letter E); it must be kept consistent with the corrected name in constants.py.
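A corrected call, assuming the enum rename proposed for constants.py is applied (a sketch, not the merged code):

trace_print(
    LoggingEventName.DECODE_PROCESS_PREALLOCATE_REQUEST_END,  # corrected spelling
    allocate_resource_requests[idx].request_id,
    allocate_resource_requests[idx].user,
)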

def _process_prefilled_requests():
@@ -2152,6 +2175,7 @@ def _process_prefilled_requests():
continue
req_output.finished = False
ready_request_outputs.append(req_output)
trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_START, req_output.request_id, "")
self.llm_logger.debug(f"there are enough resource for prefilled request: {req_output.request_id}")

prefilled_request_ouputs = waiting_request_outputs
@@ -2164,6 +2188,8 @@ def _process_prefilled_requests():
else:
for req_output in ready_request_outputs:
request_id = req_output.request_id
main_process_metrics.decode_preallocated_req_num.dec()
🔴 Bug: decode_preallocated_req_num.dec() and inc() are asymmetric:

  • inc() runs only inside the ENABLE_V1_KVCACHE_SCHEDULER branch (line 2135 in _process_allocate_resource_requests)
  • dec() runs unconditionally here (including the non-V1 path)

When ENABLE_V1_KVCACHE_SCHEDULER=False, inc() is never called but dec() still executes, so the count drifts negative and the Gauge semantics break.

Suggestion: run dec() under the same V1 branch condition, or confirm the preallocated semantics of the non-V1 path. A sketch of the guarded decrement follows.
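A minimal sketch, assuming the intended fix is to mirror the inc() guard (not a confirmed change):

if envs.ENABLE_V1_KVCACHE_SCHEDULER:
    # Only decrement on the same code path that incremented the gauge.
    main_process_metrics.decode_preallocated_req_num.dec()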

trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END, request_id, "")
if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids:
# first token is eos in Prefill, just recycle resource and continue
self.llm_logger.warning(f"{request_id} need not decode after first token")
@@ -2177,6 +2203,7 @@ def _process_prefilled_requests():
self.llm_logger.warning(
f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource."
)
main_process_metrics.failed_recv_first_token_req_num.inc()
self.resource_manager.pre_recycle_resource(request_id)
if request_id in self.token_processor.tokens_counter:
del self.token_processor.tokens_counter[request_id]
14 changes: 10 additions & 4 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -1649,7 +1649,9 @@ def clear_data(self):

def update_metrics(self, verbose=False):
# Update metrics
num_tasks = sum([1 if task else 0 for task in self.tasks_list])
num_requests_running = len(self.running)
num_requests_waiting = len(self.waiting)
num_requests_queuing = max(int(getattr(self, "scheduler_unhandled_request_num", 0) or 0), 0)
blocks_used_by_tasks = set()
for task in self.tasks_list:
if task is not None:
@@ -1658,10 +1660,14 @@ def update_metrics(self, verbose=False):
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - len(blocks_used_by_tasks))
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.num_requests_running.set(len(self.running))
main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
main_process_metrics.num_requests_running.set(num_requests_running)
main_process_metrics.num_requests_waiting.set(num_requests_waiting)
main_process_metrics.num_requests_queuing.set(num_requests_queuing)
if verbose:
llm_logger.info(f"update metrics: running={len(self.running)}, waiting={num_tasks - len(self.running)}")
llm_logger.info(
f"update metrics: running={num_requests_running}, "
f"waiting={num_requests_waiting}, queuing={num_requests_queuing}"
)

def log_status(self):
llm_logger.info(
33 changes: 31 additions & 2 deletions fastdeploy/metrics/metrics.py
@@ -136,6 +136,7 @@ class MetricsManager:

num_requests_running: "Gauge"
num_requests_waiting: "Gauge"
num_requests_queuing: "Gauge"
time_to_first_token: "Histogram"
time_per_output_token: "Histogram"
request_inference_time: "Histogram"
Expand All @@ -153,7 +154,6 @@ class MetricsManager:
spec_decode_num_emitted_tokens_total: "Gauge"
spec_decode_draft_single_head_acceptance_rate: "list[Gauge]"

# for YIYAN Adapter
prefix_cache_token_num: "Counter"
prefix_gpu_cache_token_num: "Counter"
prefix_cpu_cache_token_num: "Counter"
@@ -192,6 +192,11 @@ class MetricsManager:
request_prompt_tokens: "Histogram"
request_token_ratio: "Histogram"

# for pd
decode_preallocated_req_num: "Gauge"
reschedule_req_num: "Counter"

failed_recv_first_token_req_num: "Counter"

# Define all metric configurations

# Gauge metrics are isolated per pid in multi-process mode and need special handling, so they are defined manually here
@@ -205,7 +210,13 @@ class MetricsManager:
"num_requests_waiting": {
"type": Gauge,
"name": "fastdeploy:num_requests_waiting",
"description": "Number of requests currently waiting",
"description": "Number of requests currently waiting in resource manager",
"kwargs": {},
},
"num_requests_queuing": {
"type": Gauge,
"name": "fastdeploy:num_requests_queuing",
"description": "Number of requests currently queuing in local scheduler",
"kwargs": {},
},
"gpu_cache_usage_perc": {
@@ -298,6 +309,12 @@ class MetricsManager:
"description": "Token-level GPU prefix cache hit rate",
"kwargs": {},
},
"decode_preallocated_req_num": {
"type": Gauge,
"name": "fastdeploy:decode_preallocated_req_num",
"description": "Number of preallocated requests in decode instance",
"kwargs": {},
},
}

METRICS = {
@@ -459,6 +476,18 @@ class MetricsManager:
],
},
},
"reschedule_req_num": {
"type": Counter,
"name": "fastdeploy:reschedule_req_num",
"description": "Total number of reschedule requests",
"kwargs": {},
},
"failed_recv_first_token_req_num": {
"type": Counter,
"name": "fastdeploy:failed_recv_first_token_req_num",
"description": "Total number of failed requests to receive the first token in decode",
"kwargs": {},
},
}

SPECULATIVE_METRICS = {}
27 changes: 18 additions & 9 deletions fastdeploy/output/token_processor.py
@@ -286,8 +286,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
llm_logger.info(self.resource_manager.info())
if self.cfg.speculative_config.method:
self._compute_speculative_status()
if not is_prefill:
self._record_completion_metrics(task, current_time)
self._record_completion_metrics(task, current_time)

self._recycle_resources(task_id, batch_id, task, result, is_prefill)
break
return result
@@ -565,6 +564,8 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
if is_prefill:
start_time = time.time()
result.metrics.wait_for_sending_cache_time = time.time()
trace_print(LoggingEventName.CHECK_CACHE_TRANSFER_START, task_id, getattr(task, "user", ""))

while True:
finished_task_ids = self.engine_worker_queue.get_finished_req()
if len(finished_task_ids) > 0:
@@ -588,6 +589,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
request_id=task_id,
cost_seconds=f"{time.time()-start_time:.5f}",
)
trace_print(LoggingEventName.CHECK_CACHE_TRANSFER_END, task_id, getattr(task, "user", ""))
result.metrics.send_request_output_to_decode_time = time.time()
self.split_connector.send_first_token(task.disaggregate_info, [result])
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
@@ -1036,8 +1038,7 @@ def _process_batch_output():
main_process_metrics.request_token_ratio.observe(token_ratio)
if self.cfg.speculative_config.method:
self._compute_speculative_status(result)
if not is_prefill:
self._record_completion_metrics(task, current_time)
self._record_completion_metrics(task, current_time)

log_request(
RequestLogLevel.STAGES,
message="task {request_id} received eos token. Recycling.",
@@ -1089,13 +1090,21 @@ def _record_first_token_metrics(self, task, current_time):

def _record_completion_metrics(self, task, current_time):
"""Record metrics when request completes"""
role = self.cfg.scheduler_config.splitwise_role
metrics = task.metrics
if metrics.engine_recv_first_token_time:
decode_time = current_time - metrics.engine_recv_first_token_time
main_process_metrics.request_decode_time.observe(decode_time)
trace_print(LoggingEventName.INFERENCE_END, task.request_id, getattr(task, "user", ""))

if role in ("mixed", "decode"):
❓ Question: _record_completion_metrics was previously called only for non-prefill tasks (if not is_prefill); now it is also called for the prefill role.

It includes counters such as main_process_metrics.request_success_total.inc() and main_process_metrics.request_inference_time.observe(...). In PD-disaggregated mode, should a task finishing on the Prefill instance count toward request_success_total, or does only completion on the Decode instance represent success for the whole request?

If prefill completion does not equal request completion, this change double-counts request_success_total.
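A hypothetical role guard, assuming only decode-side completion should count (one possible resolution of this question, not a confirmed fix):

if role in ("mixed", "decode"):
    # Count success only on the instance that finishes generation.
    main_process_metrics.request_success_total.inc()
    main_process_metrics.request_inference_time.observe(current_time - metrics.inference_start_time)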

if metrics.engine_recv_first_token_time:
decode_time = current_time - metrics.engine_recv_first_token_time
main_process_metrics.request_decode_time.observe(decode_time)
trace_print(LoggingEventName.INFERENCE_END, task.request_id, getattr(task, "user", ""))

if role == "prefill":
trace_print(LoggingEventName.PREFILL_INFERENCE_END, task.request_id, getattr(task, "user", ""))
elif role == "decode":
trace_print(LoggingEventName.DECODE_INFERENCE_END, task.request_id, getattr(task, "user", ""))

trace_print(LoggingEventName.POSTPROCESSING_START, task.request_id, getattr(task, "user", ""))
main_process_metrics.num_requests_running.dec(1)
main_process_metrics.request_success_total.inc()
main_process_metrics.request_inference_time.observe(current_time - metrics.inference_start_time)
main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id])
24 changes: 24 additions & 0 deletions fastdeploy/trace/constants.py
@@ -41,6 +41,20 @@ class LoggingEventName(Enum):
PREEMPTED = "PREEMPTED"
RESCHEDULED_INFERENCE_START = "RESCHEDULED_INFERENCE_START"

# For Prefill Instance
ASK_DECODE_RESOURCE_START = "ASK_DECODE_RESOURCE_START"
ASK_DECODE_RESOURCE_END = "ASK_DECODE_RESOURCE_END"
CHECK_CACHE_TRANSFER_START = "CHECK_CACHE_TRANSFER_START"
CHECK_CACHE_TRANSFER_END = "CHECK_CACHE_TRANSFER_END"
PREFILL_INFERENCE_END = "PREFILL_INFERENCE_END"

🔴 Bug: Misspelled enum name: DECODE_PROCESS_PREALLOCAT_REQUEST_END is missing the letter E and should be DECODE_PROCESS_PREALLOCATE_REQUEST_END.

It is inconsistent with the corresponding DECODE_PROCESS_PREALLOCATE_REQUEST_START (line 49), so trace tools that rely on START/END pairing for latency analysis cannot match the two events.

Suggested fix:

DECODE_PROCESS_PREALLOCATE_REQUEST_END = "DECODE_PROCESS_PREALLOCATE_REQUEST_END"

The reference in common_engine.py must be updated to match:

trace_print(
    LoggingEventName.DECODE_PROCESS_PREALLOCATE_REQUEST_END,  # corrected spelling
    ...
)
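To make the pairing failure concrete, a small illustration (the pair_key helper is hypothetical, not part of the codebase):

def pair_key(event_name: str) -> str:
    # Pair START/END events by stripping the suffix, as a trace tool might.
    for suffix in ("_START", "_END"):
        if event_name.endswith(suffix):
            return event_name[: -len(suffix)]
    return event_name

assert pair_key("DECODE_PROCESS_PREALLOCATE_REQUEST_START") == "DECODE_PROCESS_PREALLOCATE_REQUEST"
assert pair_key("DECODE_PROCESS_PREALLOCAT_REQUEST_END") == "DECODE_PROCESS_PREALLOCAT_REQUEST"  # never matches the START key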

# For Decode Instance
DECODE_PROCESS_PREALLOCATE_REQUEST_START = "DECODE_PROCESS_PREALLOCATE_REQUEST_START"
DECODE_PROCESS_PREALLOCAT_REQUEST_END = "DECODE_PROCESS_PREALLOCAT_REQUEST_END"
DECODE_PROCESS_PREFILLED_REQUEST_START = "DECODE_PROCESS_PREFILLED_REQUEST_START"
DECODE_PROCESS_PREFILLED_REQUEST_END = "DECODE_PROCESS_PREFILLED_REQUEST_END"
DECODE_INFERENCE_END = "DECODE_INFERENCE_END"


class StageName(Enum):
"""
@@ -75,4 +89,14 @@ class StageName(Enum):
LoggingEventName.WRITE_CACHE_TO_STORAGE_END: StageName.POSTPROCESSING,
LoggingEventName.POSTPROCESSING_START: StageName.POSTPROCESSING,
LoggingEventName.POSTPROCESSING_END: StageName.POSTPROCESSING,
LoggingEventName.ASK_DECODE_RESOURCE_START: StageName.SCHEDULE,
LoggingEventName.ASK_DECODE_RESOURCE_END: StageName.SCHEDULE,
LoggingEventName.CHECK_CACHE_TRANSFER_START: StageName.POSTPROCESSING,
LoggingEventName.CHECK_CACHE_TRANSFER_END: StageName.POSTPROCESSING,
LoggingEventName.PREFILL_INFERENCE_END: StageName.PREFILL,
LoggingEventName.DECODE_PROCESS_PREALLOCATE_REQUEST_START: StageName.DECODE,
LoggingEventName.DECODE_PROCESS_PREALLOCAT_REQUEST_END: StageName.DECODE,
LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_START: StageName.DECODE,
LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END: StageName.DECODE,
🔴 Bug: The mapping table references the misspelled enum name LoggingEventName.DECODE_PROCESS_PREALLOCAT_REQUEST_END; it must be kept consistent with the corrected enum name.
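The corrected mapping entry, assuming the enum rename above is applied (the enclosing dict sits outside this excerpt):

LoggingEventName.DECODE_PROCESS_PREALLOCATE_REQUEST_END: StageName.DECODE,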

LoggingEventName.DECODE_INFERENCE_END: StageName.DECODE,
}