Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 76 additions & 147 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,10 +1292,28 @@ def _insert_zmq_task_to_scheduler(self):
self.request_worker_map[req_id_for_map] = worker_pid
status_value = data.get("status", None)
if status_value is not None and status_value == RequestStatus.ABORT.value:
req_id = data["request_id"]
self.llm_logger.info(f"Receive abort request, req_id: {req_id}")
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.add_abort_req_ids(req_id)
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.llm_logger.info("abort requests only supported in ENABLE_V1_KVCACHE_SCHEDULER")
else:
abort_all = data.get("abort_all", False)
req_ids = data.get("req_ids", [])
if abort_all or req_ids:
target_req_ids = self._resolve_abort_targets(abort_all, req_ids)
self.llm_logger.info(
f"Receive abort_reqs, abort_all={abort_all}, "
f"input={len(req_ids)}, resolved={len(target_req_ids)}"
)
for req_id in target_req_ids:
self.resource_manager.add_abort_req_ids(req_id)
time.sleep(0.0001)
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在接收线程里对每个 req_id 执行 time.sleep(0.0001) 会把 abort_all 的耗时线性放大(例如上万请求会变成秒级阻塞),并且阻塞期间无法及时处理其他 ZMQ 消息。建议改为不 sleep,或仅在必要时做批量/指数退避(例如每 N 个 sleep 一次),并评估是否需要把 add_abort_req_ids 放到更合适的异步/批处理路径。

Suggested change
time.sleep(0.0001)

Copilot uses AI. Check for mistakes.
if self.cfg.scheduler_config.splitwise_role != "prefill":
results = self._build_abort_results(target_req_ids)
Comment thread
qwes5s5 marked this conversation as resolved.
if results:
self.scheduler.put_results(results)
else:
req_id = data["request_id"]
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 else 分支直接访问 data["request_id"],当收到的 ABORT 消息是批量格式但 abort_all=False && req_ids=[] 时(例如 abort_reqs(req_ids=[])),该字段可能不存在,会导致 KeyError 并中断 ZMQ 接收线程。建议改为使用 data.get("request_id") 并在缺失时记录 warning 后跳过,或统一要求批量/单个 abort 消息都携带 request_id

Suggested change
req_id = data["request_id"]
req_id = data.get("request_id")
if not req_id:
self.llm_logger.warning(
"Receive abort request without request_id, skip invalid abort message"
)
continue

