Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion dimos/core/resource_monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dimos.core.resource import Resource
from dimos.core.resource_monitor.stats import (
WorkerStats,
collect_children_stats,
collect_process_stats,
)
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -110,8 +111,13 @@ def _collect_and_log(self) -> None:
pid = w.pid
if pid is not None:
ps = collect_process_stats(pid)
children = collect_children_stats(pid)
ps_dict = asdict(ps)
ps_dict["cpu_percent"] += sum(c.cpu_percent for c in children)
worker_stats.append(
WorkerStats(**asdict(ps), worker_id=w.worker_id, modules=w.module_names)
WorkerStats(
**ps_dict, worker_id=w.worker_id, modules=w.module_names, children=children
)
)
else:
worker_stats.append(
Expand Down
28 changes: 28 additions & 0 deletions dimos/core/resource_monitor/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,33 @@ def _collect_proc(proc: psutil.Process) -> ProcStats:
)


@dataclass(frozen=True)
class ChildProcessStats:
    """CPU stats for a single child process."""

    # OS process id of the child process.
    pid: int
    # Executable name as reported by psutil's Process.name().
    name: str
    # CPU utilization percent from psutil's cpu_percent(interval=None);
    # NOTE(review): first sample for a newly seen process is typically 0.0 — confirm.
    cpu_percent: float


def collect_children_stats(pid: int) -> list[ChildProcessStats]:
    """Return per-child CPU stats for all direct (non-recursive) children of pid.

    Returns an empty list when the parent process is gone or inaccessible.
    Children that disappear or become inaccessible mid-iteration are skipped.
    """
    # Guard only the lookup that can raise; avoids the confusing nested
    # try/except around the whole body.
    try:
        proc = _get_process(pid)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return []

    result: list[ChildProcessStats] = []
    for child in proc.children(recursive=False):
        # Each child can vanish between enumeration and stat collection,
        # so keep the try body to just the psutil calls.
        try:
            child_proc = _get_process(child.pid)
            name = child_proc.name()
            cpu = child_proc.cpu_percent(interval=None)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        result.append(ChildProcessStats(pid=child.pid, name=name, cpu_percent=cpu))
    return result
Comment on lines +130 to +142
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably _get_process raises (psutil.NoSuchProcess, psutil.AccessDenied). Then surround just that function call with try-except. Nesting the try-excepts is confusing.



def collect_process_stats(pid: int) -> ProcessStats:
"""Collect resource stats for a single process by PID."""
try:
Expand All @@ -137,3 +164,4 @@ class WorkerStats(ProcessStats):

worker_id: int = -1
modules: list[str] = field(default_factory=list)
children: list[ChildProcessStats] = field(default_factory=list)
92 changes: 72 additions & 20 deletions dimos/utils/cli/dtop.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from __future__ import annotations

from collections import deque
import json
import threading
import time
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -119,6 +120,20 @@ def _fmt_io(v: float) -> str:
return f"{v / 1048576:.0f} MB"


def _cpu_metric(line: Text, cpu: float, stale: bool, cpu_hist: deque[float] | None = None) -> None:
    """Append a CPU label + value + sparkline/bar to an existing Text line."""
    dim = "#606060"
    label_style = dim if stale else _LABEL_COLOR
    value_style = dim if stale else _heat(min(cpu / 100.0, 1.0))
    line.append("CPU ", style=label_style)
    line.append(_fmt_pct(cpu), style=value_style)
    line.append(" ")
    # Stale panels show a dimmed placeholder instead of live data.
    if stale:
        line.append("░" * _SPARK_WIDTH, style=dim)
        return
    # Prefer the sparkline when we have history; fall back to a plain bar.
    if cpu_hist:
        line.append_text(_spark(cpu_hist))
    else:
        line.append_text(_bar(cpu, 100))


