[Cherry-Pick][Optimization] Support multimodal runner for image/video… #7468 #7595

xiaoxiaohehe001 wants to merge 5 commits into PaddlePaddle:develop
Conversation
Commits:
- …dle#7237)(PaddlePaddle#7425)(PaddlePaddle#7426) (PaddlePaddle#7436) * [Optimization] Auto set num_max_dispatch_tokens_per_rank (PaddlePaddle#7237) * auto set num_max_dispatch_tokens_per_rank * fix ci * fix ci * fix ci * fix deep gemm import (PaddlePaddle#7425) * allow parallel dp starting (PaddlePaddle#7426)
- …le#7437) * support blackwell gemm in ll * add attr * opt quant
- This reverts commit 6727df8.
- …addle#7299 The revert of PaddlePaddle#6680 incorrectly restored gpu_cache_lock (IPCLock) usage, but IPCLock was already removed in commit 26d6a20 (PaddlePaddle#7299). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- … feature processing (PaddlePaddle#7485)
Thanks for your contribution!
Pull request overview
This PR is cherry-picked from #7485. It introduces the multimodal (image/video) runner's handling of pre-encoded features plus attention-mask offsets, strengthens the GPU Model Runner's input organization and forward-meta passing during mm prefill, and makes matching adjustments to EP/DP scheduling and some MoE/FP8 paths.
Changes:
- GPU Model Runner: supports image/video pre-encoded feature input; computes and passes attn_mask_offsets/decode_states during prefill; cleans up intermediate features to avoid memory leaks
- InputBatch: adds and maintains multimodal fields (image/video grid/features, attn mask offsets, decode states) consistently across swap/reset/resize and related flows
- Scheduling/runtime: updated DP scheduler request-pulling logic, removed a ResourceManagerV1 scheduling restriction, added the EP batched-token timeout environment variable, plus supporting changes to the worker/event loop and MoE backends
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| tests/splitwise/test_internal_adapter_utils.py | Update the test stub for the internal adapter server_info field changes |
| tests/scheduler/test_dp_scheduler.py | Add unit tests for DP scheduler get_requests edge cases |
| tests/engine/test_common_engine.py | Adapt to the engine_worker_queue mock interface changes |
| tests/ci_use/metrics/test_metrics.py | Enable the metrics + clear_load_weight end-to-end test flow |
| fastdeploy/worker/worker_process.py | Adjust the worker event loop: eplb call site, task handling, and logging behavior |
| fastdeploy/worker/input_batch.py | Add/maintain multimodal fields in InputBatch (grid/features/attn mask/decode states) |
| fastdeploy/worker/gpu_model_runner.py | Pre-encoded multimodal feature handling, prefill attn mask offsets updates, forward-meta passing and cleanup |
| fastdeploy/splitwise/internal_adapter_utils.py | Change how the reported server_info field unhandled_request_num is computed |
| fastdeploy/scheduler/dp_scheduler.py | get_requests now aggregates pulls by resource/token/batch/timeout |
| fastdeploy/model_executor/models/deepseek_v3.py | Adjust the deep_gemm import path to reuse the fp8_utils loading logic |
| fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py | Remove the Blackwell-specific scale-handling branch (drops the envs dependency) |
| fastdeploy/model_executor/layers/moe/fused_moe_blackwell_backend.py | Adjust Blackwell MoE weight/quantize/ep prefill/decode paths |
| fastdeploy/envs.py | Add the FD_EP_BATCHED_TOKEN_TIMEOUT environment variable |
| fastdeploy/engine/sched/resource_manager_v1.py | Remove the special scheduling restriction for Ernie5 mm requests, unifying the scheduling logic |
| fastdeploy/engine/engine.py | Adjust the sleep frequency while waiting for expert services to start |
| fastdeploy/engine/common_engine.py | Adjust the v1 scheduler thread's pull/dispatch logic (remove the engine_forward_signal checks) |
| fastdeploy/config.py | Auto-adjust num_max_dispatch_tokens_per_rank to match the max_num_seqs/SpecDec configuration |
| docs/zh/usage/environment_variables.md | Document FD_EP_BATCHED_TOKEN_TIMEOUT (Chinese) |
| docs/usage/environment_variables.md | Document FD_EP_BATCHED_TOKEN_TIMEOUT (English) |
```python
while True:
    # run eplb
    self._run_eplb(tp_rank)
```
_run_eplb(tp_rank) has been moved to the start of every iteration of the while True loop (lines 447-448). The function contains distributed synchronization such as broadcast/barrier, so with eplb enabled it adds significant idle overhead per iteration and may reduce inference throughput. Consider triggering it only when needed (e.g., when a relevant signal is set), or adding interval control (e.g., run every X seconds / every Y steps).
Suggested change:

```diff
-while True:
-    # run eplb
-    self._run_eplb(tp_rank)
+eplb_check_interval_s = 1.0
+next_eplb_run_time = 0.0
+while True:
+    # Avoid running EPLB synchronization on every polling iteration.
+    if self.fd_config.eplb_config.enable:
+        now = time.monotonic()
+        if now >= next_eplb_run_time:
+            self._run_eplb(tp_rank)
+            next_eplb_run_time = now + eplb_check_interval_s
```
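If interval-based polling is still too eager, a signal-driven variant is another option. The sketch below assumes a hypothetical `eplb_trigger_signal` shared flag that the scheduler would set when a rebalance is wanted; it is not an existing field:

```python
# Sketch: gate EPLB on an explicit trigger instead of a timer.
# `eplb_trigger_signal` is a hypothetical IPC flag, not existing code.
while True:
    if self.fd_config.eplb_config.enable and self.eplb_trigger_signal.value[0] == 1:
        self._run_eplb(tp_rank)
        self.eplb_trigger_signal.value[0] = 0  # reset so EPLB runs once per trigger
```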
```diff
 metrics_url = f"http://0.0.0.0:{FD_METRICS_PORT}/metrics"

-# async_concurrency(n=10)
+async_concurrency(n=10)

-# time.sleep(0.3)
+time.sleep(0.3)

-# ===== clear_load_weight =====
-# clear_url = f"http://0.0.0.0:{FD_API_PORT}/clear_load_weight"
-# print("Calling clear_load_weight...")
-# r = requests.get(clear_url, timeout=30)
-# assert r.status_code == 200, f"clear_load_weight failed: {r.status_code}"
-# metrics = get_metrics_dict(metrics_url)
-# running = metrics["fastdeploy:num_requests_running"]
-# waiting = metrics["fastdeploy:num_requests_waiting"]
-# print(
-#     "ASSERT after the clear_load_weight operation, the value is 0 (Request interruption stopped inference, and related requests were cleared):",
-#     running,
-#     "waiting:",
-#     waiting,
-# )
+clear_url = f"http://0.0.0.0:{FD_API_PORT}/clear_load_weight"
+print("Calling clear_load_weight...")
+r = requests.get(clear_url, timeout=30)
+assert r.status_code == 200, f"clear_load_weight failed: {r.status_code}"
+metrics = get_metrics_dict(metrics_url)
+running = metrics["fastdeploy:num_requests_running"]
+waiting = metrics["fastdeploy:num_requests_waiting"]
+print(
+    "ASSERT after the clear_load_weight operation, the value is 0 (Request interruption stopped inference, and related requests were cleared):",
+    running,
+    "waiting:",
+    waiting,
+)
 # assert running == 0 and waiting == 0, "Expected both running and waiting to be 0 after clear_load_weight"
```
This test case changed from `pass # not stable` to running by default (lines 217-239). It starts a full service process, issues concurrent requests, and depends on a local GPU and model files, which easily causes CI flakiness or failures in non-e2e environments. At the same time, the key assertion (running/waiting == 0) is still commented out, so the test does not actually verify the expected behavior. Suggestions: 1) add a pytest marker (e.g., slow/e2e) and skip it in default unit-test runs; 2) or gate it behind an environment variable; 3) restore and stabilize the assertion logic (adding retry/wait conditions if necessary).
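A minimal sketch of suggestions 1) and 2), assuming a hypothetical `e2e` marker and `FD_RUN_E2E_TESTS` environment variable (neither exists in the repo today):

```python
import os

import pytest

# Skip by default; opt in explicitly on machines that have a GPU, the model
# files, and a launchable FastDeploy service.
@pytest.mark.e2e
@pytest.mark.skipif(
    os.getenv("FD_RUN_E2E_TESTS") != "1",
    reason="end-to-end test: requires a local GPU, model files, and a running service",
)
def test_metrics_clear_load_weight():
    ...
```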
| "available_resource": float(1.0 * available_block_num / self.cfg.cache_config.total_block_num), | ||
| "max_batch_size": int(available_batch_size), | ||
| "max_input_token_num": self.cfg.model_config.max_model_len, | ||
| "unhandled_request_num": unhandled_request_num, | ||
| "unhandled_request_num": self.engine.scheduler.get_unhandled_request_num(), | ||
| "available_batch": int(self.engine.resource_manager.available_batch()), |
unhandled_request_num now only returns scheduler.get_unhandled_request_num() (line 65). Under ENABLE_V1_KVCACHE_SCHEDULER, existing logic counts scheduler_unhandled_request_num together with resource_manager.waiting as the queue length (resource_manager_v1.py:1588-1589). Dropping waiting here may under-report the externally visible queue length, affecting external scheduling / load-balancing decisions. Consider aligning with ResourceManagerV1's accounting (at least add len(resource_manager.waiting), or directly reuse engine.resource_manager.scheduler_unhandled_request_num/queue_cnt).
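A minimal sketch of the suggested alignment, reusing the names cited above (`ENABLE_V1_KVCACHE_SCHEDULER`, `resource_manager.waiting`); the exact attribute paths should be checked against the code:

```python
# Align the reported queue length with ResourceManagerV1's accounting:
# scheduler backlog plus requests still waiting in the resource manager.
unhandled_request_num = self.engine.scheduler.get_unhandled_request_num()
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
    unhandled_request_num += len(self.engine.resource_manager.waiting)
```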
```python
current_prefill_tokens += request.prompt_tokens_ids_len
required_total_blocks += required_input_blocks + reserved_output_blocks
if required_total_blocks > available_blocks:
    break
```
current_prefill_tokens / required_total_blocks are accumulated before checking whether available_blocks is exceeded (lines 156-158). When a request is not added to requests due to insufficient resources, these counters still include it, which may prematurely trigger the max_num_batched_tokens / timeout branch and return an empty batch. Suggestion: move the accumulation to after the resource check passes, or first compute needed_blocks = required_input_blocks + reserved_output_blocks and only accumulate and advance ids_read_cursor after confirming required_total_blocks + needed_blocks <= available_blocks.
Suggested change:

```diff
-current_prefill_tokens += request.prompt_tokens_ids_len
-required_total_blocks += required_input_blocks + reserved_output_blocks
-if required_total_blocks > available_blocks:
-    break
+needed_blocks = required_input_blocks + reserved_output_blocks
+if required_total_blocks + needed_blocks > available_blocks:
+    break
+current_prefill_tokens += request.prompt_tokens_ids_len
+required_total_blocks += needed_blocks
```
```python
if self.enable_mm:
    self.share_inputs["decode_states"][idx, 0] = 0
    inputs = request.multimodal_inputs
    # mm attention_mask
    attn_offset_len = prefill_end_index - prefill_start_index
    if inputs.get("attention_mask_offset", None) is None:
        attention_mask_offset_slice = np.arange(prefill_start_index, prefill_end_index, dtype=np.int32)
    else:
        attention_mask_offset_slice = np.asarray(
            inputs["attention_mask_offset"][prefill_start_index:prefill_end_index], dtype=np.int32
        )
```
In the enable_mm path, get is called directly on request.multimodal_inputs (line 838), but Request.multimodal_inputs may default to None (text-only requests also reach this code), which raises an AttributeError. Consider normalizing inputs to {} before use (e.g., inputs = request.multimodal_inputs or {}), or only taking the attention_mask_offset branch when inputs is a dict.
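A minimal guard along the lines suggested above, keeping the surrounding names from the snippet:

```python
# Normalize None to an empty dict so text-only requests don't crash on .get().
inputs = request.multimodal_inputs or {}
offsets = inputs.get("attention_mask_offset")
if offsets is None:
    attention_mask_offset_slice = np.arange(prefill_start_index, prefill_end_index, dtype=np.int32)
else:
    attention_mask_offset_slice = np.asarray(
        offsets[prefill_start_index:prefill_end_index], dtype=np.int32
    )
```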
```python
if (
    inputs is not None
    and inputs.get("image_feature_urls", None) is not None
    and len(inputs["image_feature_urls"]) > 0
):
    multi_vision_inputs["image_grid_thws"].extend(
        inputs["image_grid_thws"][request.image_start : request.image_end]
    )
    image_feature = inputs["image_features"][request.image_start : request.image_end]
```
The pre-encoded feature branch directly indexes inputs["image_grid_thws"] / inputs["image_features"] (lines 634-637). If upstream only supplied image_feature_urls (the resource manager fills in image_features) but not image_grid_thws, this raises a KeyError here. Consider validating these required fields explicitly and producing a clear error (e.g., set request.error_code/error_message or raise), instead of crashing the worker process.
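One possible shape for that validation, as a sketch (whether to raise or set request.error_code/error_message is up to the author):

```python
# Validate required companion fields before slicing; a clear error here beats
# a bare KeyError deep inside the prefill path.
required = ("image_grid_thws", "image_features")
missing = [name for name in required if inputs.get(name) is None]
if missing:
    raise ValueError(
        f"image_feature_urls provided but required fields are missing: {missing}"
    )
```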
```python
if isinstance(image_feature[0], paddle.Tensor) and len(image_feature[0].shape) == 2:
    # Enable encode vision_embedding
    for image_feature_tensor in image_feature:
        if image_feature_tensor.shape[1] != self.fd_config.model_config.hidden_size:
            logger.error(
                f"Shape mismatch: expected shape={self.fd_config.model_config.hidden_size}, \
                but got {image_feature_tensor.shape}"
            )
    image_features_gpu = [vf.cuda() for vf in image_feature]
    image_embeds = paddle.concat(image_features_gpu, axis=0)
    multi_vision_inputs["image_features"].append(image_embeds)
    logger.info("Enable Encode image embedding.")
```
When a hidden_size mismatch is detected for image_feature_tensor (lines 643-647), the code only calls logger.error but still concatenates and hands the tensor to the model, so the failure surfaces later in a harder-to-diagnose way. Consider aborting the request on a shape mismatch (raise an exception, or mark the request as failed and return an error response), and include the expected and actual dimensions in the message.
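A fail-fast sketch of that suggestion, using the names from the snippet above:

```python
# Abort on the first mismatched tensor instead of concatenating and letting
# the model fail later with a less obvious error.
expected = self.fd_config.model_config.hidden_size
for image_feature_tensor in image_feature:
    if image_feature_tensor.shape[1] != expected:
        raise ValueError(
            f"Pre-encoded image feature hidden-size mismatch: "
            f"expected {expected}, got shape {image_feature_tensor.shape}"
        )
```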
```diff
 if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1:
-    logger.debug(f"Rank: {self.local_rank} Detected new requests.")
-    self.engine_forward_signal.value[0] = 1
+    logger.info(f"Rank: {self.local_rank} Detected new requests.")
```
Changing this high-frequency "Detected new requests" log from debug to info (line 526) can flood the logs under high concurrency and hurt performance/observability (logging cost is significantly higher than the computation). Suggest keeping it at debug level, or adding sampling/throttling (e.g., print at most once every N occurrences or once per second).
| logger.info(f"Rank: {self.local_rank} Detected new requests.") | |
| logger.debug(f"Rank: {self.local_rank} Detected new requests.") |
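If info-level visibility is genuinely wanted, a throttled variant is a possible middle ground; `_last_detect_log_time` below is hypothetical per-worker state, not an existing attribute:

```python
import time

# Log at most once per second per rank, regardless of polling frequency.
now = time.monotonic()
if now - getattr(self, "_last_detect_log_time", 0.0) >= 1.0:
    logger.info(f"Rank: {self.local_rank} Detected new requests.")
    self._last_detect_log_time = now
```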
PaddlePaddle-bot left a comment

🤖 AI Code Review | 2026-04-23 20:43:35
📋 Review Summary
PR overview: supports the multimodal runner's image/video feature-processing flow, while optimizing EP parallel scheduling, Blackwell MoE weight-quantization loading, and the engine forward-signal mechanism.
Scope of changes: Worker/InputBatch (multimodal), Scheduler (DP/EP), Engine (DP startup flow), MoE Blackwell backend (weight-quantization refactor), ResourceManager (Ernie5 scheduling rules)
Impact tags: [Optimization] [Scheduler] [Engine] [Models] [OP]
📝 PR Convention Check
The title ends in an ellipsis (…) that truncates the description; complete the title to comply with the convention.
Suggested title (copy-ready):
[Cherry-Pick][Optimization] Support multimodal runner for image/video processing (#7468)
Issues

| Severity | File | Summary |
|---|---|---|
| 🔴 Bug | fastdeploy/worker/input_batch.py:481 | The first swap_states branch (enable_mm) is missing the attn_mask_offsets swap, inconsistent with the second branch; data may be misaligned when sequences are preempted in MM scenarios |
| 🟡 Suggestion | fastdeploy/scheduler/dp_scheduler.py:156 | current_prefill_tokens accumulates a rejected request's tokens before break, so the outer loop may exit early and schedule fewer requests |
| 🟡 Suggestion | fastdeploy/model_executor/layers/moe/fused_moe_blackwell_backend.py:466 | The double .transpose([0,2,1]) calls cancel out; the net effect is just .contiguous(), possibly a typo |
| ❓ Question | fastdeploy/engine/engine.py:845 | The new loop starts at i=1, skipping the wait for DP process 0's initialization signal; please confirm the intent |
Overall Assessment
The multimodal feature-processing logic in this PR is generally clear, and the Blackwell MoE backend refactor is substantial and heading in the right direction. Prioritize fixing the swap inconsistency in input_batch.py (it carries real MM production-bug risk), and please explain why engine.py skips waiting for the i=0 signal.
```python
    self.image_features_list[i1],
)
swap_data(self.share_inputs["rope_emb"], i1, i2)
swap_data(self.decode_states, i1, i2)
```
🔴 Bug: the first swap_states is missing the attn_mask_offsets swap in its enable_mm branch

The first swap_states (in InputBatch's swap_states method) only swaps decode_states and attn_mask_offsets_full under enable_mm, omitting attn_mask_offsets (the 1-D flattened tensor).

The second swap_states (lines 936-939) swaps all four fields under the same enable_mm condition: decode_states, attn_mask_offsets, attn_mask_offsets_full, and attn_mask_offsets_decoder.

The two implementations are inconsistent: when a sequence is preempted in an MM scenario, the slot data in attn_mask_offsets is not exchanged along with the batch position, which can corrupt the attention-mask computation.

Suggested fix:

```python
swap_data(self.decode_states, i1, i2)
swap_data(self.attn_mask_offsets, i1, i2)  # add this line
swap_data(self.attn_mask_offsets_full, i1, i2)
```

```python
for request_id in batch_ids:
    request = self.requests[request_id]
    required_input_blocks = self.calc_required_blocks(request.prompt_tokens_ids_len, block_size)
    current_prefill_tokens += request.prompt_tokens_ids_len
```
🟡 Suggestion: current_prefill_tokens accumulates a rejected request's tokens before the break check

Current logic: current_prefill_tokens is accumulated first (line 156), then required_total_blocks > available_blocks is checked (line 158) and the loop breaks. When the break fires, the request was never appended to requests, but current_prefill_tokens already includes its tokens.

The outer exit condition (line 169):

```python
if (current_prefill_tokens > max_num_batched_tokens) or ...
```

will then exit the while loop early because of the inflated token count, potentially scheduling fewer requests this round and reducing batching efficiency.

Suggestion: move the accumulation after the break check (or roll it back before breaking):

```python
required_total_blocks += required_input_blocks + reserved_output_blocks
if required_total_blocks > available_blocks:
    break
current_prefill_tokens += request.prompt_tokens_ids_len  # moved here
requests.append(request.raw)
```

```python
)
weight[expert_id].copy_(w_q, False)
scale_list.append(s_ue8m0)
scale = paddle.to_tensor(scale_list).transpose([0, 2, 1]).contiguous().transpose([0, 2, 1])
```
🟡 Suggestion: the double .transpose([0, 2, 1]) calls cancel out, possibly a typo

```python
scale = paddle.to_tensor(scale_list).transpose([0, 2, 1]).contiguous().transpose([0, 2, 1])
```

The chain runs: original shape [E, H, W] → first transpose [E, W, H] → contiguous → second transpose [E, H, W]. The final shape is identical to the input, the two transposes cancel each other, and the net effect is equivalent to calling only .contiguous().

If the intent was to lay scale out as [E, W, H] for the subsequent blackwell_ops.unpack_and_convert_scale, it should be:

```python
scale = paddle.to_tensor(scale_list).transpose([0, 2, 1]).contiguous()
```

Please confirm whether this is a typo.
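The cancellation is easy to verify in isolation (illustrative shapes only):

```python
import paddle

x = paddle.rand([2, 4, 8])  # stand-in for [E, H, W]
y = x.transpose([0, 2, 1]).contiguous().transpose([0, 2, 1])
assert y.shape == x.shape       # still [2, 4, 8]
assert bool((x == y).all())     # values unchanged: net effect is .contiguous()
```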
```python
    )
    self.dp_processed[-1].start()

for i in range(
```
❓ Question: the new loop starts at i=1, skipping the wait for DP process 0's (i=0) initialization signal

Original logic: within a single loop, each start() was immediately followed by a wait on the corresponding launched_expert_service_signal, so DP processes started serially, each waiting for initialization to complete.

New logic: an outer loop first start()s everything, then range(1, N) waits for the signals of i=1 through N-1, skipping the signal check for process i=0.

Please confirm:
- Is i=0 the current process (already initialized locally, so no wait is needed)?
- If i=0 is a separate child process, could skipping the wait let subsequent code (e.g., dispatching the first batch of tasks) run before i=0 finishes initializing, creating a race condition?
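For reference, a conservative sketch that waits on every index including i=0; the loop bound is an assumed config attribute, and `launched_expert_service_signal` follows the name used above:

```python
import time

# Wait for every DP process to report initialization, including i=0.
for i in range(self.cfg.parallel_config.data_parallel_size):
    while self.launched_expert_service_signal.value[i] == 0:
        time.sleep(1)
```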
Motivation
Support the image/video feature-processing flow in the multimodal runner and enhance the GPU Model Runner's ability to handle pre-encoded multimodal features.
Changes
Cherry-Pick from #7485
fastdeploy/worker/gpu_model_runner.py
- Add processing logic for `image_feature_urls` pre-encoded image features: already-encoded image embeddings can be passed in directly, skipping the vision encoder
- Pass and manage `image_grid_thws` and `video_features`/`video_grid_thws`
- Add multimodal prefill state (`attn_mask_offsets`, `decode_states`)
- `update_attn_mask_offsets` refreshes the attention mask before forward
- Pass `attn_mask_offsets` into forward meta for use during model inference
- Sort by `idx` to keep the processing order consistent
- Clear intermediate state such as `image_features`/`video_features` to prevent memory leaks

fastdeploy/worker/input_batch.py

- Add the `image_grid_thws`, `video_features`, `video_grid_thws`, `video_infinity_scales` fields
- Initialize tensors for `decode_states`, `attn_mask_offsets`, `attn_mask_offsets_full`
- Maintain the new fields across `swap_data`, `reset`, `resize`, and related operations
- Handle `generated_modality` in swap and reset (previously missed)

fastdeploy/engine/sched/resource_manager_v1.py

- Remove the special scheduling restriction for Ernie5 mm requests (`get_enough_request`), unifying the scheduling logic

Checklist
- [ ] Add at least one tag in the PR title from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- [ ] Run `pre-commit` before commit.
- [ ] For a `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.