Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SiEval is a **model delivery quality verification system**: eval-side knowledge
Precise reproducibility is a product contract, not a nicety.

* Safety guards (e.g. `--resume` strict match) ship strict-only. No `--force-*` flags, no bypass env vars, no "(future)" escape-hatch hints in errors.
* `--resume` match scope is narrowed, not bypassed: only fields that touch neither sample data nor any persisted artifact may differ across a resume — pure scheduling (concurrency, shard-I/O, write buffers) and console-only progress. Everything affecting on-disk content stays strict (sampling/seeds, `max_iterations`, `shard_samples`, `record_*`, `max_retries`, `profile_*`, `detect_anomalies*`, the `progress.json` dump), as does `infer_plans.yaml`.
* Recovery: "start fresh" or "match the invocation". Escape-hatch proposals must re-justify the contract, not just add a flag.

## Toolchain
Expand Down
310 changes: 263 additions & 47 deletions sieval/cli/leaderboard/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import os
import shlex
import sys
from collections.abc import Mapping
from collections.abc import Callable, Mapping
from datetime import UTC, datetime
from pathlib import Path
from types import MappingProxyType
Expand Down Expand Up @@ -115,24 +115,74 @@ class RootConfigDict(TypedDict, total=False):
alignment: AlignmentBlockDict


def _strip_header(text: str) -> str:
"""Strip the comment header block written by ``_format_comment_header``.
def _split_header(text: str) -> tuple[str, str]:
"""Partition ``text`` into ``(header, body)`` at the comment header block
written by ``_format_comment_header``.

Anchored to the ``# ---...`` border so only OUR header is stripped, not
arbitrary user-added top-of-file comments. A leading border with no
closing border is treated as malformed and returned unchanged so body
comparison detects the tampering instead of silently succeeding.
Anchored to the ``# ---...`` border pair so only OUR header is split off,
not arbitrary user-added top-of-file comments. A leading border with no
closing border is treated as malformed and yields ``("", text)`` so body
comparison detects the tampering instead of silently succeeding. When no
header is present, returns ``("", text)``. In all cases ``header + body``
reconstructs ``text`` exactly.
"""
lines = text.splitlines(keepends=True)
if not lines or not lines[0].startswith("# -"):
return text
return "", text
for i in range(1, len(lines)):
if lines[i].startswith("# -"):
end = i + 1
if end < len(lines) and lines[end].strip() == "":
end += 1
return "".join(lines[end:])
return text
return "".join(lines[:end]), "".join(lines[end:])
return "", text


def _strip_header(text: str) -> str:
"""Return ``text`` with the ``_format_comment_header`` block removed.

Thin wrapper over :func:`_split_header`; see it for border/malformed
semantics.
"""
return _split_header(text)[1]


def _diff_lines(a: dict[str, Any], b: dict[str, Any]) -> list[str]:
"""Return ``- <path>: <old> → <new>`` lines for every differing leaf.

Walks two parsed-config mappings depth-first. Empty list means the two
parse to the same structure (any textual difference was whitespace /
formatting only).
"""
diffs: list[str] = []

def _walk(x: Any, y: Any, path: str) -> None:
if isinstance(x, dict) and isinstance(y, dict):
for k in sorted(set(x) | set(y)):
_walk(x.get(k), y.get(k), f"{path}.{k}" if path else k)
elif isinstance(x, list) and isinstance(y, list):
if len(x) != len(y):
diffs.append(f"- {path}: list length {len(x)} → {len(y)}")
else:
for i, (xv, yv) in enumerate(zip(x, y, strict=True)):
_walk(xv, yv, f"{path}[{i}]")
elif x != y:
diffs.append(f"- {path}: {x!r} → {y!r}")

_walk(a, b, "")
return diffs


def _diff_dicts(a: dict[str, Any], b: dict[str, Any]) -> str:
"""Return a short human-readable hint describing which keys differ.

