Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions software/control/core/multi_point_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from squid.abc import CameraFrame

if TYPE_CHECKING:
from control.core.qc import FOVMetrics, PolicyDecision
from control.slack_notifier import TimepointStats, AcquisitionStats


Expand Down Expand Up @@ -125,3 +126,6 @@ class MultiPointControllerFunctions:
# Zarr frame written callback - called when subprocess completes writing a frame
# Args: (fov, time_point, z_index, channel_name, region_idx)
signal_zarr_frame_written: Callable[[int, int, int, str, int], None] = lambda *a, **kw: None
# QC callbacks
signal_qc_metrics_updated: Callable[["FOVMetrics"], None] = lambda *a, **kw: None
signal_qc_policy_decision: Callable[["PolicyDecision"], None] = lambda *a, **kw: None
69 changes: 67 additions & 2 deletions software/control/core/multi_point_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
ensure_plate_resolution_in_well_resolutions,
)
from control.core.backpressure import BackpressureController, BackpressureValues
from control.core.qc import QCConfig, QCJob, QCPolicy, QCPolicyConfig, QCResult, TimepointMetricsStore
from squid.config import CameraPixelFormat

# Module-level logger for static methods
Expand Down Expand Up @@ -86,6 +87,8 @@ def __init__(
slack_notifier=None,
prewarmed_job_runner: Optional[JobRunner] = None,
prewarmed_bp_values: Optional["BackpressureValues"] = None,
qc_config: Optional[QCConfig] = None,
qc_policy_config: Optional[QCPolicyConfig] = None,
):
self._log = squid.logging.get_logger(__class__.__name__)
self._timing = utils.TimingManager("MultiPointWorker Timer Manager")
Expand Down Expand Up @@ -161,6 +164,10 @@ def __init__(
self.num_fovs = 0
self.total_scans = 0
self._last_time_point_z_pos = {}
self._qc_config = qc_config or QCConfig()
self._qc_policy_config = qc_policy_config or QCPolicyConfig()
self._qc_policy = QCPolicy(self._qc_policy_config) if self._qc_policy_config.enabled else None
self._metrics_store: Optional[TimepointMetricsStore] = None
self.scan_region_fov_coords_mm = (
acquisition_parameters.scan_position_information.scan_region_fov_coords_mm.copy()
)
Expand Down Expand Up @@ -370,6 +377,10 @@ def __init__(
# Subprocess starts warming up in background - don't block here

self._job_runners.append((job_class, job_runner))

if self._qc_config.enabled:
self._job_runners.append((QCJob, None))
Comment on lines +385 to +386
Comment on lines +385 to +386

self._abort_on_failed_job = abort_on_failed_jobs
self._first_job_dispatched = False # Track if we've waited for subprocess warmup

Expand Down Expand Up @@ -644,6 +655,8 @@ def run_single_time_point(self):
self._timepoint_fov_count = 0
self._laser_af_successes = 0
self._laser_af_failures = 0
if self._qc_config.enabled:
self._metrics_store = TimepointMetricsStore(timepoint_index=self.time_point)
self.microcontroller.enable_joystick(False)

self._log.debug("multipoint acquisition - time point " + str(self.time_point + 1))
Expand All @@ -663,6 +676,19 @@ def run_single_time_point(self):
with self._timing.get_timer("run_coordinate_acquisition"):
self.run_coordinate_acquisition(current_path)

# QC policy check
if self._qc_policy is not None and self._qc_policy_config.check_after_timepoint:
if self._metrics_store is not None:
try:
decision = self._qc_policy.check_timepoint(self._metrics_store)
self.callbacks.signal_qc_policy_decision(decision)
Comment on lines +683 to +688
if decision.should_pause:
# TODO: implement actual pause mechanism — currently advisory only,
# the UI can react via signal_qc_policy_decision callback
self._log.info(f"QC policy flagged {len(decision.flagged_fovs)} FOVs, requesting pause")
except Exception as e:
self._log.error(f"QC policy evaluation failed for timepoint {self.time_point}: {e}")

# Save plate view for this timepoint
if self._generate_downsampled_views and self._downsampled_view_manager is not None:
# Wait for pending downsampled view jobs to complete
Expand All @@ -678,6 +704,14 @@ def run_single_time_point(self):
# finished region scan
self.coordinates_pd.to_csv(os.path.join(current_path, "coordinates.csv"), index=False, header=True)

# Save QC metrics
if self._qc_config.enabled and self._metrics_store is not None:
qc_csv_path = os.path.join(current_path, "qc_metrics.csv")
try:
self._metrics_store.save(qc_csv_path)
except OSError as e:
self._log.error(f"Failed to save QC metrics to {qc_csv_path}: {e}")

# Send Slack timepoint notification via callback (allows main thread to capture screenshot)
if self._slack_notifier is not None:
try:
Expand Down Expand Up @@ -805,6 +839,18 @@ def _summarize_runner_outputs(self, drain_all: bool = False) -> SummarizeResult:

return SummarizeResult(none_failed=none_failed, had_results=had_results)

def _handle_qc_result(self, qc_result: QCResult) -> None:
"""Store QC metrics and emit signal."""
if qc_result.error:
self._log.error(
f"QC metric calculation failed for region={qc_result.metrics.fov_id.region_id} "
f"fov={qc_result.metrics.fov_id.fov_index}: {qc_result.error}"
)
# Always store metrics (positional data is valid even on partial failure)
if self._metrics_store is not None:
self._metrics_store.add(qc_result.metrics)
self.callbacks.signal_qc_metrics_updated(qc_result.metrics)

def _summarize_job_result(self, job_result: JobResult) -> bool:
"""
Prints a summary, then returns True if the result was successful or False otherwise.
Expand Down Expand Up @@ -833,6 +879,9 @@ def _summarize_job_result(self, job_result: JobResult) -> bool:
elif isinstance(job_result.result, ZarrWriteResult):
r = job_result.result
self.callbacks.signal_zarr_frame_written(r.fov, r.time_point, r.z_index, r.channel_name, r.region_idx)
# Handle QCResult - store metrics and emit signal
elif isinstance(job_result.result, QCResult):
self._handle_qc_result(job_result.result)
return True

def _handle_downsampled_view_result(self, result: DownsampledViewResult) -> None:
Expand Down Expand Up @@ -888,9 +937,25 @@ def _create_job(self, job_class: Type[Job], info: CaptureInfo, image: np.ndarray
"""
if job_class == DownsampledViewJob:
return self._create_downsampled_view_job(info, image)
elif job_class == QCJob:
return self._create_qc_job(info, image)
else:
return job_class(capture_info=info, capture_image=JobImage(image_array=image))

def _create_qc_job(self, info: CaptureInfo, image: np.ndarray) -> QCJob:
"""Create a QCJob for the given capture."""
previous_z = None
if self._qc_config.calculate_z_diff_from_last_timepoint and self.time_point > 0:
fov_key = (info.region_id, info.fov)
if fov_key in self._last_time_point_z_pos:
previous_z = self._last_time_point_z_pos[fov_key] * 1000 # mm -> um
return QCJob(
capture_info=info,
capture_image=JobImage(image_array=image),
qc_config=self._qc_config,
previous_timepoint_z=previous_z,
)

def _create_downsampled_view_job(self, info: CaptureInfo, image: np.ndarray) -> Optional[DownsampledViewJob]:
"""Create a DownsampledViewJob for the given capture.

Expand Down Expand Up @@ -1424,9 +1489,9 @@ def _image_callback(self, camera_frame: CameraFrame):
return
else:
try:
# NOTE(imo): We don't have any way of people using results, so for now just
# grab and ignore it.
result = job.run()
if isinstance(result, QCResult):
self._handle_qc_result(result)
except Exception:
self._log.exception("Failed to execute job, abandoning acquisition!")
self.request_abort_fn()
Expand Down
Loading
Loading