Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dflash/scripts/prefix_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ def __init__(self, stdout, verbose: bool = False):
self._suppress_prefixes = () if verbose else self._DEFAULT_SUPPRESS_PREFIXES
self._waiters: list[tuple[str, asyncio.Future]] = []
self._task: asyncio.Task | None = None
# Issue #114: side-channel callbacks for daemon-emitted events.
# Keys = line prefix, values = zero-arg callable invoked on match.
self._line_callbacks: dict[str, callable] = {}

def register_line_callback(self, prefix: str, cb) -> None:
"""Invoke ``cb()`` whenever the daemon emits a stdout line starting with ``prefix``."""
self._line_callbacks[prefix] = cb

def start(self, loop: asyncio.AbstractEventLoop) -> None:
self._task = loop.create_task(self._run())
Expand All @@ -103,6 +110,15 @@ async def _run(self) -> None:
return
decoded = line.decode("utf-8", errors="replace").rstrip()

# Issue #114: side-channel callbacks (e.g. PrefixCache invalidation
# on ``[snap] all-cleared``) run before waiter dispatch so cache
# state is consistent for any caller awaiting a reply.
for _cb_prefix, _cb in self._line_callbacks.items():
if decoded.startswith(_cb_prefix):
try: _cb()
except Exception as _cbe:
print(f" [bus] callback for {_cb_prefix!r} raised: {_cbe}", flush=True)

# Try to satisfy a waiter first.
matched = False
for i, (prefix, fut) in enumerate(self._waiters):
Expand Down Expand Up @@ -525,6 +541,22 @@ def lookup(self, prompt_ids: list[int]) -> tuple[int, int] | None:
f"(of {len(prompt_ids)} total)", flush=True)
return best

def mark_all_cleared(self) -> None:
"""Drop every LRU entry after the daemon emits ``[snap] all-cleared``.

Issue #114: when the daemon hits an OOM during prefill it frees every
prefix snapshot slot to recover VRAM. Without this hook the Python LRU
keeps entries pointing at freed slots, and the next request triggers
``RESTORE bad args or empty slot`` and streams nothing back.
"""
if self.disabled:
return
n = len(self.entries)
self.entries.clear()
self.next_slot = 0
self._pending_evict_key = None
print(f"{self.log_prefix} all-cleared — dropped {n} LRU entries", flush=True)
Comment on lines +554 to +558

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: mark_all_cleared() only resets prefix-cache state and leaves full-cache slot metadata intact, so post-OOM requests can reuse stale full-cache slot ids and fail restores.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At dflash/scripts/prefix_cache.py, line 554:

<comment>`mark_all_cleared()` only resets prefix-cache state and leaves full-cache slot metadata intact, so post-OOM requests can reuse stale full-cache slot ids and fail restores.</comment>

<file context>
@@ -525,6 +541,22 @@ def lookup(self, prompt_ids: list[int]) -> tuple[int, int] | None:
+        """
+        if self.disabled:
+            return
+        n = len(self.entries)
+        self.entries.clear()
+        self.next_slot = 0
</file context>
Suggested change
n = len(self.entries)
self.entries.clear()
self.next_slot = 0
self._pending_evict_key = None
print(f"{self.log_prefix} all-cleared — dropped {n} LRU entries", flush=True)
n_prefix = len(self.entries)
self.entries.clear()
self.next_slot = 0
self._pending_evict_key = None
n_full = 0
if not getattr(self, "_full_disabled", True):
n_full = len(self.full_entries)
self.full_entries.clear()
self._full_next_slot = 0
self._full_pending_evict_key = None
self._full_pending_evict_path = None
print(f"{self.log_prefix} all-cleared — dropped {n_prefix} prefix + {n_full} full entries", flush=True)


def prepare_inline_snap(self, prompt_ids: list[int]) -> tuple[int, int] | None:
"""Pick a target boundary + slot for inline snapshot during the next
request. Returns ``(slot_id, target_cut)`` or ``None`` if no
Expand Down
6 changes: 6 additions & 0 deletions dflash/scripts/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,12 @@ def _resolve_kv_k_type():
)
if prefill_cfg is not None and prefill_cache_slots > 0:
prefix_cache.init_full_cache(prefill_cache_slots)