Copilot uses AI. Check for mistakes.
self.llm_logger.info(f"Receive abort request, req_id: {req_id}")
self.resource_manager.add_abort_req_ids(req_id)
continue
err_msg = None
try:
Expand Down Expand Up @@ -1584,149 +1602,6 @@ def _control_update_weights(self, control_request: ControlRequest) -> Optional[d

return responses

def _control_abort_requests(self, control_req: ControlRequest):
    """
    Handle the "abort_requests" control command: cancel targeted (or all)
    in-flight requests and report what was aborted.

    Args:
        control_req: ControlRequest whose args may contain:
            - "abort_all" (bool): abort every known request when True.
            - "req_ids" (list[str]): specific request IDs to abort.

    Returns:
        dict with:
            - "aborted": list of {"request_id", "output_token_count"} entries.
            - "not_found": input IDs that matched no live request
              (always [] when abort_all is True).

    Raises:
        Exception: when ENABLE_V1_KVCACHE_SCHEDULER is off (abort is only
            implemented for the v1 KV-cache scheduler).
    """
    if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
        raise Exception("abort_requests only supported in ENABLE_V1_KVCACHE_SCHEDULER")
    args = control_req.get_args()
    abort_all = args.get("abort_all", False)
    req_ids = args.get("req_ids", [])
    matched_input_ids = set()
    # Union of requests tracked by the resource manager and the scheduler —
    # the universe of abortable requests at this moment.
    now_reqs = list(set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()))

    # Step 1: Determine target request list
    if abort_all:
        # all requests in running + waiting
        target_req_ids = now_reqs
    else:
        # filter out requests that actually exist
        target_req_ids = []
        for rid in req_ids:
            if rid in now_reqs:
                target_req_ids.append(rid)
                matched_input_ids.add(rid)
            elif f"{rid}_0" in now_reqs:
                # IDs may carry a "_0" suffix internally — presumably for
                # expanded sub-requests; TODO confirm against request setup.
                target_req_ids.append(f"{rid}_0")
                matched_input_ids.add(rid)

    if not target_req_ids:
        return {"aborted": [], "not_found": req_ids if not abort_all else []}

    # Step 2: Collect partial results
    aborted_info = []
    results = []
    for req_id in target_req_ids:
        request = self.resource_manager.requests.get(req_id)
        if request is None:
            scheduled_req = self.scheduler.requests.get(req_id)
            if scheduled_req is None:
                # Request vanished between target resolution and now; skip it.
                continue
            request = scheduled_req.raw

        partial_token_ids = list(request.output_token_ids)

        # Construct finished response with partial results
        now = time.time()
        # Every timestamp falls back to "now" when metrics are missing.
        abort_metrics = RequestMetrics(
            arrival_time=request.metrics.arrival_time if request.metrics else now,
            inference_start_time=request.metrics.inference_start_time if request.metrics else now,
            engine_recv_latest_token_time=now,
            engine_recv_first_token_time=request.metrics.engine_recv_first_token_time if request.metrics else now,
            request_start_time=request.metrics.arrival_time if request.metrics else now,
        )
        eos_token_ids = getattr(request, "eos_token_ids", [0])
        result = RequestOutput(
            request_id=req_id,
            finished=True,
            outputs=CompletionOutput(
                index=0,
                send_idx=len(partial_token_ids),
                token_ids=[eos_token_ids[0]],
            ),
            metrics=abort_metrics,
            error_code=200,
            error_msg="Aborted",
        )
        results.append(result)
        aborted_info.append(
            {
                "request_id": req_id,
                "output_token_count": len(partial_token_ids),
            }
        )

    # Step 3: Execute abort — add all requests to waiting_abort_req_id_set
    if envs.ENABLE_V1_KVCACHE_SCHEDULER:
        for req_id in target_req_ids:
            self.resource_manager.add_abort_req_ids(req_id)
            # NOTE(review): this per-request sleep scales linearly with the
            # number of aborts (abort_all on many requests blocks for a long
            # time). Presumably it yields to the scheduler thread — confirm,
            # and consider batching instead.
            time.sleep(0.0001)
        if self.cfg.scheduler_config.splitwise_role != "prefill":
            # Non-prefill role: block until cleanup of the aborted requests
            # completes (with stall-timeout forced cleanup).
            self._wait_abort_complete(target_req_ids)

    # Add results to scheduler, engine will have a thread calling get_results,
    # then cleanup and call send_response to send to client.
    # When client disconnects, send_response will automatically ignore
    if self.cfg.scheduler_config.splitwise_role != "prefill":
        try:
            # self.send_response_server.send_response(req_id, [result])
            self.scheduler.put_results(results)
        except Exception:
            pass  # client may have disconnected

    not_found = [rid for rid in req_ids if rid not in matched_input_ids] if not abort_all else []

    return {"aborted": aborted_info, "not_found": not_found}

def _wait_abort_complete(self, target_req_ids, stall_timeout=1):
    """
    Block until every request in ``target_req_ids`` has finished aborting.

    Polling strategy:
    - Keep monitoring as long as remaining is not empty, which means cleanup
      is not done yet.
    - If no progress within stall_timeout seconds, force cleanup of requests
      stuck in to_be_aborted_req_id_set, reset progress state if any, then
      continue monitoring.

    Args:
        target_req_ids: request IDs previously queued for abort.
        stall_timeout: seconds without progress before forcing cleanup of
            stuck requests (default 1).
    """
    target_set = set(target_req_ids)
    # Drop IDs no longer tracked anywhere — nothing to wait for on those.
    target_set = target_set & (set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()))
    prev_remaining_count = len(target_set)
    last_progress_time = time.time()
    remaining = target_set & self.resource_manager.get_reqs_in_aborting()
    while remaining:
        alive_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())
        finished_reqs = target_set - alive_reqs
        if finished_reqs:
            self.llm_logger.info(f"abort targets already finished, skip: {finished_reqs}")
            for req_id in finished_reqs:
                # Clear leftover abort bookkeeping for requests that completed
                # on their own before the abort could take effect.
                self.resource_manager.waiting_abort_req_id_set.discard(req_id)
                self.resource_manager.to_be_aborted_req_id_set.discard(req_id)
            target_set -= finished_reqs
        remaining = target_set & self.resource_manager.get_reqs_in_aborting()
        if not remaining:
            self.llm_logger.info(f"all {len(target_set)} abort reqs cleaned")
            return

        current_count = len(remaining)
        if current_count < prev_remaining_count:
            # progress made: recycle_abort_task was called
            self.llm_logger.info(f"abort progress: {prev_remaining_count} -> {current_count}")
            last_progress_time = time.time()
            prev_remaining_count = current_count

        if time.time() - last_progress_time > stall_timeout:
            # no progress timeout: only cleanup requests stuck in to_be_aborted (worker hasn't returned -9)
            stuck = remaining & self.resource_manager.to_be_aborted_req_id_set
            if stuck:
                self.llm_logger.warning(
                    f"no abort progress for {stall_timeout}s, "
                    f"force cleanup {len(stuck)} stuck requests (in to_be_aborted)"
                )
                for req_id in list(stuck):
                    self.llm_logger.warning(f"force cleanup stuck req_id:{req_id}")
                    self.resource_manager.recycle_abort_task(req_id)
                # reset progress state
                last_progress_time = time.time()
                prev_remaining_count = current_count - len(stuck)
            # else: remaining are all in waiting_abort_req_id_set, waiting for natural flow

        time.sleep(0.005)