_LINE1: list[tuple[str, str, Callable[[float], str]]] = [
("CPU", "cpu_percent", _fmt_pct),
("PSS", "pss", _fmt_mem),
Expand Down Expand Up @@ -176,9 +191,12 @@ class ResourceSpyApp(App[None]):

BINDINGS = [("q", "quit"), ("ctrl+c", "quit")]

def __init__(self, topic_name: str = "/dimos/resource_stats") -> None:
def __init__(
self, topic_name: str = "/dimos/resource_stats", log_path: str | None = None
) -> None:
super().__init__()
self._topic_name = topic_name
self._log_file = open(log_path, "a") if log_path else None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 File handle leak if __init__ raises after open()

_log_file is opened before autoconf, PickleLCM(), and subscribe(). If any of those subsequent calls throw, on_unmount is never called and the file handle is leaked. A try/except (or opening the file later, e.g. in on_mount) would prevent this.

# Warn about missing system config before entering TUI raw mode.
from dimos.protocol.service.lcmservice import autoconf

Expand All @@ -191,6 +209,7 @@ def __init__(self, topic_name: str = "/dimos/resource_stats") -> None:
self._latest: dict[str, Any] | None = None
self._last_msg_time: float = 0.0
self._cpu_history: dict[str, deque[float]] = {}
self._child_cpu_history: dict[int, deque[float]] = {}

def compose(self) -> ComposeResult:
with VerticalScroll():
Expand All @@ -201,11 +220,15 @@ def on_mount(self) -> None:

async def on_unmount(self) -> None:
    """Stop the LCM subscription and release the optional JSONL log handle."""
    self._lcm.stop()
    if self._log_file:
        self._log_file.close()

def _on_msg(self, msg: dict[str, Any], _topic: str) -> None:
    """Record the latest stats message and optionally append it to the JSONL log.

    Called from the LCM receive thread; state shared with the UI refresh is
    guarded by self._lock.
    """
    with self._lock:
        self._latest = msg
        self._last_msg_time = time.monotonic()
        if self._log_file:
            self._log_file.write(json.dumps({"ts": time.time(), **msg}) + "\n")
            # Flush each line so it survives a crash/SIGKILL — Python file
            # I/O is buffered by default and would otherwise hold recent
            # lines in memory.
            self._log_file.flush()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Log file not flushed between writes

Each _on_msg call writes a line to _log_file but never calls flush(). Because Python's file I/O is buffered by default, lines written near a crash or SIGKILL will silently stay in the OS/Python buffer and never reach disk. Adding a flush() after the write ensures each message is durable.

Suggested change
self._log_file.write(json.dumps({"ts": time.time(), **msg}) + "\n")
self._log_file.write(json.dumps({"ts": time.time(), **msg}) + "\n")
self._log_file.flush()


def _refresh(self) -> None:
with self._lock:
Expand Down Expand Up @@ -266,6 +289,13 @@ def _refresh(self) -> None:
title.append(" ")
parts.append(Rule(title=title, style=border_style))
parts.extend(self._make_lines(d, stale, ranges, self._cpu_history[role]))
for child in d.get("children", []):
pid = child.get("pid", 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 pid variable shadowed by inner loop

The outer for tuple unpacks pid as a string (worker pid for display), but this inner assignment overwrites it with an integer child pid. The outer pid is rebound at the start of each outer iteration so there's no runtime bug, but the shadowing is confusing and could easily introduce a bug if code is added between the inner loop and the next outer iteration.

Suggested change
pid = child.get("pid", 0)
child_pid = child.get("pid", 0)
if child_pid not in self._child_cpu_history:
self._child_cpu_history[child_pid] = deque(maxlen=_SPARK_WIDTH * 2)
if not stale:
self._child_cpu_history[child_pid].append(child.get("cpu_percent", 0.0))
parts.append(self._make_child_line(child, stale, self._child_cpu_history[child_pid]))

if pid not in self._child_cpu_history:
self._child_cpu_history[pid] = deque(maxlen=_SPARK_WIDTH * 2)
if not stale:
self._child_cpu_history[pid].append(child.get("cpu_percent", 0.0))
parts.append(self._make_child_line(child, stale, self._child_cpu_history[pid]))

# First entry title goes on the Panel itself
first_role, first_rs, _, first_mods, first_pid = entries[0]
Expand All @@ -285,6 +315,24 @@ def _refresh(self) -> None:
)
self.query_one("#panels", Static).update(panel)

@staticmethod
def _make_child_line(
    child: dict[str, Any], stale: bool, cpu_hist: deque[float] | None = None
) -> Text:
    """Render one indented summary line (name, pid, CPU) for a child process."""
    dim = "#606060"
    sep_style = dim if stale else "#555555"
    name_style = dim if stale else _LABEL_COLOR
    pid_style = dim if stale else "#777777"
    name = child.get("name", "?")
    pid = child.get("pid", "")
    line = Text()
    line.append(" ↳ ", style=sep_style)
    line.append(f"{name}", style=name_style)
    line.append(f" [{pid}]", style=pid_style)
    line.append(" · ", style=sep_style)
    _cpu_metric(line, child.get("cpu_percent", 0.0), stale, cpu_hist)
    return line

@staticmethod
def _make_lines(
d: dict[str, Any],
Expand All @@ -304,24 +352,13 @@ def _make_lines(
for idx, (label, key, fmt) in enumerate(_LINE1):
val = d.get(key, 0)
lo, hi = ranges[key]
# CPU% uses absolute 0-100 scale; everything else is relative
if key == "cpu_percent":
val_style = dim if stale else _heat(min(val / 100.0, 1.0))
else:
val_style = dim if stale else _rel_style(val, lo, hi)
if idx > 0:
line1.append(sep, style=sep_style)
line1.append(f"{label} ", style=label1_style)
line1.append(fmt(val), style=val_style)
# CPU bar right after CPU%
if key == "cpu_percent":
line1.append(" ")
if stale:
line1.append("░" * _SPARK_WIDTH, style=dim)
elif cpu_hist is not None and len(cpu_hist) > 0:
line1.append_text(_spark(cpu_hist))
else:
line1.append_text(_bar(val, 100))
_cpu_metric(line1, val, stale, cpu_hist)
else:
line1.append(f"{label} ", style=label1_style)
line1.append(fmt(val), style=dim if stale else _rel_style(val, lo, hi))

# Line 2
line2 = Text()
Expand Down Expand Up @@ -456,17 +493,32 @@ def _preview() -> None:


def main() -> None:
    """Entry point for the `dtop` console script."""
    import argparse
    import sys

    # Static preview mode short-circuits before any argument parsing.
    if "--preview" in sys.argv:
        _preview()
        return

    ap = argparse.ArgumentParser(
        prog="dtop", description="Live TUI for per-worker resource stats."
    )
    ap.add_argument(
        "--topic", default="/dimos/resource_stats", help="LCM topic to subscribe to."
    )
    ap.add_argument(
        "--log",
        nargs="?",
        const=f"dtop_{time.strftime('%Y%m%d_%H%M%S')}.ignore.jsonl",
        metavar="PATH",
        help="Log stats to a JSONL file. Uses a timestamped filename if no path is given.",
    )
    opts = ap.parse_args()

    if opts.log:
        print(f"Logging to {opts.log}")

    ResourceSpyApp(topic_name=opts.topic, log_path=opts.log).run()


if __name__ == "__main__":
Expand Down
131 changes: 131 additions & 0 deletions dimos/utils/cli/dtop_plot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""dtop-plot — Plot resource stats from a dtop JSONL log file.

Usage:
dtop-plot <log.jsonl> [--metrics cpu_percent,pss] [--out plot.png]
"""

from __future__ import annotations

# Role key used for the coordinator process in log messages and plot labels.
_COORDINATOR = "coordinator"

# Metric key -> human-readable axis label. Units here reflect the
# conversions applied via _SCALE below (bytes plotted as MB).
_METRIC_LABELS: dict[str, str] = {
    "cpu_percent": "CPU %",
    "pss": "PSS (MB)",
    "num_threads": "Threads",
    "num_children": "Children",
    "num_fds": "File Descriptors",
    "cpu_time_user": "User CPU Time (s)",
    "cpu_time_system": "Sys CPU Time (s)",
    "cpu_time_iowait": "IO Wait Time (s)",
    "io_read_bytes": "IO Read (MB)",
    "io_write_bytes": "IO Write (MB)",
}

# Metric key -> multiplier applied before plotting (1/1048576 = bytes -> MB).
_SCALE: dict[str, float] = {
    "pss": 1 / 1048576,
    "io_read_bytes": 1 / 1048576,
    "io_write_bytes": 1 / 1048576,
}


def _load(path: str):
    """Load a dtop JSONL log into a long-format DataFrame.

    Returns (df, labels): df has one row per (timestamp, role) with the stat
    fields as columns; labels maps role -> human-readable label (worker
    module names when available, else the role itself).

    Malformed or truncated lines (e.g. partial writes from an unclean
    shutdown) are skipped rather than raising.
    """
    import pandas as pd

    raw = pd.read_json(path, lines=True)

    rows = []
    for _, msg in raw.iterrows():
        # Skip rows missing "ts"/"coordinator" or holding NaN instead of a
        # dict — pandas fills absent keys with NaN, which ** cannot unpack.
        try:
            ts = msg["ts"]
            rows.append({"ts": ts, "role": _COORDINATOR, **msg[_COORDINATOR]})
        except (KeyError, TypeError):
            continue
        # Series.get() only falls back to the default when the key is absent
        # from the index, not when the value is NaN — `or []` covers both.
        for w in msg.get("workers") or []:
            wid = w.get("worker_id", 0)
            rows.append({"ts": ts, "role": f"worker_{wid}", **w})

    df = pd.DataFrame(rows)
    df["ts"] = pd.to_datetime(df["ts"], unit="s")

    labels: dict[str, str] = {_COORDINATOR: _COORDINATOR}
    for role, group in df.groupby("role"):
        if role == _COORDINATOR:
            continue
        # Entries may be NaN (truthy!) on rows missing "modules"; only accept
        # a real, non-empty list.
        mods = next(
            (m for m in group.get("modules", []) if isinstance(m, list) and m), None
        )
        labels[role] = ", ".join(mods) if mods else role

    df["label"] = df["role"].map(labels)
    return df, labels


def _plot(df, labels: dict[str, str], metrics: list[str], out: str, show: bool = False) -> None:
    """Draw one stacked time-series subplot per metric and save the figure."""
    import matplotlib.pyplot as plt

    n_axes = len(metrics)
    fig, axes = plt.subplots(n_axes, 1, figsize=(12, 3 * n_axes), sharex=True)
    # plt.subplots returns a bare Axes (not an array) for a single subplot.
    if n_axes == 1:
        axes = [axes]

    for ax, metric in zip(axes, metrics, strict=False):
        # Hide panels for metrics absent from this log.
        if metric not in df.columns:
            ax.set_visible(False)
            continue
        factor = _SCALE.get(metric, 1.0)
        for role, series in df.groupby("role"):
            ax.plot(series["ts"], series[metric] * factor, label=labels[role])
        ax.set_ylabel(_METRIC_LABELS.get(metric, metric))
        ax.legend(fontsize=8, loc="center left", bbox_to_anchor=(1.01, 0.5), borderaxespad=0)
        ax.grid(True, alpha=0.3)

    axes[-1].set_xlabel("Time")
    fig.tight_layout()

    fig.savefig(out, dpi=150, bbox_inches="tight")
    print(f"Saved to {out}")
    if show:
        plt.show()


def _default_out(log_path: str) -> str:
base = log_path.removesuffix(".ignore.jsonl")
return f"{base}.ignore.png"


def main() -> None:
    """Entry point for the `dtop-plot` console script."""
    import argparse

    ap = argparse.ArgumentParser(
        prog="dtop-plot", description="Plot resource stats from a dtop JSONL log file."
    )
    ap.add_argument("log", metavar="LOG", help="Path to a dtop JSONL log file.")
    ap.add_argument(
        "--metrics",
        default="cpu_percent,pss,num_threads",
        help="Comma-separated list of metrics to plot (default: cpu_percent,pss,num_threads).",
    )
    ap.add_argument(
        "--out", metavar="PATH", help="Output image path (default: <log>.ignore.png)."
    )
    ap.add_argument(
        "--show", action="store_true", help="Open the plot interactively after saving."
    )
    ns = ap.parse_args()

    metric_list = [m.strip() for m in ns.metrics.split(",")]
    df, labels = _load(ns.log)
    _plot(df, labels, metric_list, ns.out or _default_out(ns.log), ns.show)


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ dimos = "dimos.robot.cli.dimos:main"
rerun-bridge = "dimos.visualization.rerun.bridge:app"
doclinks = "dimos.utils.docs.doclinks:main"
dtop = "dimos.utils.cli.dtop:main"
dtop-plot = "dimos.utils.cli.dtop_plot:main"

[project.urls]
Homepage = "https://dimensionalos.com"
Expand Down
Loading