-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathbenchmark_base.py
More file actions
657 lines (541 loc) · 24.7 KB
/
Copy pathbenchmark_base.py
File metadata and controls
657 lines (541 loc) · 24.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
import logging
import subprocess
import threading
from abc import ABC, abstractmethod
from datetime import datetime
from typing import (
Any,
Callable,
Generic,
Optional,
Protocol,
TypeVar,
runtime_checkable,
)
import pytest
import torch
from torch.autograd.profiler import DeviceType
from tileops.manifest import load_workloads
# Keys of a manifest workload entry that belong to the benchmark harness
# itself. Any other key on the entry (e.g. ``dim``, ``keepdim``,
# ``correction``) is treated as an op-call parameter.
#
# Scope note: the harness currently handles **single-input ops whose only
# tensor input is named ``x``**. Multi-input ops (e.g. attention families
# declaring ``q_shape`` / ``kv_shape``) are unsupported, and
# :func:`workloads_to_params` raises ``KeyError`` when ``x_shape`` is
# missing. Signature-aware tensor binding is tracked as a follow-up and must
# also update ``docs/design/manifest.md``.
_WORKLOAD_META_KEYS: frozenset[str] = frozenset(("x_shape", "dtypes", "label"))
# ---------------------------------------------------------------------------
# Benchmark capability protocols
# ---------------------------------------------------------------------------
@runtime_checkable
class ShapeDtypeWorkload(Protocol):
    """Protocol for workloads that expose tensor shape and dtype metadata.

    Conformance is structural: an object qualifies simply by having ``shape``
    and ``dtype`` attributes, no inheritance required. Helpers that need only
    tensor metadata — and not the ability to generate inputs — accept this
    type.
    """

    # Tensor dimensions of the workload.
    shape: tuple[int, ...]
    # Element type of the workload's tensor.
    dtype: torch.dtype
@runtime_checkable
class InputGeneratingWorkload(Protocol):
    """Protocol for workloads able to produce the inputs for a benchmark run."""

    def gen_inputs(self) -> tuple[Any, ...]:
        """Return the benchmark inputs as a tuple."""
        ...
@runtime_checkable
class BenchmarkWorkload(ShapeDtypeWorkload, InputGeneratingWorkload, Protocol):
    """Complete workload contract: tensor metadata plus input generation.

    Combines :class:`ShapeDtypeWorkload` and :class:`InputGeneratingWorkload`
    for benchmarks that need both roofline metadata extraction and input
    tensor generation. A workload conforms when it exposes ``shape`` and
    ``dtype`` and also implements ``gen_inputs()``.
    """
# Backward-compatible alias
RooflineWorkload = ShapeDtypeWorkload
# Type parameter naming the workload type that ``BenchmarkBase`` is generic over.
W = TypeVar("W")
# Logger that ``BenchmarkReport.record`` writes one summary line to per result.
_logger = logging.getLogger("tileops.bench")
# Thread-local storage for conftest hook to pick up per-test bench results.
# A single test function may call record() multiple times (tileops + baseline).
_bench_results = threading.local()
# Label of the ``record_function`` annotation placed around the timed call.
# Kineto projects this scope onto the device timeline, so kernels launched by
# the call land inside its window, whereas the L2-flush ``cache.zero_()``
# (enqueued outside the scope) does not.
_KERNEL_REGION = "tileops_bench_kernel"