def _parse_tags(self, control_request: ControlRequest):
"""
Parse tags from control request.
Expand Down Expand Up @@ -2740,3 +2615,57 @@ def detect_thread():
if hasattr(self, "log_dir") and hasattr(self, "paddle_log_dir"):
ensure_workerlog_alias(self.log_dir, self.paddle_log_dir)
return True

def _resolve_abort_targets(self, abort_all, req_ids):
    """
    Resolve the set of request IDs that should actually be aborted.

    Args:
        abort_all: When True, target every request currently tracked by the
            resource manager or the scheduler, ignoring ``req_ids``.
        req_ids: Candidate request IDs supplied by the client. Each ID is
            kept only if it — or its ``"<id>_0"`` variant — currently exists.

    Returns:
        list[str]: request IDs that are alive right now; unknown IDs are
        silently dropped.
    """
    now_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())
    # Avoid dumping the full (potentially huge, request-ID-bearing) set into
    # the log: record the count plus a small sample, enough for debugging.
    sample = list(now_reqs)[:10]
    self.llm_logger.debug(
        f"now_reqs count={len(now_reqs)}, sample={sample}, truncated={len(now_reqs) > 10}"
    )
    if abort_all:
        return list(now_reqs)

    target_req_ids = []
    for rid in req_ids:
        if rid in now_reqs:
            target_req_ids.append(rid)
        elif f"{rid}_0" in now_reqs:
            # IDs may carry a "_0" suffix internally — presumably for
            # expanded sub-requests; TODO confirm against request setup.
            target_req_ids.append(f"{rid}_0")
    return target_req_ids

def _build_abort_results(self, target_req_ids):
    """Construct finished RequestOutput objects announcing each abort to streaming clients."""
    outputs = []
    for rid in target_req_ids:
        # Prefer the live request in the resource manager; fall back to the
        # scheduler's queued copy, and skip IDs that exist in neither.
        req = self.resource_manager.requests.get(rid)
        if req is None:
            queued = self.scheduler.requests.get(rid)
            if queued is None:
                continue
            req = queued.raw

        emitted_count = len(list(req.output_token_ids))
        now = time.time()
        metrics = req.metrics
        # Any missing timestamp falls back to the current time.
        arrival = metrics.arrival_time if metrics else now
        abort_metrics = RequestMetrics(
            arrival_time=arrival,
            inference_start_time=metrics.inference_start_time if metrics else now,
            engine_recv_latest_token_time=now,
            engine_recv_first_token_time=metrics.engine_recv_first_token_time if metrics else now,
            request_start_time=arrival,
        )
        outputs.append(
            RequestOutput(
                request_id=rid,
                finished=True,
                outputs=CompletionOutput(index=0, send_idx=emitted_count, token_ids=[]),
                metrics=abort_metrics,
                error_code=200,
                error_msg="Aborted",
            )
        )
    return outputs
12 changes: 12 additions & 0 deletions fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,18 @@ async def abort(self, request_id, n=1) -> None:
request_ids=",".join(request_ids),
)

async def abort_reqs(self, req_ids=None, abort_all=False):
    """
    Fire-and-forget: abort multiple requests in one ZMQ message.

    Used by the /v1/abort_requests API.

    Args:
        req_ids: Request IDs to abort; defaults to an empty list.
        abort_all: When True, abort every in-flight request (req_ids ignored
            engine-side).
    """
    req_ids = req_ids or []
    if not abort_all and not req_ids:
        # Nothing to abort. Sending an ABORT message with neither abort_all
        # nor req_ids would be treated engine-side as a single-request abort
        # and crash its ZMQ receive thread on the missing "request_id" key.
        return
    data = {
        "status": RequestStatus.ABORT.value,
        "abort_all": abort_all,
        "req_ids": req_ids,
    }
    self._send_task(data)

def process_messages(self, messages):
for message in messages:
if message["role"] == "assistant" and "tool_calls" in message:
Expand Down
9 changes: 2 additions & 7 deletions fastdeploy/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,8 @@ async def abort_requests(request: Request):
if not abort_all and not req_ids:
return JSONResponse(status_code=400, content={"error": "must provide abort_all=true or req_ids"})

control_request = ControlRequest(
request_id=f"control-{uuid.uuid4()}",
method="abort_requests",
args={"abort_all": abort_all, "req_ids": req_ids or []},
)
control_response = await app.state.engine_client.run_control_method(control_request)
return control_response.to_api_json_response()
await app.state.engine_client.abort_reqs(req_ids=req_ids or [], abort_all=abort_all)
return Response(status_code=200)
Comment thread
qwes5s5 marked this conversation as resolved.
Comment thread
qwes5s5 marked this conversation as resolved.
Comment thread
qwes5s5 marked this conversation as resolved.
Comment on lines +499 to +500
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

当前 API 直接返回 200 且不关心引擎是否实际支持 abort:当 ENABLE_V1_KVCACHE_SCHEDULER 关闭时,引擎侧只会打印 info 并忽略 ABORT 消息,客户端仍收到 200 会被误导。建议在 API 层或 EngineClient 层增加能力校验(例如在不支持时返回 501/400 并给出明确错误信息,或至少在响应里返回 status=unsupported)。

Copilot uses AI. Check for mistakes.
Comment on lines +499 to +500
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里将 /v1/abort_requests 的响应从原先的 JSON(包含 aborted/not_found 等)改为直接返回空 body 的 200,会与现有文档描述(docs/online_serving/README.md、docs/online_serving/router.md 等均说明会返回中断结果与 token 数)不一致,且可能破坏依赖旧响应结构的客户端。建议:要么保持原有响应 schema(即使 result 为空/仅返回 request_id+status),要么同步更新对应文档并在 release note 中明确该 breaking change。

Copilot uses AI. Check for mistakes.


def wrap_streaming_generator(original_generator: AsyncGenerator):
Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/entrypoints/openai/response_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def process_response_chat(self, request_outputs, stream, include_stop_str_
decode_type = request_output["outputs"].get("decode_type", 0) or 0
if decode_type == 0: # text
tts = req_id in self._audio_buffer
if token_ids[-1] == self.eos_token_id:
if token_ids and token_ids[-1] == self.eos_token_id:
all_audio_tokens = self._audio_buffer.pop(req_id, [])
else:
all_audio_tokens = None
Expand Down Expand Up @@ -186,7 +186,7 @@ async def process_response_chat(self, request_outputs, stream, include_stop_str_
else:
self.accumulate_token_ids(request_output)
token_ids = request_output["outputs"]["token_ids"]
if token_ids[-1] == self.eos_token_id:
if token_ids and token_ids[-1] == self.eos_token_id:
multipart = []
num_image_tokens = 0
for part in self._multipart_buffer:
Expand Down
45 changes: 11 additions & 34 deletions fastdeploy/router/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import aiohttp
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, ORJSONResponse, Response, StreamingResponse
from fastapi.responses import ORJSONResponse, Response, StreamingResponse

from fastdeploy.router.utils import (
InstanceInfo,
Expand All @@ -29,6 +29,7 @@
from fastdeploy.utils import router_logger as logger

app = FastAPI()
_background_tasks = set()


@dataclass
Expand Down Expand Up @@ -588,39 +589,15 @@ async def abort_requests(request: Request):
decode_servers = app.state.router.decode_servers
all_servers = prefill_servers + decode_servers

async with aiohttp.ClientSession() as session:
tasks = [session.post(f"{server.url()}/v1/abort_requests", json=body) for server in all_servers]
responses = await asyncio.gather(*tasks, return_exceptions=True)

# Aggregate results from Node D only
all_aborted = []
all_not_found = []
errors = []
decode_start = len(prefill_servers)
for i, (server, resp) in enumerate(zip(all_servers, responses)):
if i < decode_start:
continue
if isinstance(resp, Exception):
errors.append({"server": server.url(), "error": str(resp)})
elif resp.status == 200:
data = await resp.json()
result = data.get("result") or {}
all_aborted.extend(result.get("aborted", []))
all_not_found.extend(result.get("not_found", []))
else:
errors.append({"server": server.url(), "status": resp.status})

return JSONResponse(
content={
"request_id": f"router-{uuid4()}",
"status": "success" if not errors else "error",
"error_message": None if not errors else str(errors),
"result": {
"aborted": all_aborted,
"not_found": list(set(all_not_found)),
},
}
)
async def _forward_abort():
async with aiohttp.ClientSession() as session:
tasks = [session.post(f"{server.url()}/v1/abort_requests", json=body) for server in all_servers]
await asyncio.gather(*tasks, return_exceptions=True)

Comment on lines +593 to +596
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

当前实现是 fire-and-forget,但这里完全忽略了下游返回值/异常:gather(return_exceptions=True) 的结果没有任何日志记录,且没有设置请求超时;如果某个下游实例 hang 住,task 可能长期不结束,_background_tasks 会持续累积。建议为 ClientSession/post 增加合理的 timeout,并至少对异常/非 200 做 warning 日志(必要时带 server url),以便线上排障与避免后台任务泄漏。

Suggested change
async with aiohttp.ClientSession() as session:
tasks = [session.post(f"{server.url()}/v1/abort_requests", json=body) for server in all_servers]
await asyncio.gather(*tasks, return_exceptions=True)
timeout = aiohttp.ClientTimeout(total=5)
async def _post_abort(session: aiohttp.ClientSession, server):
server_url = f"{server.url()}/v1/abort_requests"
try:
async with session.post(server_url, json=body) as resp:
if resp.status != 200:
logger.warning(f"Abort request forwarding to {server_url} returned status {resp.status}")
except asyncio.TimeoutError:
logger.warning(f"Abort request forwarding to {server_url} timed out")
except aiohttp.ClientError as exc:
logger.warning(f"Abort request forwarding to {server_url} failed: {exc}")
except Exception:
logger.warning(
f"Unexpected error when forwarding abort request to {server_url}:\n{traceback.format_exc()}"
)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = [_post_abort(session, server) for server in all_servers]
await asyncio.gather(*tasks)

Copilot uses AI. Check for mistakes.
task = asyncio.create_task(_forward_abort())
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return Response(status_code=200)
Comment thread
qwes5s5 marked this conversation as resolved.
Comment thread
qwes5s5 marked this conversation as resolved.


Comment on lines +600 to 602
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Router 的 /v1/abort_requests 现在直接返回空 body 的 200(异步转发),与现有文档中“返回 aborted/not_found 与 token 数”的描述不一致,也可能破坏客户端兼容性。建议保持原响应 schema(例如返回 request_id + status,并注明为异步受理),或同步更新 docs/online_serving/router.md 等文档并明确该接口语义变更。

Suggested change
return Response(status_code=200)
response_payload = {"status": "accepted"}
if isinstance(body, dict):
if "request_id" in body:
response_payload["request_id"] = body["request_id"]
elif "request_ids" in body:
response_payload["request_ids"] = body["request_ids"]
return ORJSONResponse(response_payload, status_code=200)

Copilot uses AI. Check for mistakes.
def launch_router(router_args: RouterArgs):
Expand Down
6 changes: 4 additions & 2 deletions fastdeploy/scheduler/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ def _recycle(self, request_id: Optional[str] = None):
if request_id is not None:
self.requests.pop(request_id, None)
self.responses.pop(request_id, None)
self.ids.pop(self.ids.index(request_id))
self.ids_read_cursor -= 1
idx = self.ids.index(request_id)
self.ids.pop(idx)
if idx < self.ids_read_cursor:
Comment thread
qwes5s5 marked this conversation as resolved.
self.ids_read_cursor -= 1
Comment thread
qwes5s5 marked this conversation as resolved.
return

if self.max_size <= 0:
Expand Down
Loading
Loading