Thin wrapper over :func:`_diff_lines`; reports up to 10 differing leaf
paths and a "formatting only" sentinel when nothing differs.
"""
lines = _diff_lines(a, b)
if not lines:
return "Diff: (whitespace / formatting only)"
return "Diff:\n" + "\n".join(f" {line}" for line in lines[:10])


def _brief_diff(existing: str, current: str) -> str:
Expand All @@ -148,25 +198,103 @@ def _brief_diff(existing: str, current: str) -> str:
c = yaml.safe_load(current) or {}
except yaml.YAMLError:
return "Diff: (existing file is not valid YAML — cannot compute key-level diff)"
diffs: list[str] = []
return _diff_dicts(e, c)

def _walk(a: Any, b: Any, path: str) -> None:
if isinstance(a, dict) and isinstance(b, dict):
for k in sorted(set(a) | set(b)):
_walk(a.get(k), b.get(k), f"{path}.{k}" if path else k)
elif isinstance(a, list) and isinstance(b, list):
if len(a) != len(b):
diffs.append(f" - {path}: list length {len(a)} → {len(b)}")
else:
for i, (av, bv) in enumerate(zip(a, b, strict=True)):
_walk(av, bv, f"{path}[{i}]")
elif a != b:
diffs.append(f" - {path}: {a!r} → {b!r}")

_walk(e, c, "")
if not diffs:
return "Diff: (whitespace / formatting only)"
return "Diff:\n" + "\n".join(diffs[:10])
# ── Resume strict-match field policy (must partition TaskRunnerConfig) ──
# Adjustable across --resume only if a field touches neither the sample data
# nor any persisted artifact: pure scheduling + console-only progress.
_THROUGHPUT_RUNNER_KEYS: frozenset[str] = frozenset(
{
"concurrency_limit",
"concurrency_limits",
"shard_read_concurrency",
"shard_write_concurrency",
"write_buffer_size",
"write_buffer_flush_interval",
# Console-only (tqdm bar + log cadence); not the progress.json dump.
"show_progress",
"progress_log_interval",
"progress_log_pct_interval",
}
)

# Must match: affect sample data, an on-disk artifact, or a recorded outcome's
# meaning — e.g. max_retries is the failure signal written into FAILED records;
# profile_*/detect_anomalies*/dump_progress write profiler/anomaly/progress files.
_STRICT_RUNNER_KEYS: frozenset[str] = frozenset(
{
"shard_samples",
"record_each_stage",
"record_type_metadata",
"record_meta",
"max_iterations",
"deterministic",
"max_retries",
"profile_io",
"profile_stages",
"profile_usage",
"detect_anomalies",
"detect_anomalies_on_resume",
"dump_progress",
"progress_dump_interval",
}
)

# Neither adjustable nor strict — listed only so the three buckets partition
# TaskRunnerConfig exactly (see test_every_field_classified_exactly_once). The
# strip removes result_dir at top level (reification injects it there); the rest
# are never reached because they don't survive into a persisted runner_config
# block: auto_resume is set by the orchestration layer at runtime, stage_meta
# hooks are non-serializable callables. A hand-authored runner_config field from
# this set that changed across a resume would still be compared strictly.
_NONMATCH_RUNNER_KEYS: frozenset[str] = frozenset(
{
"result_dir",
"auto_resume",
"stage_meta_hook",
"stage_meta_hooks",
}
)


def _strip_noncomparable_fields(cfg: dict[str, Any]) -> dict[str, Any]:
"""Deep-copy ``cfg`` with resume-mutable fields removed, for comparison.