def _sum_kernel_time_us(kineto_results):
    """Total the device time of kernels launched by the timed call.

    Only kernels whose start falls inside a :data:`_KERNEL_REGION` annotation
    window are counted. This leaves the L2-flush fill out and counts the
    kernel under test whatever its name is; a call that launches several
    kernels contributes every one of them.

    The C++ Kineto events are walked directly rather than going through
    ``key_averages()``, which is ~16x slower (~130ms of Python
    parsing/tree-building) for large traces.

    Args:
        kineto_results: Object exposing ``events()``; each event must provide
            ``device_type()``, ``is_user_annotation()``, ``name()``,
            ``start_ns()``, ``end_ns()`` and ``duration_ns()``.

    Returns:
        ``(total_us, n_regions)``: summed kernel time in microseconds and the
        number of annotation windows found. Callers compare ``n_regions``
        with ``n_repeat`` to confirm the scope projected on every iteration.
    """
    from bisect import bisect_right

    region_spans: list[tuple[int, int]] = []  # (start_ns, end_ns)
    kernel_spans: list[tuple[int, int]] = []  # (start_ns, duration_ns)
    for event in kineto_results.events():
        if event.device_type() != DeviceType.CUDA:
            continue
        if not event.is_user_annotation():
            kernel_spans.append((event.start_ns(), event.duration_ns()))
        elif event.name() == _KERNEL_REGION:
            region_spans.append((event.start_ns(), event.end_ns()))
        # Any other device-side annotation is neither a window nor a kernel.

    region_spans.sort()
    region_starts = [span[0] for span in region_spans]
    region_ends = [span[1] for span in region_spans]

    total_us = 0.0
    for kernel_start, kernel_dur in kernel_spans:
        # Locate the last window opening at or before this kernel's start.
        pos = bisect_right(region_starts, kernel_start)
        if pos == 0:
            continue  # started before the first window
        if kernel_start >= region_ends[pos - 1]:
            continue  # started after that window closed (e.g. the L2 flush)
        total_us += kernel_dur / 1000.0
    return total_us, len(region_spans)
# ---------------------------------------------------------------------------
# L2 cache flush buffer (sized to actual L2, allocated lazily)
# ---------------------------------------------------------------------------
# Lazily allocated scratch buffer, shared by every benchmark call in the
# process; zero-filling it is how the harness flushes L2 between iterations.
_l2_flush_cache: Optional[torch.Tensor] = None


