Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion fastdeploy/cache_manager/cache_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""

from dataclasses import dataclass
from typing import List
from typing import List, Optional


@dataclass(frozen=True, kw_only=True)
Expand All @@ -35,3 +35,8 @@ class ReadStorageTask(CacheTask):
@dataclass(frozen=True, kw_only=True)
class WriteStorageTask(CacheTask):
timeout: float = 30.0
# Used in FD_AS_ONLY_FLUSH mode to indicate whether cache is present on this node.
# True = cache exists (request finish), False = cache gone (CPU eviction), None = not applicable.
flush_cache_exists: Optional[bool] = None
# Block index to start the write/flush operation from. Defaults to 0 (all blocks).
start_write_block_idx: int = 0
31 changes: 31 additions & 0 deletions fastdeploy/cache_manager/cache_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,34 @@ def _run_write_back_storage(
)
return 0

def _flush_only_storage_task(self, task: WriteStorageTask):
    """
    AS-only flush mode: skip the actual storage write and only report the
    cache index to AttentionStore.

    Used when FD_AS_ONLY_FLUSH is enabled — AS acts as an index-only
    service (no data storage).

    Args:
        task: WriteStorageTask whose ``flush_cache_exists`` indicates the
            cache state on this node:
              True  = cache present on this node (request finish)
              False = cache gone from this node (CPU eviction)
              None  = caller did not set the flag; treated as "present"
                      but logged with a warning (see below).
    """
    try:
        if (self.rank == 0) and self.storage_backend_type == "attention_store":
            if task.flush_cache_exists is None:
                # None means "not applicable". A caller on a non-request-finish
                # path that forgot to set the flag would otherwise silently
                # report the cache as present, corrupting the index state.
                logger.warning(
                    "[AS_ONLY_FLUSH] flush_cache_exists is None, treating as cache "
                    f"present (reside_in_gpu=True) for task {task.task_id}"
                )
            reside_in_gpu = task.flush_cache_exists if task.flush_cache_exists is not None else True
            self.storage_backend.flush_token_index(
                task.task_id, task.token_ids, task.start_write_block_idx, reside_in_gpu
            )
            logger.info(
                f"[AS_ONLY_FLUSH] flush token index reside_in_gpu={reside_in_gpu} "
                f"start_block_idx={task.start_write_block_idx} for task {task.task_id}"
            )
    except Exception as e:
        # Best-effort: an index-flush failure must not block task completion.
        logger.warning(f"[AS_ONLY_FLUSH] Failed to flush token index for task {task.task_id}, error: {e}")
    # Report completion with no transferred blocks. All ranks synchronize on
    # the barrier; only rank 0 resets it and emits the done signal.
    result = (CacheStatus.GPU2STORAGE, task.task_id, task.keys, [])
    self.cache_task_queue.swap_to_storage_barrier.wait()
    if self.rank == 0:
        self.cache_task_queue.swap_to_storage_barrier.reset()
        self.cache_task_queue.put_transfer_done_signal(result)

def write_back_storage_task(self, task: WriteStorageTask):
"""
Write cache to the storage backend from the GPU memory.
Expand All @@ -957,6 +985,9 @@ def write_back_storage_task(self, task: WriteStorageTask):
self.storage_backend
), f"storage_backend not initialized, storage_backend_type: {self.storage_backend_type}"

if envs.FD_AS_ONLY_FLUSH:
return self._flush_only_storage_task(task)

try:
gpu_block_ids = task.gpu_block_ids.copy()
cpu_block_ids = [i for i in range(len(gpu_block_ids))]
Expand Down
23 changes: 20 additions & 3 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
storage_match_token_num = 0
match_storage_block_ids = []

if self.kvcache_storage_backend and no_match_token_num >= block_size:
if self.kvcache_storage_backend and no_match_token_num >= block_size and not envs.FD_AS_ONLY_FLUSH:
if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num, try_free_gpu_blocks=False):
raise Exception(
"request_match_blocks: Not enough GPU memory to allocate cache for matched Storage Cache"
Expand Down Expand Up @@ -1240,14 +1240,15 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True):
if self.kvcache_storage_backend is None:
return

if len(task.keys) != len(task.gpu_block_ids):
if not envs.FD_AS_ONLY_FLUSH and len(task.keys) != len(task.gpu_block_ids):
err_msg = (
f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
)
logger.error(err_msg)
raise ValueError(err_msg)

self.task_write_back_event[task.task_id] = Event()
if is_sync:
self.task_write_back_event[task.task_id] = Event()
self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
if is_sync:
self.wait_write_storage_task(task.task_id)
Expand Down Expand Up @@ -1536,6 +1537,7 @@ def free_cpu_block_ids(self, need_block_num):
- freed_block_num: Number of CPU blocks successfully evicted
"""
hash_value_block_ids_map = defaultdict(list)
hash_value_flush_info = {} # {input_hash_value: (token_ids, min_depth)}
total_cpu_free_count = 0
with self.request_release_lock:
while True:
Expand All @@ -1551,6 +1553,10 @@ def free_cpu_block_ids(self, need_block_num):

self.recycle_cpu_blocks(node.block_id)
hash_value_block_ids_map[node.input_hash_value].extend(reversed(tmp_block_ids))
if envs.FD_AS_ONLY_FLUSH and self.kvcache_storage_backend == "attention_store":
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 node.input_ids 是当前节点的 token ids 还是整条路径的完整 token ids?

flush_token_index 需要完整的 token_ids 才能正确定位索引。若 input_ids 只是当前节点新增的 token slice,传入的 token_ids 将不完整,导致 AttentionStore 索引更新错误。

请确认 BlockNode.input_ids 的语义,如果是 partial token ids,需要从 root 到当前节点拼接完整路径才能使用。

