Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions src/paradigma/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ def run_paradigma(
{}
if p != "gait"
else {
"filtered": {"per_segment": {}},
"unfiltered": {"per_segment": {}},
"filtered": {"combined": {"duration_s": 0.0}, "per_segment": {}},
"unfiltered": {"combined": {"duration_s": 0.0}, "per_segment": {}},
}
)
for p in pipelines
Expand Down Expand Up @@ -685,6 +685,24 @@ def run_paradigma(
max_segment_in_metadata,
)

if (
quantification_metadata
and "combined" in quantification_metadata
):
combined_meta = quantification_metadata[
"combined"
]
duration_s = float(
combined_meta.get("duration_s", 0)
)
target_meta = all_results["metadata"][
pipeline_name
].setdefault(quant_type, {})
target_combined = target_meta.setdefault(
"combined", {"duration_s": 0.0}
)
target_combined["duration_s"] += duration_s

elif pipeline_name == "tremor":
pipeline_result = run_tremor_pipeline(
df_prepared=df_prepared,
Expand Down Expand Up @@ -878,6 +896,38 @@ def run_paradigma(
active_logger.warning(
f"No {pipeline_name} quantifications to aggregate"
)
if pipeline_name == "gait":
filtered_meta = all_results["metadata"][pipeline_name][
"filtered"
]
unfiltered_meta = all_results["metadata"][pipeline_name][
"unfiltered"
]
duration_filtered_s = float(
filtered_meta.get("combined", {}).get("duration_s", 0)
)
duration_unfiltered_s = float(
unfiltered_meta.get("combined", {}).get("duration_s", 0)
)
all_results["aggregations"][pipeline_name] = {
"metadata": {
"nr_gait_segments": len(
unfiltered_meta.get("per_segment", {})
),
"nr_filtered_gait_segments": len(
filtered_meta.get("per_segment", {})
),
"nr_arm_swings": 0,
"nr_filtered_arm_swings": 0,
"gait_duration_s": duration_unfiltered_s,
"filtered_gait_duration_s": duration_filtered_s,
"hours_of_gait_data": duration_unfiltered_s / 3600,
"hours_of_filtered_gait_data": duration_filtered_s
/ 3600,
},
"filtered": {},
"unfiltered": {},
}
continue

if pipeline_name == "tremor":
Expand Down Expand Up @@ -945,6 +995,7 @@ def run_paradigma(

# Initialize nested dictionary for gait aggregations
all_results["aggregations"][pipeline_name] = {
"metadata": {},
"filtered": {},
"unfiltered": {},
}
Expand Down Expand Up @@ -1019,6 +1070,37 @@ def run_paradigma(
"No unfiltered gait quantifications found for aggregation"
)

filtered_quantifications = all_results["quantifications"][
pipeline_name
]["filtered"]
unfiltered_quantifications = all_results["quantifications"][
pipeline_name
]["unfiltered"]
filtered_meta = all_results["metadata"][pipeline_name]["filtered"]
unfiltered_meta = all_results["metadata"][pipeline_name][
"unfiltered"
]

duration_filtered_s = float(
filtered_meta.get("combined", {}).get("duration_s", 0)
)
duration_unfiltered_s = float(
unfiltered_meta.get("combined", {}).get("duration_s", 0)
)
nr_segments_filtered = len(filtered_meta.get("per_segment", {}))
nr_segments_unfiltered = len(unfiltered_meta.get("per_segment", {}))

all_results["aggregations"][pipeline_name]["metadata"] = {
"nr_gait_segments": nr_segments_unfiltered,
"nr_filtered_gait_segments": nr_segments_filtered,
"nr_arm_swings": len(unfiltered_quantifications),
"nr_filtered_arm_swings": len(filtered_quantifications),
"gait_duration_s": duration_unfiltered_s,
"filtered_gait_duration_s": duration_filtered_s,
"hours_of_gait_data": duration_unfiltered_s / 3600,
"hours_of_filtered_gait_data": duration_filtered_s / 3600,
}

except Exception as e:
error_msg = f"Failed to aggregate {pipeline_name} results: {e}"
active_logger.error(error_msg)
Expand All @@ -1027,6 +1109,7 @@ def run_paradigma(
)
if pipeline_name == "gait":
all_results["aggregations"][pipeline_name] = {
"metadata": {},
"filtered": {},
"unfiltered": {},
}
Expand Down
7 changes: 5 additions & 2 deletions src/paradigma/pipelines/gait_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,8 +1220,11 @@ def run_gait_pipeline(
active_logger.warning("No gait detected in this segment")
empty_df_filtered = _empty_arm_swing_df(df_prepared)
empty_df_unfiltered = _empty_arm_swing_df(df_prepared)
empty_meta_filtered = {"all": {"duration_s": 0}, "per_segment": {}}
empty_meta_unfiltered = {"all": {"duration_s": 0}, "per_segment": {}}
empty_meta_filtered = {"combined": {"duration_s": 0}, "per_segment": {}}
empty_meta_unfiltered = {
"combined": {"duration_s": 0},
"per_segment": {},
}
result_dict["quantification"] = {
"filtered": empty_df_filtered,
"unfiltered": empty_df_unfiltered,
Expand Down
51 changes: 51 additions & 0 deletions tests/test_gait_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,57 @@ def test_gait_pipeline_integration():
assert expected_columns.issubset(gait_quantifications["unfiltered"].columns)


def test_gait_aggregation_metadata_alignment():
"""Test that gait aggregation output contains aligned metadata."""
dfs = {"test": create_test_gait_data()}

with tempfile.TemporaryDirectory() as temp_dir:
results = run_paradigma(
pipelines=["gait"],
dfs=dfs,
output_dir=temp_dir,
watch_side="right",
skip_preparation=True,
imu_config=IMUConfig(),
gait_config=GaitConfig("gait"),
arm_activity_config=GaitConfig("arm_activity"),
)

gait_aggregations = results["aggregations"]["gait"]
assert "metadata" in gait_aggregations
assert "filtered" in gait_aggregations
assert "unfiltered" in gait_aggregations

metadata = gait_aggregations["metadata"]
expected_keys = {
"nr_gait_segments",
"nr_filtered_gait_segments",
"nr_arm_swings",
"nr_filtered_arm_swings",
"gait_duration_s",
"filtered_gait_duration_s",
"hours_of_gait_data",
"hours_of_filtered_gait_data",
}
assert expected_keys.issubset(metadata.keys())

assert metadata["nr_arm_swings"] == len(
results["quantifications"]["gait"]["unfiltered"]
)
assert metadata["nr_filtered_arm_swings"] == len(
results["quantifications"]["gait"]["filtered"]
)
assert metadata["gait_duration_s"] >= metadata["filtered_gait_duration_s"]
assert metadata["filtered_gait_duration_s"] >= 0
assert metadata["hours_of_gait_data"] >= metadata["hours_of_filtered_gait_data"]
assert metadata["hours_of_gait_data"] == pytest.approx(
metadata["gait_duration_s"] / 3600
)
assert metadata["hours_of_filtered_gait_data"] == pytest.approx(
metadata["filtered_gait_duration_s"] / 3600
)


def test_multi_file_unfiltered_only_offsets(monkeypatch):
"""Ensure unfiltered-only gait results keep unique segment ids across files."""

Expand Down