Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gcm/exporters/graph_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
job_scribe_category: Optional[str] = None,
statvfs_scribe_category: Optional[str] = None,
pure_scribe_category: Optional[str] = None,
sdiag_scribe_category: Optional[str] = None,
ods_entity: Optional[str | int] = None,
scribe_write: ScribeWrite = write_messages,
):
Expand All @@ -65,6 +66,7 @@ def __init__(
DataIdentifier.NODE: node_scribe_category,
DataIdentifier.STATVFS: statvfs_scribe_category,
DataIdentifier.PURE: pure_scribe_category,
DataIdentifier.SDIAG: sdiag_scribe_category,
}
self.scribe_write = scribe_write
if ods_entity is None:
Expand Down
10 changes: 9 additions & 1 deletion gcm/exporters/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,15 @@ def __init__(
self._logger_provider = otel_log_init(
log_resource_attributes, endpoint + "/v1/logs", timeout
)
self.otel_logger = logging.getLogger("gcm")
# Use a dedicated, isolated logger for sink emits. Attaching the
# LoggingHandler to "gcm" makes EVERY `gcm.*` log record (e.g.
# `gcm.monitoring.dataclass_utils.logger.warning(...)`) propagate up
# and fire the otel handler, polluting the target Scuba table with
# null-data log rows. The leaf logger name + propagate=False keeps
# only explicit `self.otel_logger.info("", extra=...)` emits in the
# otel pipeline.
self.otel_logger = logging.getLogger("_gcm_otel_emit")
self.otel_logger.propagate = False

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

polluting the target Scuba table with
# null-data log rows.

the row gets published to scuba but with empty fields? do you have a sample?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — concrete sample. Two tables affected, the second one is the dramatic case.

fair_sdiag sample row (one per dataclass_utils.warning("Missing field_name=...") per slurm_monitor cycle):

time=2026-05-17 16:14:41 PDT  cluster=null  derived_cluster=null
schedule_cycle_last=null  bf_active=null  ...all 70 cols null...
code.file.path=gcm/monitoring/dataclass_utils.py
code.function.name=instantiate_dataclass

~80 such null rows in last 24h. Scuba: https://fburl.com/scuba/fair_sdiag/oj8u9g16

fair_sacct_running is the dramatic case: 637,929 of 713,988 rows over the last 24h (89.3%) are null-data leakage. Most of those (574,000) attribute to gcm/exporters/otel.py:_write_log itself — the handler was recursing on log records emitted by its own emit path (_write_log logs at gcm.exporters.otel → propagates up to "gcm" parent → re-fires the handler → emits another row → which logs again → snowball). sacct_running runs hourly with large batches so each invocation generates 26-38k junk rows.

Full counts after running this query across all otel-targeted Scuba tables:

Dataset Total 24h Null-cluster leaked %
fair_sacct_running 713,988 637,929 89.3%
fair_sdiag 200 80 40%
fair_sacct 10,360,194 0 0%
fair_scontrol_data 74 0 0%
fair_scontrol_config 68 0 0%
fair_sacctmgr_qos 639 0 0%
fair_sacctmgr_user 517,754 0 0%

Only the first two trigger gcm-level logging during their otel write path; the other four don't, so the bug never fires on them. The fix (logger = "_gcm_otel_emit", propagate=False) removes the entire class of self-amplification.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here: https://fburl.com/scuba/fair_sacct_running/eqt1fnfy

that query is Cluster vs cluster naming diff. there's a few nulls on both but with other fields populated, I'm not confident this fixes the root of this issue: https://fburl.com/scuba/fair_sacct_running/28owvuw2

fair_sdiag

there's a few rows here where cluster=null and Body gets populated, I think your change fixes this: https://fburl.com/scuba/fair_sdiag/q9a0ou8f

add some tests:

def test_unrelated_gcm_logger_does_not_emit_to_otel():
    sink = OtelSink(...)
    other = logging.getLogger("gcm.dataclass_utils")
    other.warning("Missing field_name=foo")
    assert sink._scuba_writes == []
    
def test_gcm_logger_emits_to_otel():
    sink = OtelSink(...)
    sink.write(<data> ...)
    assert sink._scuba_writes == [<data> ...]

otel_handler = LoggingHandler(
level=logging._nameToLevel["INFO"], logger_provider=self._logger_provider
)
Expand Down
77 changes: 73 additions & 4 deletions gcm/monitoring/cli/slurm_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# All rights reserved.
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from dataclasses import asdict, dataclass, field, replace
from datetime import timedelta
from typing import (
Collection,
Expand Down Expand Up @@ -40,9 +40,14 @@
ClockImpl,
unixtime_to_pacific_datetime,
)
from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams, SinkImpl
from gcm.monitoring.sink.protocol import (
DataIdentifier,
DataType,
SinkAdditionalParams,
SinkImpl,
)
from gcm.monitoring.sink.utils import Factory, HasRegistry
from gcm.monitoring.slurm.client import SlurmCliClient
from gcm.monitoring.slurm.client import SlurmCliClient, SlurmClient
from gcm.monitoring.slurm.constants import RUNNING_JOB_STATES
from gcm.monitoring.slurm.derived_cluster import get_derived_cluster
from gcm.monitoring.slurm.sacct import parse_slurm_jobs
Expand Down Expand Up @@ -242,11 +247,22 @@ def clock(self) -> Clock: ...

def cluster(self) -> str: ...

@property
def slurm_client(self) -> SlurmClient: ...

# Per-cycle cache populated by collect_slurm and consumed by collect_sdiag
# so the sdiag scrape is shared (no double-scrape, no reset race).
last_sdiag: Optional[Sdiag]


@dataclass
class CliObjectImpl:
clock: Clock = field(default_factory=ClockImpl)
slurm_client: SlurmClient = field(default_factory=SlurmCliClient)
registry: Mapping[str, Factory[SinkImpl]] = field(default_factory=lambda: registry)
# Set by collect_slurm at the start of each cycle, read by collect_sdiag.
# Single-cluster per process, so no per-cluster keying needed.
last_sdiag: Optional[Sdiag] = None

def cluster(self) -> str:
return clusterscope.cluster()
Expand All @@ -265,10 +281,18 @@ def collect_slurm(
"""Collect all the relevant slurm metrics that will be stored on ODS"""
end_time = unixtime_to_pacific_datetime(obj.clock.unixtime())
start_time = end_time - timedelta(seconds=interval)
slurm_client = SlurmCliClient()
slurm_client = obj.slurm_client

# Invalidate the cache up-front so a scrape failure below cannot let
# collect_sdiag re-publish a stale prior-cycle sdiag as a fresh row.
obj.last_sdiag = None

sinfo = slurm_client.sinfo_structured()
sdiag = slurm_client.sdiag_structured()
# Stash for collect_sdiag (same cycle, runs after this task). sdiag is a
# cluster-wide slurmctld stat -- one value per scrape, no per-partition
# variation -- so a single attr is sufficient.
obj.last_sdiag = sdiag

if heterogeneous_cluster_v1:
nodes_per_partition: dict[str, list[SinfoNode]] = defaultdict(list)
Expand Down Expand Up @@ -309,6 +333,35 @@ def collect_slurm(
yield from slurm_log


def collect_sdiag(
obj: CliObject,
cluster: str,
logger: logging.Logger,
) -> Generator[Sdiag, None, None]:
"""Project the most recent sdiag scrape (cached on the CliObject by
`collect_slurm`) to an Sdiag row for the Scuba `perfpipe_fair_sdiag_v2` dataset.

Single sdiag scrape per cycle is shared with `collect_slurm` -- no
double-scrape, no reset-counter race. sdiag is a cluster-wide slurmctld
stat so we yield exactly one Sdiag per cycle (not one per partition);
`derived_cluster` is intentionally omitted -- tagging a cluster-wide
stat with per-partition derived_cluster values would create misleading
duplicate rows for the same scrape.

Prerequisite: `collect_slurm` must have run earlier in the same cycle to
populate `obj.last_sdiag`. If `collect_slurm` failed or hasn't run, this
collector no-ops (yields zero rows).
"""
sdiag = obj.last_sdiag
if sdiag is None:
# collect_slurm hasn't populated the cache yet (first cycle race or
# collect_slurm raised). Skip rather than crash the loop -- the next
# cycle will recover.
logger.debug("collect_sdiag: no cached sdiag scrape available, skipping")
return
yield replace(sdiag, cluster=cluster)


@click_default_cmd(
context_settings={
"obj": _default_obj,
Expand Down Expand Up @@ -356,6 +409,11 @@ def collect_slurm_callable(
heterogeneous_cluster_v1=heterogeneous_cluster_v1,
)

def collect_sdiag_callable(
cluster: str, interval: int, logger: logging.Logger
) -> Generator[Sdiag, None, None]:
return collect_sdiag(obj=obj, cluster=cluster, logger=logger)

run_data_collection_loop(
logger_name=LOGGER_NAME,
log_folder=log_folder,
Expand All @@ -366,13 +424,24 @@ def collect_slurm_callable(
once=once,
interval=interval,
data_collection_tasks=[
# Task A: SLURMLog -> METRIC -> ODS via graph_api._write_metric.
(
collect_slurm_callable,
SinkAdditionalParams(
data_type=DataType.METRIC,
heterogeneous_cluster_v1=heterogeneous_cluster_v1,
),
),
# Task B: Sdiag -> LOG -> scribe (perfpipe_fair_sdiag_v2) ->
# Scuba (perfpipe_fair_sdiag_v2) via graph_api._write_log. Reads cached
# sdiag from Task A.
(
collect_sdiag_callable,
SinkAdditionalParams(
data_type=DataType.LOG,
data_identifier=DataIdentifier.SDIAG,
),
),
],
sink=sink,
sink_opts=sink_opts,
Expand Down
4 changes: 3 additions & 1 deletion gcm/monitoring/meta_utils/scribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ def try_write_logs(
num_failed = sum(not ack for ack in acks)
if num_failed == 0:
return
unique_codes = sorted({code for code in response_codes.values() if code != "OK"})
raise ScribeErrorWithAcks(
f"Failed to write {num_failed}/{len(logs)} messages", acks=acks
f"Failed to write {num_failed}/{len(logs)} messages; reject codes: {unique_codes}",
acks=acks,
)


Expand Down
1 change: 1 addition & 0 deletions gcm/monitoring/sink/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class DataIdentifier(Enum):
GENERIC = auto()
K8S_POD = auto()
K8S_NODE = auto()
SDIAG = auto()


@dataclass
Expand Down
102 changes: 94 additions & 8 deletions gcm/monitoring/slurm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import (
Any,
Callable,
cast,
Generator,
Hashable,
Iterable,
Expand Down Expand Up @@ -202,47 +203,122 @@ def sdiag_structured(self) -> Sdiag:
subprocess.check_output(["sdiag", "--all", "--json"], text=True)
)
stats = sdiag_output["statistics"]

# Extract nested objects (use `or {}` to handle both missing keys
# and explicit None values from sdiag).
schedule_exit = stats.get("schedule_exit") or {}
bf_exit = stats.get("bf_exit") or {}

# Extract timestamp fields (they have {set, infinite, number} structure).
# Use `or {}` to handle both missing keys AND explicit `null` values
# from sdiag JSON; `.get(key, {})` alone returns None when the key
# is present but null, and `.get("set")` then raises AttributeError.
req_time_obj = stats.get("req_time") or {}
req_time_start_obj = stats.get("req_time_start") or {}
job_states_ts_obj = stats.get("job_states_ts") or {}
bf_when_last_cycle_obj = stats.get("bf_when_last_cycle") or {}

# Serialize RPCs to JSON strings
rpcs_by_message_type = stats.get("rpcs_by_message_type", [])
rpcs_by_user = stats.get("rpcs_by_user", [])

result = Sdiag(
# Required fields
server_thread_count=stats.get("server_thread_count"),
agent_queue_size=stats.get("agent_queue_size"),
agent_count=stats.get("agent_count"),
agent_thread_count=stats.get("agent_thread_count"),
dbd_agent_queue_size=stats.get("dbd_agent_queue_size"),
# Schedule cycle
schedule_cycle_max=stats.get("schedule_cycle_max"),
schedule_cycle_mean=stats.get("schedule_cycle_mean"),
schedule_cycle_sum=stats.get("schedule_cycle_sum"),
schedule_cycle_total=stats.get("schedule_cycle_total"),
schedule_cycle_per_minute=stats.get("schedule_cycle_per_minute"),
schedule_queue_length=stats.get("schedule_queue_length"),
schedule_cycle_last=stats.get("schedule_cycle_last"),
schedule_cycle_mean_depth=stats.get("schedule_cycle_mean_depth"),
schedule_cycle_depth=stats.get("schedule_cycle_depth"),
# Schedule exit
schedule_exit_end_job_queue=schedule_exit.get("end_job_queue"),
schedule_exit_default_queue_depth=schedule_exit.get(
"default_queue_depth"
),
schedule_exit_max_job_start=schedule_exit.get("max_job_start"),
schedule_exit_max_rpc_cnt=schedule_exit.get("max_rpc_cnt"),
schedule_exit_max_sched_time=schedule_exit.get("max_sched_time"),
schedule_exit_licenses=schedule_exit.get("licenses"),
# Job stats
sdiag_jobs_submitted=stats.get("jobs_submitted"),
sdiag_jobs_started=stats.get("jobs_started"),
sdiag_jobs_completed=stats.get("jobs_completed"),
sdiag_jobs_canceled=stats.get("jobs_canceled"),
sdiag_jobs_failed=stats.get("jobs_failed"),
sdiag_jobs_pending=stats.get("jobs_pending"),
sdiag_jobs_running=stats.get("jobs_running"),
# Backfill stats
bf_backfilled_jobs=stats.get("bf_backfilled_jobs"),
bf_last_backfilled_jobs=stats.get("bf_last_backfilled_jobs"),
bf_backfilled_het_jobs=stats.get("bf_backfilled_het_jobs"),
bf_cycle_counter=stats.get("bf_cycle_counter"),
bf_cycle_mean=stats.get("bf_cycle_mean"),
bf_cycle_sum=stats.get("bf_cycle_sum"),
bf_cycle_max=stats.get("bf_cycle_max"),
bf_cycle_last=stats.get("bf_cycle_last"),
bf_depth_mean=stats.get("bf_depth_mean"),
bf_depth_mean_try=stats.get("bf_depth_mean_try"),
bf_depth_sum=stats.get("bf_depth_sum"),
bf_depth_try_sum=stats.get("bf_depth_try_sum"),
bf_last_depth=stats.get("bf_last_depth"),
bf_last_depth_try=stats.get("bf_last_depth_try"),
bf_queue_len=stats.get("bf_queue_len"),
schedule_exit_end_job_queue=schedule_exit.get("end_job_queue"),
schedule_exit_default_queue_depth=schedule_exit.get(
"default_queue_depth"
bf_queue_len_mean=stats.get("bf_queue_len_mean"),
bf_queue_len_sum=stats.get("bf_queue_len_sum"),
bf_table_size=stats.get("bf_table_size"),
bf_table_size_sum=stats.get("bf_table_size_sum"),
bf_table_size_mean=stats.get("bf_table_size_mean"),
bf_when_last_cycle=(
bf_when_last_cycle_obj.get("number")
if bf_when_last_cycle_obj.get("set")
else None
),
schedule_exit_max_job_start=schedule_exit.get("max_job_start"),
schedule_exit_max_rpc_cnt=schedule_exit.get("max_rpc_cnt"),
schedule_exit_max_sched_time=schedule_exit.get("max_sched_time"),
schedule_exit_licenses=schedule_exit.get("licenses"),
bf_active=(
bool(stats.get("bf_active"))
if stats.get("bf_active") is not None
else None
),
# Backfill exit
bf_exit_end_job_queue=bf_exit.get("end_job_queue"),
bf_exit_max_job_start=bf_exit.get("bf_max_job_start"),
bf_exit_max_job_test=bf_exit.get("bf_max_job_test"),
bf_exit_max_time=bf_exit.get("bf_max_time"),
bf_exit_node_space_size=bf_exit.get("bf_node_space_size"),
bf_exit_state_changed=bf_exit.get("state_changed"),
# Timing
req_time=(
req_time_obj.get("number") if req_time_obj.get("set") else None
),
req_time_start=(
req_time_start_obj.get("number")
if req_time_start_obj.get("set")
else None
),
gettimeofday_latency=stats.get("gettimeofday_latency"),
job_states_ts=(
job_states_ts_obj.get("number")
if job_states_ts_obj.get("set")
else None
),
parts_packed=stats.get("parts_packed"),
# JSON blobs
rpcs_by_message_type_json=(
json.dumps(rpcs_by_message_type)
if rpcs_by_message_type is not None
else "[]"
),
rpcs_by_user_json=(
json.dumps(rpcs_by_user) if rpcs_by_user is not None else "[]"
),
)

# Reset sdiag counters after collection
Expand All @@ -258,6 +334,9 @@ def sdiag_structured(self) -> Sdiag:
"Agent thread count:": "agent_thread_count",
"DBD Agent queue size:": "dbd_agent_queue_size",
}
# Legacy (slurm < 23.2) text-parse path only emits int/None values;
# the new JSON-blob and string-typed Sdiag fields are only populated
# via the slurm >= 23.2 JSON branch above. Keep the type narrow.
data: dict[str, Optional[int]] = {
"server_thread_count": 0,
"agent_queue_size": 0,
Expand Down Expand Up @@ -299,10 +378,17 @@ def sdiag_structured(self) -> Sdiag:
else:
data[name] = None

# Cast to Any for the splat: mypy cannot reconcile dict[str, int|None]
# values against Sdiag's mixed int|bool|str|None fields without per-key
# checks. At runtime this path only emits int/None values; non-int
# Sdiag fields (bf_active, rpcs_by_*_json) fall back to their None
# defaults.
result = Sdiag(**cast(dict[str, Any], data))

# Reset sdiag counters after collection
self._reset_sdiag_counters()

return Sdiag(**data)
return result

def _reset_sdiag_counters(self) -> None:
"""Reset sdiag counters after collection.
Expand Down
1 change: 1 addition & 0 deletions gcm/monitoring/utils/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def run_data_collection_loop(
run_st_time = clock.monotonic()
log_time = clock.unixtime()

# Task ordering matters for some collections. If you wish to update this assumption please create a new `run_data_collection_loop` function
for get_data, additional_params in data_collection_tasks:
logger.debug("will try getting data for %s", logger_name)
data = get_data(
Expand Down
Loading
Loading