Strips (input never mutated) top-level ``concurrency_limit`` /
``concurrency_limits`` / ``result_dir``, ``models.*.args.concurrency_limit``,
and ``_THROUGHPUT_RUNNER_KEYS`` from every ``runner_config`` block.
"""
out = copy.deepcopy(cfg)

for key in ("concurrency_limit", "concurrency_limits", "result_dir"):
out.pop(key, None)

models = out.get("models")
if isinstance(models, dict):
for mcfg in models.values():
if isinstance(mcfg, dict):
args = mcfg.get("args")
if isinstance(args, dict):
args.pop("concurrency_limit", None)

# runner_config carries throughput knobs in two equivalent places: the
# top-level defaults block (merged into every task) and per-task overrides.
# Strip both identically.
runner_config_blocks = [out.get("runner_config")]
tasks = out.get("tasks")
if isinstance(tasks, dict):
runner_config_blocks.extend(
tcfg.get("runner_config")
for tcfg in tasks.values()
if isinstance(tcfg, dict)
)
for rc in runner_config_blocks:
if isinstance(rc, dict):
for key in _THROUGHPUT_RUNNER_KEYS:
rc.pop(key, None)

return out


def resolve_deterministic(cli_override: bool | None, config: Mapping[str, Any]) -> bool:
Expand Down Expand Up @@ -322,6 +450,30 @@ def _format_comment_header(
return "\n".join(lines) + "\n"


def _append_resume_note(header: str, diff_lines: list[str]) -> str:
"""Insert a ``Resumed by …`` audit block into ``header``, before its border.

Called when ``--resume`` rewrites a file because only resume-mutable fields
changed. The original provenance survives and ``diff_lines`` is recorded with
a timestamp, so the header accumulates the full lineage across resumes. The
note sits inside the ``# ---`` border pair so :func:`_split_header` keeps
treating the whole block as the header next time.

Assumes a well-formed ``header`` (two borders) — the only kind the caller
passes (from :func:`_format_comment_header` or :func:`_split_header`).
"""
from sieval import __version__

now = datetime.now(UTC).isoformat()
note = [f"# Resumed by sieval {__version__} at {now}:\n"]
note.extend(f"# {line}\n" for line in diff_lines)

lines = header.splitlines(keepends=True)
borders = [i for i, line in enumerate(lines) if line.startswith("# -")]
close = borders[-1]
return "".join(lines[:close] + note + lines[close:])


def _warn_best_effort_deterministic(
config: Mapping[str, Any],
effective_deterministic: bool,
Expand Down Expand Up @@ -1294,14 +1446,19 @@ async def _persist_yaml_with_strict_resume(
body: str,
header: str,
audit_label: str,
mutable_strip: Callable[[dict[str, Any]], dict[str, Any]] | None = None,
) -> None:
"""Atomically write ``header + body`` to ``result_dir/target_name``.