def _get_l2_flush_cache() -> torch.Tensor:
    """Return the shared L2-flush buffer, allocating it on first use.

    The buffer is sized to the L2 cache of the *current* CUDA device, which is
    also where ``device="cuda"`` places the allocation, so the size query and
    the allocation refer to the same GPU. If the reported size is missing
    (device properties without ``L2_cache_size``) or non-positive, a 256 MB
    fallback is used.

    Returns:
        An ``int32`` CUDA tensor spanning ``l2_bytes`` bytes (4 bytes per
        element). The same tensor is returned on every later call.
    """
    global _l2_flush_cache
    if _l2_flush_cache is None:
        props = torch.cuda.get_device_properties(torch.cuda.current_device())
        # Fall back rather than raise when the attribute is not exposed.
        l2_bytes = getattr(props, "L2_cache_size", 0)
        if l2_bytes <= 0:
            l2_bytes = int(256e6)  # fallback
        _l2_flush_cache = torch.empty(l2_bytes // 4, dtype=torch.int, device="cuda")
    return _l2_flush_cache
# ---------------------------------------------------------------------------
# NVIDIA SOL-ExecBench–style benchmark
# ---------------------------------------------------------------------------
def bench_kernel(
    fn: Callable,
    args: tuple[Any, ...] = (),
    n_warmup: int = 10,
    n_repeat: int = 50,
    n_trials: int = 3,
) -> float:
    """Benchmark a GPU kernel with pure kernel timing via CUPTI.

    Protocol (adapted from NVIDIA SOL-ExecBench, arxiv.org/abs/2603.19173):

    1. Lock GPU clocks externally (nvidia-smi).
    2. Run *n_warmup* un-timed iterations with L2 flush.
    3. For each of *n_trials* trials, profile *n_repeat* iterations
       under CUPTI to get pure kernel execution time (no launch overhead).
       L2 is flushed before every iteration. Tensor inputs are rotated
       through a small pool of pre-made clones so consecutive iterations see
       different addresses; the pool is skipped (the original tensors are
       reused) when it would exceed 1 GB.
    4. Report the median trial mean (robust to outlier trials). With an even
       number of trials the upper of the two middle values is returned.

    Uses CUPTI via torch.profiler for accurate kernel-only timing, with
    direct Kineto C++ event iteration to avoid Python parsing overhead.
    Falls back to CUDA events when the profiled section raises
    ``RuntimeError`` or the annotation window did not appear on every
    iteration.

    Args:
        fn: Callable to benchmark. If *args* is provided, called as
            ``fn(*cloned_args)``; otherwise called as ``fn()``.
        args: Tensor arguments to clone. Non-tensor values are passed
            through unchanged.
        n_warmup: Warmup iterations (default 10).
        n_repeat: Timed iterations per trial (default 50). Must be >= 1.
        n_trials: Independent trials (default 3). Must be >= 1.

    Returns:
        Kernel latency in **milliseconds**.

    Raises:
        TypeError: If *args* is not a tuple.
        ValueError: If *n_repeat* or *n_trials* is smaller than 1.
    """
    if not isinstance(args, tuple):
        raise TypeError(
            f"bench_kernel expects a tuple of args, got {type(args).__name__}. "
            "Check that gen_inputs() returns a tuple."
        )
    # Reject degenerate loop counts up front: without these checks the
    # function fails much later with an opaque ZeroDivisionError (n_repeat)
    # or an IndexError on an empty result list (n_trials).
    if n_repeat < 1:
        raise ValueError(f"n_repeat must be >= 1, got {n_repeat}")
    if n_trials < 1:
        raise ValueError(f"n_trials must be >= 1, got {n_trials}")

    from tilelang.profiler.bench import suppress_stdout_stderr

    cache = _get_l2_flush_cache()
    has_args = len(args) > 0

    # Pre-clone a small pool of input tensors so the kernel sees different
    # addresses across iterations. Skip cloning if total tensor memory
    # exceeds 1 GB to avoid OOM on large workloads.
    _N_CLONES = 3
    _MAX_CLONE_BYTES = 1 << 30  # 1 GB
    if has_args:
        tensor_mask = tuple(isinstance(a, torch.Tensor) for a in args)
        total_bytes = sum(
            a.nelement() * a.element_size()
            for a, m in zip(args, tensor_mask, strict=True)
            if m
        )
        if total_bytes * _N_CLONES <= _MAX_CLONE_BYTES:
            arg_pool = [
                tuple(a.clone() if m else a for a, m in zip(args, tensor_mask, strict=True))
                for _ in range(_N_CLONES)
            ]

            def _run(i):
                return fn(*arg_pool[i % _N_CLONES])
        else:
            arg_pool = None

            def _run(i):
                return fn(*args)
    else:
        arg_pool = None

        def _run(i):
            return fn()

    # Warmup (no profiling). ``_run`` already wraps the index onto the clone
    # pool, so the raw iteration counter is passed straight through.
    for i in range(n_warmup):
        cache.zero_()
        _run(i)
    torch.cuda.synchronize()

    # Timed trials with CUPTI. Each trial opens its own torch.profiler context
    # around exactly n_repeat iterations and reads the trace after the context
    # closes; summed device kernel time / n_repeat is the mean per-call kernel
    # time. We deliberately do NOT use torch.profiler.schedule: that mechanism
    # is for sampling a window out of a long step()-driven loop, and forcing
    # n_repeat calls into a single "step" let queued, un-synchronized launches
    # leak across the warmup/active boundary. A plain per-trial context records
    # exactly the calls we want — no schedule, no on_trace_ready callback.
    #
    # Only the timed call is wrapped in record_function(_KERNEL_REGION), so the
    # L2-flush cache.zero_() enqueued just before it is attributed by scope (not
    # kernel name) and excluded. Flush and call share the stream, so the flush
    # completes before the call begins (L2 cold) without a sync between them; we
    # sync only after the call so its kernels are recorded before the next flush.
    trial_means: list[float] = []
    try:
        with suppress_stdout_stderr():
            for _ in range(n_trials):
                with torch.profiler.profile(
                    # CPU activity is required for Kineto to project the
                    # annotation onto the device timeline (CUDA-only emits no
                    # window); it adds only host-side overhead, not kernel time.
                    activities=[
                        torch.profiler.ProfilerActivity.CPU,
                        torch.profiler.ProfilerActivity.CUDA,
                    ],
                ) as profiler:
                    for i in range(n_repeat):
                        cache.zero_()
                        with torch.profiler.record_function(_KERNEL_REGION):
                            _run(i)
                        torch.cuda.synchronize()  # kernel recorded in isolation
                total_us, n_regions = _sum_kernel_time_us(profiler.profiler.kineto_results)
                # Scope failed to project on some iteration → trace untrustworthy,
                # fall back to CUDA events.
                if n_regions != n_repeat:
                    raise RuntimeError
                # Microseconds summed over the trial -> mean milliseconds per call.
                trial_means.append((total_us / n_repeat) * 1e-3)
    except RuntimeError:
        # Discard partial CUPTI results so the trials are not a mix of the
        # two timing methods.
        trial_means = []

    # Fallback to CUDA events if CUPTI failed
    if not trial_means:
        for _ in range(n_trials):
            start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_repeat)]
            end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_repeat)]
            for i in range(n_repeat):
                cache.zero_()
                start_events[i].record()
                _run(i)
                end_events[i].record()
            torch.cuda.synchronize()
            # elapsed_time() already reports milliseconds.
            times = [s.elapsed_time(e) for s, e in zip(start_events, end_events, strict=True)]
            trial_means.append(sum(times) / len(times))

    # Free the arg pool and release cached GPU memory to prevent
    # accumulation across hundreds of benchmark calls.
    if arg_pool is not None:
        del arg_pool
        torch.cuda.empty_cache()

    trial_means.sort()
    return trial_means[len(trial_means) // 2]
def _get_env_metadata() -> list[str]:
    """Collect GPU model, driver version, CUDA version, and torch version.

    Collection is best-effort: any value that cannot be determined is
    reported as ``N/A`` rather than raising, so a missing or broken
    ``nvidia-smi`` never prevents the report from being written.

    Returns:
        Four markdown bullet lines, in order: torch version, CUDA version
        (as built into torch), GPU model, driver version.
    """
    lines = [
        f"- **Torch version**: {torch.__version__}",
        f"- **CUDA version (torch)**: {torch.version.cuda or 'N/A'}",
    ]
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        lines.append(f"- **GPU model**: {gpu_name}")
    else:
        lines.append("- **GPU model**: N/A (no CUDA device)")

    # Try to get NVIDIA driver version from nvidia-smi
    driver = "N/A"
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=driver_version", "--format=csv,noheader"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers a missing binary (FileNotFoundError) as well as one
        # that exists but cannot be executed (e.g. PermissionError).
        pass
    else:
        if result.returncode == 0:
            # One line is printed per GPU; report the first.
            reported = result.stdout.strip().splitlines()
            if reported and reported[0].strip():
                driver = reported[0].strip()
    lines.append(f"- **Driver version**: {driver}")
    return lines
class BenchmarkBase(Generic[W], ABC):
    """Abstract foundation for benchmarking a single op.

    The class is generic over the workload type, letting each subclass state
    exactly which workload capability it relies on. ``WorkloadBase`` remains
    the typical in-repo implementation, but the public contract is the type
    parameter.

    Subclasses must provide ``calculate_flops()`` and ``calculate_memory()``.
    """

    def __init__(self, workload: W):
        self.workload = workload

    @abstractmethod
    def calculate_flops(self) -> Optional[float]:
        """Return the op's FLOP count, or ``None`` to omit ``tflops``."""
        raise NotImplementedError

    @abstractmethod
    def calculate_memory(self) -> Optional[float]:
        """Return the bytes moved, or ``None`` to omit ``bandwidth_tbs``."""
        raise NotImplementedError

    def profile(self, functor: Any, *inputs: Any) -> dict:
        """Time ``functor(*inputs)`` with gradients disabled.

        Timing is delegated to :func:`bench_kernel` with its default warmup,
        repeat and trial counts; ``inputs`` is handed over as its ``args``
        tuple.

        Returns:
            The result dict built from the measured latency.
        """
        with torch.no_grad():
            latency_ms = bench_kernel(functor, args=inputs)
        return self._build_result(latency_ms)

    def profile_autograd(self, functor: Any) -> dict:
        """Time a callable that needs autograd (e.g. fwd+bwd).

        Unlike :meth:`profile`, no ``torch.no_grad()`` scope is entered, so
        the callable may build autograd graphs and call ``.backward()``
        internally. ``functor`` must be a zero-argument closure that already
        captures its inputs.
        """
        latency_ms = bench_kernel(functor)
        return self._build_result(latency_ms)

    def _build_result(self, latency: float) -> dict:
        """Assemble the result dict from a latency given in milliseconds.

        ``latency_ms`` is always present. ``tflops`` and ``bandwidth_tbs``
        are added only when the matching ``calculate_*`` hook returns a
        value other than ``None``.
        """
        metrics = {"latency_ms": latency}
        derived = (
            ("tflops", self.calculate_flops),
            ("bandwidth_tbs", self.calculate_memory),
        )
        for key, estimate in derived:
            amount = estimate()
            if amount is not None:
                # amount per millisecond * 1e-9 == tera-units per second
                metrics[key] = amount / latency * 1e-9
        return metrics
# ---------------------------------------------------------------------------
# Manifest-driven benchmark helpers
# ---------------------------------------------------------------------------
def _workload_extra_params(w: dict) -> dict[str, Any]:
    """Extract the op-call parameters carried by a manifest workload entry.

    Besides ``x_shape`` / ``dtypes`` / ``label``, a workload entry may hold
    optional op-call parameter values (e.g. ``dim``, ``keepdim``,
    ``correction``). Benchmark files that opt into ``include_extra=True``
    forward these to the op constructor.

    Two kinds of key are dropped: the reserved meta keys listed in
    :data:`_WORKLOAD_META_KEYS` and dunder-style metadata keys (prefix
    ``__``). Every other key is returned as an op param, including any other
    ``*_shape`` key. This follows the single-input, ``x_shape``-only harness
    contract; multi-input ops with ``q_shape`` / ``kv_shape`` are out of
    scope and would need a dedicated harness.

    Args:
        w: One workload entry from the manifest.

    Returns:
        A new dict with the remaining entries, in their original order.
    """
    extras: dict[str, Any] = {}
    for key, value in w.items():
        if key in _WORKLOAD_META_KEYS or key.startswith("__"):
            continue
        extras[key] = value
    return extras
def workloads_to_params(op_name: str, include_extra: bool = False) -> list:
    """Turn the manifest workloads of *op_name* into pytest params.

    One ``pytest.param`` is produced per (workload, dtype) pair, with id
    ``"<label>-<dtype>"``. The label defaults to the shape's dimensions
    joined by ``x`` when the workload declares none.

    With ``include_extra=False`` (the default) each param is
    ``pytest.param(shape, dtype, id=...)``, matching bench files that use
    ``@pytest.mark.parametrize("shape, dtype", ...)``.

    With ``include_extra=True`` each param is
    ``pytest.param(shape, dtype, extra_params, id=...)``, where
    ``extra_params`` is a dict of the op-call params declared on the workload
    entry (e.g. ``{"dim": 0, "keepdim": False}``). Use it when the benchmark
    drives op calls from manifest-declared workload params.

    Raises:
        KeyError: If a workload entry has no ``x_shape``.
    """
    params = []
    for entry in load_workloads(op_name):
        if "x_shape" not in entry:
            raise KeyError(
                f"workloads_to_params({op_name!r}) only supports single-input "
                "ops whose tensor input is named 'x' (workload must declare "
                "'x_shape'); multi-input ops with q_shape/kv_shape/... are "
                "out of scope for this harness."
            )
        shape = tuple(entry["x_shape"])
        default_label = "x".join(str(dim) for dim in shape)
        label = entry.get("label", default_label)
        extra = _workload_extra_params(entry) if include_extra else {}
        for dtype_name in entry["dtypes"]:
            dtype = getattr(torch, dtype_name)
            case_id = f"{label}-{dtype_name}"
            if include_extra:
                # Each case gets its own copy of ``extra``: a test that
                # mutates the dict must not affect the other parametrized
                # cases built from the same workload entry.
                case = pytest.param(shape, dtype, dict(extra), id=case_id)
            else:
                case = pytest.param(shape, dtype, id=case_id)
            params.append(case)
    return params
class ManifestBenchmark(BenchmarkBase[ShapeDtypeWorkload]):
    """Generic benchmark whose FLOP/memory counts come from an Op instance.

    Takes an op name, an instantiated Op and any workload satisfying
    :class:`ShapeDtypeWorkload`. The op must implement ``eval_roofline()``.

    Dynamic-shape ops may bind roofline variables during ``forward()``, so
    ``op.eval_roofline()`` is called only while a result is being built,
    i.e. after profiling has executed the op. The value is then cached for
    the lifetime of this object.

    Usage::

        op = SumFwdOp(dtype=dtype, dim=0)
        bm = ManifestBenchmark("SumFwdOp", op, workload)
        result = bm.profile(op, *inputs)
    """

    def __init__(
        self,
        op_name: str,
        op: Any,
        workload: ShapeDtypeWorkload,
    ):
        super().__init__(workload)
        self._op_name = op_name
        self._op = op
        # (flops, bytes), filled in by the first _get_roofline() call.
        self._roofline_cache: Optional[tuple[float, float]] = None

    def _get_roofline(self) -> tuple[float, float]:
        """Return ``(flops, bytes)`` as floats, evaluating the op at most once."""
        cached = self._roofline_cache
        if cached is not None:
            return cached
        flops, mem_bytes = self._op.eval_roofline()
        cached = (float(flops), float(mem_bytes))
        self._roofline_cache = cached
        return cached

    def calculate_flops(self) -> Optional[float]:
        flops, _ = self._get_roofline()
        return flops

    def calculate_memory(self) -> Optional[float]:
        _, mem_bytes = self._get_roofline()
        return mem_bytes
def _extract_op_config(op: object) -> Optional[dict]:
    """Return the kernel config for an Op instance, or None if unavailable.

    Handles the three Op patterns currently used in tileops:

    1. **Eager-init** (e.g. ``GemmOp``): ``op.kernel`` is a Kernel
       instance set in ``__init__``.
    2. **Lazy with dummy kernel** (e.g. ``FFTC2COp``): ``op.kernel`` is a
       default Kernel and ``op._kernel_cache`` may hold others.
    3. **Pure lazy cache** (e.g. ``_SoftmaxBaseOp`` and the spec-conformant
       reduction ops): ``op._kernel_cache`` is the only source; ``op.kernel``
       is unset.

    Lookup order is ``op.config`` (legacy / explicit override), then
    ``op.kernel.config``, then the cached kernels in iteration order. A
    source whose config is missing or falsy (``None``, ``{}``) is skipped, so
    a cache whose first kernel carries no config still yields the config of a
    later one.

    Args:
        op: The Op instance to inspect. Any object is accepted; absent
            attributes are treated as "no config here".

    Returns:
        The first truthy config found, or ``None`` when there is none.
    """
    op_config = getattr(op, "config", None)
    if op_config:
        return op_config

    kernel = getattr(op, "kernel", None)
    kernel_config = getattr(kernel, "config", None) if kernel is not None else None
    if kernel_config:
        return kernel_config

    # Lazy-cache pattern: all cached kernels for a given op share
    # dtype/op_kind, so any one config is sufficient for the benchmark report
    # (which records one entry per call). Scan until one is found instead of
    # giving up when the first cached kernel has none.
    cache = getattr(op, "_kernel_cache", None)
    if cache:
        for cached_kernel in cache.values():
            cached_config = getattr(cached_kernel, "config", None)
            if cached_config:
                return cached_config
    return None
class BenchmarkReport:
    """Collects benchmark results and dumps a markdown report.

    All methods are static — use as BenchmarkReport.record(...).
    Call clear() at session start, dump() at session end.
    """

    # Registry shared by all callers: benchmark group name -> list of record
    # entries, each a dict with ``params``, ``result``, ``tag`` and an
    # optional ``config``.
    _records: dict = {}

    @staticmethod
    def record(op_or_name, params: dict, result: dict, tag: str = "tileops") -> None:
        """Record a benchmark result.

        The entry is stored in the class-level registry (for ``dump()``),
        appended to the thread-local ``_bench_results.entries`` list, and
        summarized in one log line.

        Args:
            op_or_name: Op instance or benchmark group name string.
                If an Op instance, class name and module are extracted automatically.
            params: Parameter dict (typically from locals())
            result: Dict with latency_ms, tflops, bandwidth_tbs
            tag: Label to distinguish implementations (e.g. "tileops", "FA3", "fla")
        """
        if isinstance(op_or_name, str):
            name = op_or_name
            op_module = None
            op_config = None
        else:
            name = op_or_name.__class__.__name__
            op_module = op_or_name.__class__.__module__
            op_config = _extract_op_config(op_or_name)

        # Filter params to only include serializable benchmark parameters.
        # Tuples of primitives (e.g. ``shape=(4096, 4096)``) are preserved
        # verbatim so the profile log carries the original input geometry
        # rather than a flattened element count.
        def _is_serializable(v: Any) -> bool:
            if isinstance(v, (int, float, bool, str, torch.dtype)):
                return True
            if isinstance(v, tuple):
                return all(_is_serializable(x) for x in v)
            return False

        # ``params`` is typically ``locals()`` of the test, so well-known
        # harness locals and private names are dropped by name.
        filtered_params = {
            k: v for k, v in params.items()
            if k not in ("test", "bm", "op", "inputs", "result", "result_bl",
                         "baseline_fn", "tune")
            and not k.startswith("_")
            and _is_serializable(v)
        }

        record_entry = {
            "params": filtered_params,
            "result": result,
            "tag": tag,
        }
        if op_config:
            record_entry["config"] = op_config
        BenchmarkReport._records.setdefault(name, []).append(record_entry)

        # Accumulate in thread-local for conftest hook.
        if not hasattr(_bench_results, "entries"):
            _bench_results.entries = []
        entry = {"tag": tag, "op": name, **result}
        if op_module:
            entry["op_module"] = op_module
        _bench_results.entries.append(entry)

        _logger.info("op=%s module=%s tag=%s latency_ms=%.4f tflops=%.2f",
                     name, op_module or "N/A", tag,
                     result.get("latency_ms", 0),
                     result.get("tflops", 0))

    @staticmethod
    def dump(path: str) -> None:
        """Write all collected results to a markdown-formatted log file.

        Does nothing when no results were recorded. Otherwise the file gets
        an environment section followed by one section per benchmark group,
        with one table per tag. Table columns are the parameter names (the
        union over every entry of the tag, in first-seen order), then
        ``latency_ms``, ``tflops``, ``bandwidth_tbs``, then ``config`` when
        any entry of the tag has one. A missing parameter is rendered as an
        empty cell and a missing metric as ``N/A``.

        Args:
            path: Destination file; overwritten if it exists. Written as
                UTF-8.
        """
        if not BenchmarkReport._records:
            return

        lines = [
            "# TileOPs Benchmark Report",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "## Environment",
            "",
        ]
        lines.extend(_get_env_metadata())
        lines.append("")

        result_keys = ["latency_ms", "tflops", "bandwidth_tbs"]
        for name, entries in BenchmarkReport._records.items():
            if not entries:
                continue
            lines.append(f"## {name}")
            lines.append("")

            # Group by tag
            tag_entries = {}
            for entry in entries:
                tag_entries.setdefault(entry["tag"], []).append(entry)

            for tag, tag_group in tag_entries.items():
                lines.append(f"### {tag}")
                lines.append("")

                # Take the columns from every entry, not just the first one,
                # so a parameter that only appears on later entries is not
                # silently dropped from the table.
                param_keys = list(
                    dict.fromkeys(k for e in tag_group for k in e["params"])
                )
                has_config = any("config" in e for e in tag_group)
                header_parts = param_keys + result_keys
                if has_config:
                    header_parts.append("config")
                lines.append("| " + " | ".join(header_parts) + " |")
                lines.append("| " + " | ".join(["---"] * len(header_parts)) + " |")

                for entry in tag_group:
                    row = [str(entry["params"].get(k, "")) for k in param_keys]
                    for rk in result_keys:
                        val = entry["result"].get(rk)
                        row.append(f"{val:.4f}" if val is not None else "N/A")
                    if has_config:
                        cfg = entry.get("config")
                        row.append(str(cfg) if cfg else "")
                    lines.append("| " + " | ".join(row) + " |")
                lines.append("")

        # Explicit encoding: the output must not depend on the locale.
        with open(path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))
        print(f"Benchmark report saved to {path}")

    @staticmethod
    def clear() -> None:
        """Clear all collected records."""
        BenchmarkReport._records.clear()