# Issue #114: invalidate the Python LRU whenever the daemon frees every
# prefix snapshot slot during OOM recovery. Without this the next request
# would RESTORE a freed slot and stream nothing back.
bus.register_line_callback("[snap] all-cleared", prefix_cache.mark_all_cleared)

tool_memory = ToolMemory(
max_entries=int(os.environ.get("DFLASH_TOOL_MEMORY_MAX_ENTRIES", "50000")),
max_bytes=int(os.environ.get("DFLASH_TOOL_MEMORY_MAX_BYTES", str(64 * 1024 * 1024))),
Expand Down
31 changes: 27 additions & 4 deletions dflash/test/test_dflash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4293,7 +4293,13 @@ int main(int argc, char ** argv) {
/*capture_delta_intermediate=*/false,
/*fa_window=*/g_fa_window,
/*last_token_logits_only=*/true)) {
std::fprintf(stderr, "prefill gallocr pre-reserve failed\n"); return 1;
// Issue #114: gallocr OOM. Free all prefix snapshots so the next
// request has VRAM headroom; abort this request cleanly in daemon
// mode instead of killing the process.
std::fprintf(stderr, "prefill gallocr pre-reserve failed (OOM)\n");
for (int _i = 0; _i < PREFIX_CACHE_SLOTS; _i++) free_prefix_snapshot(prefix_snapshots[_i]);
std::printf("[snap] all-cleared\n"); std::fflush(stdout);
if (daemon_mode) { stream_emit(-1); continue; } else return 1;
}
// gallocr is now reserved at peak size; subsequent builds will reuse it.
}
Expand Down Expand Up @@ -4338,11 +4344,17 @@ int main(int argc, char ** argv) {
/*capture_delta_intermediate=*/false,
/*fa_window=*/g_fa_window,
/*last_token_logits_only=*/true)) {
std::fprintf(stderr, "prefill build @%d\n", start); return 1;
std::fprintf(stderr, "prefill build @%d failed (OOM)\n", start);
for (int _i = 0; _i < PREFIX_CACHE_SLOTS; _i++) free_prefix_snapshot(prefix_snapshots[_i]);
std::printf("[snap] all-cleared\n"); std::fflush(stdout);
if (daemon_mode) { stream_emit(-1); goto _req_aborted_oom; } else return 1;
}

pf_embed_buf.assign((size_t)hidden * n_tokens, 0.0f);
if (!w.embedder.embed(prompt.data() + start, n_tokens, pf_embed_buf.data())) return 1;
if (!w.embedder.embed(prompt.data() + start, n_tokens, pf_embed_buf.data())) {
std::fprintf(stderr, "prefill embed @%d failed\n", start);
if (daemon_mode) { stream_emit(-1); goto _req_aborted_oom; } else return 1;
}
ggml_backend_tensor_set(sg.inp_embed, pf_embed_buf.data(), 0,
sizeof(float) * pf_embed_buf.size());

Expand Down Expand Up @@ -4374,7 +4386,12 @@ int main(int argc, char ** argv) {
}

auto st = ggml_backend_graph_compute(backend, sg.gf);
if (st != GGML_STATUS_SUCCESS) { std::fprintf(stderr, "prefill compute @%d\n", start); return 1; }
if (st != GGML_STATUS_SUCCESS) {
std::fprintf(stderr, "prefill compute @%d failed (OOM)\n", start);
for (int _i = 0; _i < PREFIX_CACHE_SLOTS; _i++) free_prefix_snapshot(prefix_snapshots[_i]);
std::printf("[snap] all-cleared\n"); std::fflush(stdout);
if (daemon_mode) { stream_emit(-1); goto _req_aborted_oom; } else return 1;
}

// Logits are [vocab, 1] (last_token_logits_only), read from offset 0.
pf_logits_buf.assign(vocab, 0.0f);
Expand Down Expand Up @@ -4405,6 +4422,12 @@ int main(int argc, char ** argv) {
start = committed - PREFILL_UBATCH;
}
}
// Issue #114: in-loop prefill OOM lands here, then re-enters the daemon
// read loop without killing the process. No-op when daemon_mode=false.
if (false) {
_req_aborted_oom:
continue;
}
auto t_pf1 = std::chrono::steady_clock::now();
// If prefill was a no-op due to a snapshot RESTORE (cache.cur_pos already
// covers the prompt), seed last_tok from the restored cache so the decode
Expand Down
Loading