Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
51eebeb
docs(watchdog): add acquisition watchdog design spec and implementati…
Alpaca233 Jun 23, 2026
95edd86
feat(slack): add dependency-free squid.slack.post_message sender
Alpaca233 Jun 23, 2026
044e40f
refactor(slack): route SlackNotifier sends through squid.slack
Alpaca233 Jun 23, 2026
9741fd9
feat(watchdog): add squid.acquisition_state breadcrumb schema + writer
Alpaca233 Jun 23, 2026
5f2539c
feat(watchdog): add config resolution + [SlackNotifications] loader
Alpaca233 Jun 23, 2026
648f07c
feat(watchdog): add Slack alert formatting
Alpaca233 Jun 24, 2026
a0dad7a
feat(watchdog): add poll/classify/dedup monitor
Alpaca233 Jun 24, 2026
0985a56
feat(watchdog): add CLI entry point
Alpaca233 Jun 24, 2026
a5e38bb
feat(watchdog): write acquisition start breadcrumb from the engine
Alpaca233 Jun 24, 2026
7cceeba
feat(watchdog): heartbeat + end-reason breadcrumb in acquisition worker
Alpaca233 Jun 24, 2026
ea615c4
feat(watchdog): notifier reports only clean finishes; watchdog owns p…
Alpaca233 Jun 25, 2026
54efda8
feat(watchdog): write an aborted breadcrumb when quitting mid-acquisi…
Alpaca233 Jun 25, 2026
c0f40e4
test(watchdog): end-to-end breadcrumb lifecycle in simulation
Alpaca233 Jun 25, 2026
93cc90b
docs(watchdog): add systemd + Windows service recipes and README
Alpaca233 Jun 25, 2026
72df6f6
docs(watchdog): replace Windows Task XML with a self-contained instal…
Alpaca233 Jun 25, 2026
811693d
docs(watchdog): note pythonw.exe PATH requirement for Windows install
Alpaca233 Jun 25, 2026
be8ba6a
fix(watchdog): read Slack credentials from cache/slack_settings.yaml …
Alpaca233 Jun 25, 2026
a816c60
refactor(watchdog): cleanup from /simplify review
Alpaca233 Jun 26, 2026
ccb7355
Merge remote-tracking branch 'origin/master' into acquisition-watchdog
Alpaca233 Jun 26, 2026
7186e26
fix(watchdog): warn if acquisition thread outlives the shutdown join
Alpaca233 Jun 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions software/acquisition_watchdog/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Acquisition Watchdog

Independent process that alerts (via Slack) when a Squid acquisition ends
prematurely — process crash/hang/kill, fatal error, or user abort. Covers runs
launched from the GUI and from the MCP control server.

## How it works
The Squid GUI writes a `run.json` breadcrumb (start / throttled heartbeat / end)
into a shared state dir. This watchdog polls it and posts one Slack alert when a
run dies, hangs, or ends with a non-clean reason. Clean completions are silent.

## Run it
cd software
python3 -m acquisition_watchdog

Options: `--slack-settings`, `--state-dir`, `--poll-interval` (5s), `--heartbeat-timeout` (120s), `--once`.

Slack credentials are read from `cache/slack_settings.yaml` — the same file the GUI's
Slack settings dialog writes (keys `bot_token`, `channel_id`, `enabled`). Run the
watchdog from the `software/` directory (so the default `cache/slack_settings.yaml`
path resolves), or pass `--slack-settings <path>`. To disable watchdog alerts on a
machine without disabling the GUI's notifications, add `watchdog_enabled: false` to
that YAML.

## Install as an always-on service
- **Linux:** see `systemd/squid-acquisition-watchdog.service` (header has steps).
- **Windows:** run `windows/install.ps1` (registers a logon-triggered task). Ensure
`pythonw.exe` is on `PATH`, or edit the `-Execute` value in `install.ps1` to the full
Python path.

