Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions movement/kinematics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
compute_displacement,
compute_forward_displacement,
compute_backward_displacement,
compute_inactivity_bouts,
compute_path_length,
compute_speed,
compute_time_derivative,
Expand All @@ -27,6 +28,7 @@
"compute_speed",
"compute_path_length",
"compute_time_derivative",
"compute_inactivity_bouts",
"compute_pairwise_distances",
"compute_forward_vector",
"compute_head_direction_vector",
Expand Down
129 changes: 129 additions & 0 deletions movement/kinematics/kinematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import warnings
from typing import Literal

import numpy as np
import xarray as xr

from movement.utils.logging import logger
Expand Down Expand Up @@ -494,3 +495,131 @@
valid_proportion = valid_segments / (data.sizes["time"] - 1)
# return scaled path length
return compute_norm(displacement).sum(dim="time") / valid_proportion


def compute_inactivity_bouts(
data: xr.DataArray,
speed_threshold: float,
min_bout_duration: float = 0.0,
) -> xr.DataArray:
"""Detect periods of inactivity in trajectory data.

Identifies contiguous time windows during which the speed of an individual
(or keypoint) falls below a user-defined ``speed_threshold``. Each such
window (a "bout") is assigned a unique positive integer ID ordered by
onset time. Active frames are labelled 0.

Parameters
----------
data : xarray.DataArray
The input data containing position information, with ``time``
and ``space`` (in Cartesian coordinates) as required dimensions.
speed_threshold : float
Maximum speed (in position units per time unit) below which a
frame is considered inactive. Must be non-negative.
min_bout_duration : float, optional
Minimum duration (in the same time units as ``data``) for a bout
to be included in the output. Bouts whose total duration is
strictly less than this value are suppressed (labelled 0).
Defaults to 0.0, which retains all detected bouts including
single-frame events.

Returns
-------
xarray.DataArray
An integer DataArray with the same dimensions as the speed
computed from ``data`` (i.e. ``time`` and all dimensions of
``data`` except ``space``). A value of 0 indicates an active
frame; a positive integer *k* indicates that the frame belongs
to the *k*-th inactivity bout (1-indexed, ordered by onset time).
The DataArray is named ``"inactivity_bout_id"``.

Notes
-----
* Frames where speed is NaN (e.g. due to missing position data) are
treated as active frames and are not assigned to any bout.
* For pose-estimation data containing multiple ``keypoints``, the
function returns a separate bout-label array per keypoint. To analyse
whole-body inactivity, select a representative keypoint (e.g. centroid)
before calling this function.
* The duration of a single-frame bout is defined as 0 (last frame time
minus first frame time). It is therefore retained when
``min_bout_duration`` is 0.0 (the default), but suppressed for any
strictly positive ``min_bout_duration``.

Examples
--------
Detect inactivity bouts for a bboxes dataset where speed drops below 5:

>>> from movement.kinematics import compute_inactivity_bouts
>>> bouts = compute_inactivity_bouts(ds.position, speed_threshold=5.0)

Keep only bouts lasting at least 1 second:

>>> bouts = compute_inactivity_bouts(
... ds.position, speed_threshold=5.0, min_bout_duration=1.0
... )

See Also
--------
compute_speed : The underlying speed function used internally.

"""
validate_dims_coords(data, {"time": [], "space": []})
if speed_threshold < 0:
raise logger.error(ValueError("speed_threshold must be non-negative."))
if min_bout_duration < 0:
raise logger.error(
ValueError("min_bout_duration must be non-negative.")
)

speed = compute_speed(data)
# Frames with NaN position are treated as active even when
# differentiate() happens to produce finite speed using valid neighbours
position_has_nan = data.isnull().any(dim="space")
# NaN speed (missing data from other causes) also treated as active
is_inactive = (speed < speed_threshold).fillna(False) & ~position_has_nan
time_coords = data.coords["time"].values.astype(float)

def _label_bouts(inactive_1d: np.ndarray) -> np.ndarray:
"""Label consecutive inactive runs in a 1D boolean array.

Parameters
----------
inactive_1d
1D boolean numpy array along the time axis.

Returns
-------
np.ndarray
1D int64 array: 0 for active frames, k for frames in the k-th
inactivity bout (bouts numbered from 1 in onset order).

"""
padded = np.concatenate([[False], inactive_1d, [False]])
diff = np.diff(padded.view(np.int8))
starts = np.where(diff == 1)[0] # False → True (bout start)

Check failure on line 601 in movement/kinematics/kinematics.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "np.nonzero" when only the condition parameter is provided to "np.where".

See more on https://sonarcloud.io/project/issues?id=neuroinformatics-unit_movement&issues=AZzeGq-hpEpGlRpxHoSe&open=AZzeGq-hpEpGlRpxHoSe&pullRequest=884
ends = np.where(diff == -1)[0] # True → False (exclusive end index)

Check failure on line 602 in movement/kinematics/kinematics.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "np.nonzero" when only the condition parameter is provided to "np.where".

See more on https://sonarcloud.io/project/issues?id=neuroinformatics-unit_movement&issues=AZzeGq-hpEpGlRpxHoSf&open=AZzeGq-hpEpGlRpxHoSf&pullRequest=884

labels = np.zeros(len(inactive_1d), dtype=np.int64)
bout_id = 0
for s, e in zip(starts, ends, strict=False):
# Duration from first to last frame of this bout
duration = time_coords[e - 1] - time_coords[s]
if duration >= min_bout_duration:
bout_id += 1
labels[s:e] = bout_id
return labels

result = xr.apply_ufunc(
_label_bouts,
is_inactive,
input_core_dims=[["time"]],
output_core_dims=[["time"]],
output_dtypes=[np.int64],
vectorize=True,
)
# apply_ufunc places core dims last; move time back to front
result = result.transpose("time", ...)
result.name = "inactivity_bout_id"
return result
Loading