key = node.input_hash_value
if key not in hash_value_flush_info or node.depth < hash_value_flush_info[key][1]:
hash_value_flush_info[key] = (node.input_ids, node.depth)
logger.info(f"free_cpu_block_ids: free node {node}")

self.node_id_pool.append(node.node_id)
Expand All @@ -1575,6 +1581,17 @@ def free_cpu_block_ids(self, need_block_num):
logger.info(
"free_cpu_block_ids: after free, " + f"len(self.cpu_free_block_list) {len(self.cpu_free_block_list)}"
)
if envs.FD_AS_ONLY_FLUSH and self.kvcache_storage_backend == "attention_store" and hash_value_flush_info:
for input_hash_value, (token_ids, min_depth) in hash_value_flush_info.items():
flush_task = WriteStorageTask(
task_id=str(uuid.uuid4()),
keys=[input_hash_value],
token_ids=token_ids,
gpu_block_ids=[],
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 建议补充注释说明 is_sync=False 的原因,避免维护者误改

此处使用 is_sync=False 是因为 flush 操作不需要等待结果,这与请求完成时的 flush(通常同步)行为不同。建议在调用处增加注释说明异步原因,避免后续维护者误改为同步导致死锁或性能问题。

另外,is_sync=False 的任务不在 task_write_back_event 中注册 Event,请确认下游(如 transfer_done 信号消费方)能正确处理仅 rank==0 发出信号的场景。

flush_cache_exists=False,
start_write_block_idx=min_depth - 1,
)
self.issue_write_back_storage_task(flush_task, is_sync=False)
return total_cpu_free_count

def get_block_hash_extra_keys(self, request, start_idx, end_idx, mm_idx):
Expand Down
2 changes: 2 additions & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ def _validate_split_kv_size(value: int) -> int:
"FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
# Whether to clear cpu cache when clearing model weights.
"FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
# AS-only flush mode: AttentionStore only reports cache index without storing actual data.
"FD_AS_ONLY_FLUSH": lambda: bool(int(os.getenv("FD_AS_ONLY_FLUSH", "0"))),
# enable return text, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ENABLE_RETURN_TEXT": lambda: bool(int(os.getenv("FD_ENABLE_RETURN_TEXT", "0"))),
# Used to truncate the string inserted during thinking when reasoning in a model. (</think> for ernie-45-vl, \n</think>\n\n for ernie-x1)
Expand Down
83 changes: 83 additions & 0 deletions tests/cache_manager/test_prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,89 @@ def test_reset_sets_empty_cpu_free_list_when_no_cpu_blocks(self):
manager.reset()
self.assertEqual(manager.cpu_free_block_list, [])

@patch("fastdeploy.cache_manager.prefix_cache_manager.envs")
def test_free_cpu_block_ids_flushes_cache_gone_with_as_only_flush(self, mock_envs):
    """Eviction under FD_AS_ONLY_FLUSH must emit exactly one flush task with
    flush_cache_exists=False and start_write_block_idx = node depth - 1."""
    mock_envs.FD_AS_ONLY_FLUSH = True
    manager = _create_manager(num_gpu_blocks=4, num_cpu_blocks=4)
    manager.kvcache_storage_backend = "attention_store"

    # Insert a depth-3 CPU-resident node into the LRU eviction heap.
    cpu_hash = get_hash_str([7, 8])
    node = BlockNode(
        88,
        [7, 8],
        cpu_hash,
        3,
        0,
        2,
        cpu_hash,
        0,
        parent=manager.radix_tree_root,
        cache_status=CacheStatus.CPU,
    )
    node.shared_count = 0
    node.block_id = 10
    manager.radix_tree_root.children[cpu_hash] = node
    manager.cpu_lru_leaf_heap.append(node)
    manager.cpu_lru_leaf_set.add(node)

    # Capture issued flush tasks instead of pushing them to the transfer queue.
    issued = []
    manager.issue_write_back_storage_task = lambda task, is_sync=True: issued.append(task)

    freed = manager.free_cpu_block_ids(1)

    self.assertEqual(freed, 1)
    self.assertEqual(len(issued), 1)
    flush_task = issued[0]
    self.assertFalse(flush_task.flush_cache_exists)  # cache is gone (eviction)
    self.assertEqual(flush_task.token_ids, [7, 8])  # token ids taken from the node
    self.assertEqual(flush_task.gpu_block_ids, [])  # index-only: no data transfer
    self.assertEqual(flush_task.start_write_block_idx, 2)  # depth(3) - 1

def test_free_cpu_block_ids_no_flush_without_attention_store(self):
    """Eviction with a non-attention_store backend must not emit any flush task."""
    manager = _create_manager(num_gpu_blocks=4, num_cpu_blocks=4)
    manager.kvcache_storage_backend = "mooncake"

    # Put one evictable CPU-resident node on the LRU heap.
    cpu_hash = get_hash_str([5, 6])
    node = BlockNode(
        89,
        [5, 6],
        cpu_hash,
        1,
        0,
        2,
        cpu_hash,
        0,
        parent=manager.radix_tree_root,
        cache_status=CacheStatus.CPU,
    )
    node.shared_count = 0
    node.block_id = 11
    manager.radix_tree_root.children[cpu_hash] = node
    manager.cpu_lru_leaf_heap.append(node)
    manager.cpu_lru_leaf_set.add(node)

    issued = []

    def record_task(task, is_sync=True):
        issued.append(task)

    manager.issue_write_back_storage_task = record_task

    freed = manager.free_cpu_block_ids(1)

    self.assertEqual(freed, 1)
    # No flush task may be issued for non-attention_store backends.
    self.assertEqual(issued, [])


if __name__ == "__main__":
unittest.main()
Loading