Under ``--resume`` with an existing target file, the persisted body
(header excluded) MUST match ``body`` byte-for-byte; mismatch raises
``RuntimeError`` (the only failure mode the caller observes — every
other failure is best-effort and only logged). On match, the file is
not rewritten so existing header timestamps survive the resume.
Under ``--resume`` with an existing file: an identical body skips the
rewrite (timestamps survive). With ``mutable_strip=None`` (e.g. infer
plans) any other diff raises. Otherwise both bodies are parsed and
compared with ``mutable_strip`` applied; a diff that vanishes
(resume-mutable or formatting) is tolerated — the file is rewritten
with the new body, and the original header gains an appended
``Resumed by …`` record of what changed — and any residual diff
raises. ``RuntimeError`` is the only failure the caller observes; all
else is best-effort and logged.
"""
effective_result_dir = self._resolve_result_dir()
if effective_result_dir is None:
Expand All @@ -1313,39 +1470,97 @@ async def _persist_yaml_with_strict_resume(
result_path = anyio.Path(effective_result_dir)
target = result_path / target_name

# Strict-match resume check: if --resume was requested and a persisted
# file exists, the current invocation's body MUST match the persisted
# body byte-for-byte (comment header excluded). Silently resuming under
# a different config would mix data from two runs.
write_header = header

if self.resume_override and await target.exists():
try:
existing = await target.read_text(encoding="utf-8")
except OSError as e:
# Can't verify the match — refuse to resume. Surfaced as the
# same Resume-aborted shape so the user sees a consistent
# error class instead of a bare OSError from the persistence
# path they may not know exists.
raise RuntimeError(
f"Resume aborted: cannot read existing {target}: {e}\n"
"Either:\n"
" 1. Remove the result_dir and start fresh\n"
f" 2. Ensure {target} is readable"
) from e
existing_body = _strip_header(existing)
if existing_body != body:
diff_hint = _brief_diff(existing_body, body)

existing_header, existing_body = _split_header(existing)

if existing_body == body:
logger.info("Resume: {} matches — skipping rewrite", target_name)
return

if mutable_strip is None:
# Byte-for-byte strict (e.g. infer plans): any diff aborts.
raise RuntimeError(
f"Resume aborted: {target} does not match current invocation.\n"
f"{diff_hint}\n"
f"{_brief_diff(existing_body, body)}\n"
"Either:\n"
" 1. Remove the result_dir and start fresh\n"
f" 2. Match the invocation to the persisted {audit_label}"
)
logger.info("Resume: {} matches — skipping rewrite", target_name)
return

# Parse both sides so YAML type coercion (tuple→list) and key
# ordering can't cause a spurious mismatch.
try:
existing_cfg = yaml.safe_load(existing_body) or {}
current_cfg = yaml.safe_load(body) or {}
except yaml.YAMLError as e:
raise RuntimeError(
f"Resume aborted: cannot parse existing {target} to verify "
f"match: {e}\n"
"Either:\n"
" 1. Remove the result_dir and start fresh\n"
f" 2. Restore {target} to valid YAML matching the persisted "
f"{audit_label}"
) from e

# current_cfg is always a dict (we dump a mapping); a tampered file
# may parse to a scalar/list. Refuse cleanly so mutable_strip can't
# raise an opaque AttributeError instead of the documented RuntimeError.
if not isinstance(existing_cfg, dict) or not isinstance(current_cfg, dict):
raise RuntimeError(
f"Resume aborted: existing {target} is not a YAML mapping — "
"cannot verify match.\n"
"Either:\n"
" 1. Remove the result_dir and start fresh\n"
f" 2. Restore {target} to valid YAML matching the persisted "
f"{audit_label}"
)

stripped_existing = mutable_strip(existing_cfg)
stripped_current = mutable_strip(current_cfg)
if stripped_existing != stripped_current:
raise RuntimeError(
f"Resume aborted: {target} does not match current invocation.\n"
f"{_diff_dicts(stripped_existing, stripped_current)}\n"
"Either:\n"
" 1. Remove the result_dir and start fresh\n"
f" 2. Match the invocation to the persisted {audit_label}"
)

# Only resume-mutable (or formatting) fields differ — rewrite with
# the new body. When real fields changed (not just formatting) and
# the file had a header, append a timestamped record of the change
# so the header keeps the full resume lineage. Otherwise no note is
# added: a formatting-only diff keeps the original header, and a
# header-less file gets a fresh one (it had no lineage to extend).
logger.info(
"Resume: {} resume-mutable fields changed — updating file",
target_name,
)
# The note records genuine resume-mutable changes only; result_dir is
# a never-compared location field (reification injects it), so drop it
# to keep it out of the audit trail.
note_before = {k: v for k, v in existing_cfg.items() if k != "result_dir"}
note_after = {k: v for k, v in current_cfg.items() if k != "result_dir"}
change_lines = _diff_lines(note_before, note_after)
if existing_header and change_lines:
write_header = _append_resume_note(existing_header, change_lines)
else:
write_header = existing_header or header

tmp_path = target.with_name(target.name + ".tmp")
content = header + body
content = write_header + body

try:
await result_path.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -1390,6 +1605,7 @@ async def _persist_effective_config(self) -> None:
body=body,
header=header,
audit_label="effective config",
mutable_strip=_strip_noncomparable_fields,
)

async def _persist_infer_plans(self) -> None:
Expand Down
Loading
Loading