## State dir
Defaults to `platformdirs.user_state_path("squid","cephla")/watchdog`. Override with
`SQUID_WATCHDOG_STATE_DIR` (must match the GUI's environment) or `--state-dir`.

## Remote / power-loss coverage (future)
Point `--state-dir` at a shared/synced mount on another host and run this process
there. Per-machine `run-<machine>.json` naming and clock-skew tolerance are needed
first (see the design spec, "Future work").
Empty file.
50 changes: 50 additions & 0 deletions software/acquisition_watchdog/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# acquisition_watchdog/__main__.py
"""CLI entry point: python -m acquisition_watchdog"""
import argparse
import time
from pathlib import Path
from typing import Optional, Sequence

import squid.logging
from acquisition_watchdog.monitor import Monitor


def main(argv: Optional[Sequence[str]] = None) -> None:
parser = argparse.ArgumentParser(
prog="acquisition_watchdog",
description="Alert on prematurely-ended Squid acquisitions (crash/hang/abort/error).",
)
parser.add_argument(
"--slack-settings",
help="Path to the Slack settings YAML (defaults to ./cache/slack_settings.yaml, "
"the same file the GUI writes).",
)
parser.add_argument("--state-dir", help="Override the watchdog state directory.")
parser.add_argument("--poll-interval", type=float, default=5.0, help="Seconds between checks (default 5).")
parser.add_argument(
"--heartbeat-timeout",
type=float,
default=120.0,
help="Seconds of heartbeat silence (with a live PID) before declaring a hang (default 120).",
)
parser.add_argument("--once", action="store_true", help="Run a single check and exit.")
args = parser.parse_args(argv)

log = squid.logging.get_logger("acquisition_watchdog")
monitor = Monitor(
state_dir=Path(args.state_dir) if args.state_dir else None,
slack_settings=args.slack_settings,
poll_interval=args.poll_interval,
heartbeat_timeout=args.heartbeat_timeout,
)
if args.once:
monitor.check_once(time.time())
else:
try:
monitor.run_forever()
except KeyboardInterrupt:
log.info("Acquisition watchdog stopped.")


if __name__ == "__main__":
main()
49 changes: 49 additions & 0 deletions software/acquisition_watchdog/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# acquisition_watchdog/alerts.py
"""Format watchdog Slack alerts (text + Block Kit blocks)."""
from datetime import datetime, timezone
from typing import Optional, Tuple

_KIND_TITLE = {
"crash": ":red_circle: Acquisition process died",
"hang": ":large_orange_circle: Acquisition hung (no heartbeat)",
"error": ":red_circle: Acquisition ended with a fatal error",
"completed_with_errors": ":large_orange_circle: Acquisition finished with errors",
"user_abort": ":large_yellow_circle: Acquisition aborted",
}


def _fmt_ts(epoch: Optional[float]) -> str:
if not epoch:
return "unknown"
return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


def _progress_line(run: dict) -> str:
prog = run.get("progress") or {}
expected = run.get("expected") or {}
tp = prog.get("timepoint", "?")
exp_tp = prog.get("expected_timepoints", expected.get("timepoints", "?"))
images = prog.get("images", "?")
return f"timepoint {tp}/{exp_tp}, {images} images"


def format_alert(kind: str, run: dict) -> Tuple[str, list]:
title = _KIND_TITLE.get(kind, f"Acquisition alert: {kind}")
experiment = run.get("experiment_id", "unknown")
machine = run.get("machine", "unknown")
text = f"{title}: {experiment} on {machine}"

last_seen = run.get("ended_at") or run.get("heartbeat_at")
detail = (
f"*Experiment:* {experiment}\n"
f"*Machine:* {machine}\n"
f"*Progress:* {_progress_line(run)}\n"
f"*Started:* {_fmt_ts(run.get('started_at'))}\n"
f"*Last seen:* {_fmt_ts(last_seen)}\n"
f"*Output:* {run.get('output_path', 'unknown')}"
)
blocks = [
{"type": "section", "text": {"type": "mrkdwn", "text": f"*{title}*"}},
{"type": "section", "text": {"type": "mrkdwn", "text": detail}},
]
return text, blocks
51 changes: 51 additions & 0 deletions software/acquisition_watchdog/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# acquisition_watchdog/config.py
"""Load Slack credentials from the same source the Squid GUI uses.

The GUI stores Slack settings (bot token, channel, enabled) in
`cache/slack_settings.yaml` (written by the Slack settings dialog and loaded at
GUI startup via control.widgets_slack.load_slack_settings_from_cache). This module
reads that same YAML so the watchdog alerts to the same workspace — without
importing the heavy control stack.
"""
import os
from pathlib import Path
from typing import NamedTuple, Optional

import yaml


class SlackConfig(NamedTuple):
bot_token: Optional[str]
channel_id: Optional[str]
watchdog_enabled: bool


DEFAULT_SLACK_SETTINGS = "cache/slack_settings.yaml"


def resolve_slack_settings_path(cli_path: Optional[str]) -> Path:
"""Priority: --slack-settings > $SQUID_SLACK_SETTINGS > cache/slack_settings.yaml (cwd-relative)."""
if cli_path:
return Path(cli_path)
env = os.environ.get("SQUID_SLACK_SETTINGS")
if env:
return Path(env)
return Path(DEFAULT_SLACK_SETTINGS)


def load_slack_config(path: Optional[Path]) -> SlackConfig:
p = Path(path) if path else Path(DEFAULT_SLACK_SETTINGS)
if not p.exists():
return SlackConfig(None, None, True)
try:
with open(p) as f:
data = yaml.safe_load(f) or {}
except Exception:
return SlackConfig(None, None, True)
if not isinstance(data, dict):
return SlackConfig(None, None, True)
return SlackConfig(
bot_token=(data.get("bot_token") or None),
channel_id=(data.get("channel_id") or None),
watchdog_enabled=bool(data.get("watchdog_enabled", True)),
)
Comment on lines +47 to +51
124 changes: 124 additions & 0 deletions software/acquisition_watchdog/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# acquisition_watchdog/monitor.py
"""Poll the acquisition run-state and alert on premature ends."""
import json
import os
import time
from pathlib import Path
from typing import Optional, Set

import squid.acquisition_state as acquisition_state
import squid.logging
import squid.slack
from acquisition_watchdog import alerts, config

_log = squid.logging.get_logger("acquisition_watchdog")

ALERT_REASONS = {"completed_with_errors", "error", "user_abort"}


def pid_alive(pid: Optional[int]) -> bool:
if not pid:
return False
try:
import psutil

return psutil.pid_exists(pid)
except ImportError:
pass
if os.name == "posix":
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
except PermissionError:
return True
except OSError:
return False
# Windows without psutil: cannot check reliably; rely on the heartbeat instead.
return True


class Monitor:
def __init__(
self,
state_dir: Optional[Path] = None,
slack_settings: Optional[str] = None,
poll_interval: float = 5.0,
heartbeat_timeout: float = 120.0,
):
self._state_dir = Path(state_dir) if state_dir else None
self._slack_settings = slack_settings
self._poll = poll_interval
self._timeout = heartbeat_timeout
self._base = self._state_dir or acquisition_state.default_state_dir()
self._alerted_path = self._base / "alerted.json"
self._alerted = self._load_alerted()

def _load_alerted(self) -> Set[str]:
try:
with open(self._alerted_path) as f:
return set(json.load(f))
except (FileNotFoundError, json.JSONDecodeError):
return set()

def _save_alerted(self) -> None:
try:
self._alerted_path.parent.mkdir(parents=True, exist_ok=True)
with open(self._alerted_path, "w") as f:
json.dump(sorted(self._alerted), f)
except OSError as e:
_log.warning(f"Could not persist alerted set: {e}")

def classify(self, run: Optional[dict], now: float) -> Optional[str]:
"""Return an alert kind ('crash'|'hang'|<reason>) or None."""
if not run or run.get("run_id") in self._alerted:
return None
status = run.get("status")
if status == "running":
if not pid_alive(run.get("pid")):
return "crash"
if (now - (run.get("heartbeat_at") or 0)) > self._timeout:
return "hang"
return None
if status == "ended" and run.get("reason") in ALERT_REASONS:
return run["reason"]
return None

def check_once(self, now: float) -> None:
run = acquisition_state.read_run(self._state_dir)
kind = self.classify(run, now)
if kind is None:
return

cfg_path = config.resolve_slack_settings_path(self._slack_settings)
slack_cfg = config.load_slack_config(cfg_path)
if not (slack_cfg.bot_token and slack_cfg.channel_id and slack_cfg.watchdog_enabled):
_log.warning(
f"Premature end ({kind}) for run_id={run.get('run_id')} but Slack is not "
f"configured/enabled; not alerting."
)
self._mark_alerted(run["run_id"])
return

text, blocks = alerts.format_alert(kind, run)
ok, _ = squid.slack.post_message(slack_cfg.bot_token, slack_cfg.channel_id, text, blocks)
if ok:
_log.info(f"Sent watchdog alert ({kind}) for run_id={run.get('run_id')}")
self._mark_alerted(run["run_id"])
else:
# Leave unmarked so a transient Slack failure retries on the next poll.
_log.warning(f"Failed to send watchdog alert ({kind}) for run_id={run.get('run_id')}; will retry")

def _mark_alerted(self, run_id: str) -> None:
self._alerted.add(run_id)
self._save_alerted()

def run_forever(self) -> None:
_log.info(f"Acquisition watchdog started. state_dir={self._base} heartbeat_timeout={self._timeout}s")
while True:
try:
self.check_once(time.time())
except Exception as e:
_log.exception(f"Watchdog poll error: {e}")
time.sleep(self._poll)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Install (per user):
# mkdir -p ~/.config/systemd/user
# cp acquisition_watchdog/systemd/squid-acquisition-watchdog.service ~/.config/systemd/user/
# # edit WorkingDirectory below to match this machine, then:
# systemctl --user daemon-reload
# systemctl --user enable --now squid-acquisition-watchdog
[Unit]
Description=Squid acquisition watchdog (alerts on prematurely-ended acquisitions)
After=default.target

[Service]
Type=simple
WorkingDirectory=%h/Squid/software
ExecStart=/usr/bin/python3 -m acquisition_watchdog
Restart=always
RestartSec=5

[Install]
WantedBy=default.target
34 changes: 34 additions & 0 deletions software/acquisition_watchdog/windows/install.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Run in PowerShell (as the user who runs the Squid GUI), from software\ :
# .\acquisition_watchdog\windows\install.ps1
# Registers a logon-triggered scheduled task that runs the acquisition watchdog.
# Edit $workingDir below if your install path differs.
$ErrorActionPreference = "Stop"

$taskName = "SquidAcquisitionWatchdog"
$workingDir = "C:\Squid\software"

$action = New-ScheduledTaskAction `
-Execute "pythonw.exe" `
-Argument "-m acquisition_watchdog" `
-WorkingDirectory $workingDir

$trigger = New-ScheduledTaskTrigger -AtLogOn

$settings = New-ScheduledTaskSettingsSet `
-MultipleInstances IgnoreNew `
-RestartInterval (New-TimeSpan -Minutes 1) `
-RestartCount 999 `
-ExecutionTimeLimit (New-TimeSpan -Seconds 0) `
-AllowStartIfOnBatteries `
-DontStopIfGoingOnBatteries

Write-Host "Registering scheduled task '$taskName'..."
Register-ScheduledTask `
-TaskName $taskName `
-Action $action `
-Trigger $trigger `
-Settings $settings `
-Description "Squid acquisition watchdog (alerts on prematurely-ended acquisitions)" `
-Force

Write-Host "Done. It starts at next logon. Run now with: Start-ScheduledTask -TaskName $taskName"
Loading
Loading