API Reference

stag.constants — Project-wide constants, paths and palettes

Central configuration module. Reads machine-specific paths via stag.local_paths; defines the manuscript’s PM names, colours, family categories, canonical labels, and helper functions for figure output.

Central configuration for STAG analyses and figures.

stag.constants.FPS: int = 50

Accelerometer sampling rate in Hz.

stag.constants.GPS_FPS: float = 0.5

GPS sampling rate in Hz (0.5 Hz = one fix every 2 s).

stag.constants.VIDEO_FPS: int = 30

Ground-truth video sampling rate in Hz.

stag.constants.ACCELEROMETER_RANGE_G: int = 16

Accelerometer dynamic range in units of Earth gravity (±16 g).

stag.constants.FEATURE_LABELS: tuple[str, ...] = ('Head_X', 'Head_Y', 'Head_Z', 'Ear_X', 'Ear_Y', 'Ear_Z')

Six accelerometer feature names in the order used by the clustering input matrix.

stag.constants.WONG: dict[str, str] = {'black': '#000000', 'blue': '#0072B2', 'bluish_green': '#009E73', 'orange': '#E69F00', 'reddish_purple': '#CC79A7', 'sky_blue': '#56B4E9', 'vermilion': '#D55E00', 'yellow': '#F0E442'}
  1. colourblind-safe palette.

Type:

Wong (2011, Nature Methods 8

stag.constants.PM_NAMES: dict[int, str] = {0: 'quiescent', 1: 'resting', 2: 'ear_flick_out', 3: 'resting_active', 4: 'ear_flick_back', 5: 'ear_flick_composite', 6: 'grazing_stepping', 7: 'stationary_grazing'}

Canonical PM index → snake_case label mapping.

stag.constants.PM_DISPLAY_NAMES: dict[int, str] = {0: 'Quiescent', 1: 'Resting', 2: 'Ear flick (out)', 3: 'Resting, active', 4: 'Ear flick (back)', 5: 'Ear flick (composite)', 6: 'Stepping, grazing', 7: 'Stationary, grazing'}

Long display name per PM — keeps the ear-flick subtype distinction.

Use this for axes where each cluster is shown on its own row/column (centroid heatmaps, confusion matrices, transition matrices).

stag.constants.PM_DISPLAY_NAMES_SHORT: dict[int, str] = {0: 'Quiescent', 1: 'Resting', 2: 'Ear flick', 3: 'Resting, active', 4: 'Ear flick', 5: 'Ear flick', 6: 'Stepping, grazing', 7: 'Stationary, grazing'}

Short display name per PM — collapses the three ear-flick subtypes to a single “Ear flick” label.

Use this for legends and figures where the three ear-flick prototypes share a colour (see PM_COLOURS) and read as one behaviour.

stag.constants.PM_CATEGORY: dict[int, str] = {0: 'inactive', 1: 'inactive', 2: 'ear_flick', 3: 'inactive', 4: 'ear_flick', 5: 'ear_flick', 6: 'grazing', 7: 'grazing'}

PM index → behavioural-category label (three families).

stag.constants.PM_COLOURS: dict[int, str] = {0: '#66ddcc', 1: '#1d8475', 2: '#e0ce61', 3: '#96d3ed', 4: '#e0ce61', 5: '#e0ce61', 6: '#86771a', 7: '#8497b0'}

Per-PM colour assignment used in every manuscript figure.

PM2, PM4, and PM5 share #e0ce61 so the three ear-flick subtypes read as one visual category. These colours are the authoritative figure palette and supersede any Wong-derived category palette.

stag.constants.PM_CATEGORY_COLOURS: dict[str, str] = {'ear_flick': '#D96828', 'grazing': '#4E8C3A', 'inactive': '#3F7FB5'}

Per-behavioural-family colour swatch — lab canonical palette.

These three colours are the agreed-upon family swatches for every figure that groups the eight PMs into their behavioural families (Inactive = PM 0 + PM 1 + PM 3, Grazing = PM 6 + PM 7, Ear flick = PM 2 + PM 4 + PM 5). They are deliberately not derived from PM_COLOURS (which uses the Wong palette for per-PM plotting): the family-level chart asks a different question (three families, not eight prototypes) and the lab uses a distinct palette to make that distinction visually obvious in posters, slides, and the manuscript figures.

Use PM_COLOURS for per-PM plotting; use this dict for category-level legends and category-grouped bar charts where one swatch per family is wanted.

stag.constants.FIGURE_DPI: int = 200

Raster export resolution (PNG).

stag.constants.FIGURE_SIZE_SINGLE: tuple[float, float] = (3.5, 2.8)

Single-column Elsevier figure size in inches.

stag.constants.FIGURE_SIZE_DOUBLE: tuple[float, float] = (7.2, 4.5)

Two-column (full-width) Elsevier figure size in inches.

stag.constants.FONT_FAMILY: str = 'DejaVu Sans'

Default sans-serif font for figures.

stag.constants.apply_figure_defaults()[source]

Apply STAG figure rcParams to the current matplotlib session.

Return type:

None

stag.constants.RESULTS_DIR_DEFAULT: Path = PosixPath('results')

Default top-level output directory (relative to working dir).

stag.constants.HCS_SOURCE_DIR: Path = PosixPath('<hcs_source not configured - see local_paths.template.json>')

Read-only network archive of the 2024 deer dataset (3.0 TB). Too slow for direct analysis — mirror the curated subset locally. Resolved by stag.local_paths: STAG_HCS_DIR env var, then local_paths.json hcs_source field, then the placeholder default (which will crash any downstream read with a clear path).

stag.constants.LOCAL_DATA_DIR: Path = PosixPath('<data_root not configured - see local_paths.template.json>')

Local working copy on the NVMe data drive. Tier-1 footprint ≈ 43 GB. All path constants below are anchored here. Resolved by stag.local_paths: STAG_DATA_DIR env var, then local_paths.json data_root field, then placeholder default.

stag.constants.RAW_CLUSTERING_INPUT: Path = PosixPath('<data_root not configured - see local_paths.template.json>/clust_data_raw_20240412.npy')

Raw 8-column input the SLURM clustering actually read. Shape (204_554_618, 8) float64. Columns 0–5 are the six accelerometer axes; columns 6–7 are GPS-derived speed and tortuosity (excluded from clustering).

stag.constants.MAXABS_CLUSTERING_INPUT: Path = PosixPath('<data_root not configured - see local_paths.template.json>/clust_data_maxabs_6col.npy')

Six-column MaxAbs-scaled feature matrix derived from RAW_CLUSTERING_INPUT by scripts/preprocess_clustering_data.py. Shape (204_554_618, 6) float64. Each column is divided by its absolute maximum, mapping the data to [-1, 1] per column — reproducing the 2024 SLURM pipeline’s normalisation exactly (MaxAbsScaler + col-5 ±7.99 clip). This is the file every internal- and external-validation analysis consumes.

stag.constants.MAXABS_SCALER_CSV: Path = PosixPath('<data_root not configured - see local_paths.template.json>/clust_data_maxabs_6col.maxabs.csv')

Per-column max-abs divisors used to produce MAXABS_CLUSTERING_INPUT. Written alongside the .npy by the preprocess script — needed to invert the scaling back to physical units for centroid interpretation.

stag.constants.DEER_DB: Path = PosixPath('<data_root not configured - see local_paths.template.json>/deer_data_gps.db')

SQLite copy of the canonical deer-2024 DB (~58 GB) — six tables: accelerometer_data (~222 M rows), cluster_labels (~204.5 M rows, FK acc_idaccelerometer_data.data_id), trajectory_data (GPS, ~207 M rows after upsampling), deer_info (26 animals), video_observation_reference (926 clips), video_availability (2.8 M rows). All required composite/FK indexes are already in place. See scripts/cache_label_timeline.py for the canonical join that aligns DB rows with the saved labels.npy.

stag.constants.LABEL_TIMELINE_DEER_IDS: Path = PosixPath('<data_root not configured - see local_paths.template.json>/label_timeline_deer_ids.npy')

Per-sample deer_id aligned with the saved k=8 labels.npy (shape (204_554_618,) int8). Built once by scripts/cache_label_timeline.py from the DEER_DB join cluster_labels.acc_idaccelerometer_data.data_id.

stag.constants.LABEL_TIMELINE_TIMESTAMPS: Path = PosixPath('<data_root not configured - see local_paths.template.json>/label_timeline_timestamps.npy')

Per-sample wall-clock timestamp aligned with the saved k=8 labels.npy (shape (204_554_618,) int64 nanoseconds since the Unix epoch, NZ local time as stored in the DB). Built once by scripts/cache_label_timeline.py.

stag.constants.CLUSTER_RESULTS_DIR: Path = PosixPath('<data_root not configured - see local_paths.template.json>/cluster_results/deer6raw')

Root of the per-fit metadata + centroids + labels tree produced by the Aoraki SLURM sweep. Per-fit JSONs and centroid arrays are present in full; labels are present for the 24 representative runs only.

stag.constants.CANONICAL_K8_LABELS: Path = PosixPath('<data_root not configured - see local_paths.template.json>/cluster_results/deer6raw/delSize_0/k_8/labels/deer6raw_labels_k8_delSize0_partA.npy')

Manuscript-aligned k=8 labels.npy (Partition A).

Of the 50 fits saved at delSize_0/k_8, 17 converged to the basin whose centroids match centroid_label_info exactly and whose cluster IDs are in manuscript PM order (0 = Quiescent, 1 = Resting, … 7 = Stationary grazing). None of those 17 had their labels mirrored from the original sweep, so this file is regenerated locally by nearest-manuscript-centroid assignment on MAXABS_CLUSTERING_INPUT — bit-equivalent to the converged k-means assignment for that basin, prevalences match the manuscript table to within 0.013 % per PM.

Use this constant — not a glob of the labels directory — wherever downstream code wants “the k=8 labels.

stag.constants.save_figure(fig, stem, output_dir, data=None)[source]

Export a figure as SVG + PNG with an optional CSV companion table.

Return type:

None

Args:

fig: Matplotlib figure to save. stem: Filename stem (no extension). output_dir: Target directory (created if needed). data: Optional dataframe; written as <stem>.csv alongside.

stag.local_paths — Machine-specific path resolver

One source of truth for every absolute path STAG needs. Resolves keys via environment variable → local_paths.json → caller-supplied default → LocalPathNotConfiguredError.

Machine-specific path resolver — env > local_paths.json > default.

exception stag.local_paths.LocalPathNotConfiguredError[source]

Bases: RuntimeError

Raised when a local-path key cannot be resolved.

Triggered when neither the matching env var, local_paths.json, nor an explicit default kwarg supplies a value.

stag.local_paths.get_path(key, *, default=None)[source]

Resolve key to a concrete path string.

Return type:

str

Priority order:
  1. Environment variable named in _ENV_VAR_NAMES for key.

  2. The same-named field in local_paths.json.

  3. default keyword argument.

  4. LocalPathNotConfiguredError.

Placeholder values in local_paths.json (strings starting with <) are ignored — they signal “still to be filled in” and the resolver falls through to the default or raises.

stag.local_paths.get_path_obj(key, *, default=None)[source]

Same as get_path() but returns a Path.

Return type:

Path

stag.local_paths.reset_cache()[source]

Drop the memoised local_paths.json content (test-helper).

Return type:

None

stag.sync — Sensor synchronisation

Synchronise the head and ear accelerometer channels into a single combined stream and write it to the deer database.

Synchronise head and ear accelerometers via shared calibration drops.

class stag.sync.data_sync.BetterDataSync(deer_id, head_data, ear_data, window_dict, log=True, log_folder='', mkplot=False, plot_folder='')[source]

Bases: object

Synchronise head and ear accelerometer data via calibration drops.

Parameters:
  • deer_id (str) – Identifier for the deer (e.g. "R1_D1").

  • head_data (pandas.DataFrame) – Accelerometer data from the head-mounted logger with columns 'X', 'Y', 'Z'.

  • ear_data (pandas.DataFrame) – Accelerometer data from the ear-mounted logger.

  • window_dict (dict) – Processing window with keys 'start' and 'end' (sample indices).

  • log (bool, optional) – Enable CSV logging. Default True.

  • log_folder (str, optional) – Directory for log files.

  • mkplot (bool, optional) – Generate diagnostic plots. Default False.

  • plot_folder (str, optional) – Directory for saved plots.

  • Attributes

  • ----------

  • drops_dict (dict) – Detected calibration-drop timestamps after synchronisation.

__init__(deer_id, head_data, ear_data, window_dict, log=True, log_folder='', mkplot=False, plot_folder='')[source]

Initialise a deer-wise head/ear sync run.

Args:

deer_id: Animal identifier. head_data: DataFrame of head-accelerometer rows. ear_data: DataFrame of ear-accelerometer rows. window_dict: Start/end window indices for both channels. log: Write a per-deer log file (default True). log_folder: Directory for the log files. mkplot: Write per-deer diagnostic plots (default False). plot_folder: Directory for the plots.

detect_drops(signal, prominence=5.0, distance=500)[source]

Detect calibration-drop peaks in a preprocessed signal.

Parameters:
  • signal (array-like) – Preprocessed (summed absolute z-scored) acceleration signal.

  • prominence (float, optional) – Minimum peak prominence. Default 5.0.

  • distance (int, optional) – Minimum samples between peaks. Default 500.

  • Returns

  • -------

  • numpy.ndarray – Indices of detected peaks.

run_synchronization()[source]

Execute the full synchronisation pipeline.

Returns:

: dict or None

Dictionary with 'head' and 'ear' drop indices if successful, None otherwise.

Helpers for synchronisation drop-event detection.

stag.sync.utils.correct_calibration(data, cols=None)[source]

Z-score the specified columns (zero mean, unit variance).

Parameters:
  • data (pandas.DataFrame) – Input accelerometer data.

  • cols (list of str, optional) – Columns to standardise. Default ['X', 'Y', 'Z'].

  • Returns

  • -------

  • pandas.DataFrame – Z-scored copy of the selected columns.

stag.sync.utils.make_absolute(data)[source]

Return the element-wise absolute value of a DataFrame.

Parameters:
  • data (pandas.DataFrame) – Input data.

  • Returns

  • -------

  • pandas.DataFrame – DataFrame with absolute values.

stag.sync.utils.sum_columns(data)[source]

Sum all columns row-wise.

Parameters:
  • data (pandas.DataFrame) – Input data.

  • Returns

  • -------

  • pandas.Series – Row-wise sum.

stag.sync.utils.get_consecutive_differences(series)[source]

Compute first-order differences of a Series.

Parameters:
  • series (pandas.Series) – Input time series.

  • Returns

  • -------

  • pandas.Series – Consecutive differences (length = original − 1).

stag.sync.utils.get_calibrated_absolute_accelleration(data, cols=None)[source]

One-step pipeline: z-score → absolute → sum.

Parameters:
  • data (pandas.DataFrame) – Raw accelerometer data.

  • cols (list of str, optional) – Columns to process. Default ['X', 'Y', 'Z'].

  • Returns

  • -------

  • pandas.Series – Summed absolute z-scored acceleration.

stag.database — Database models and ingestion

SQLAlchemy ORM, schema management, and high-level ingestion / export helpers. See stag.database.handler.DeerDatabaseHandler for the entry-point class.

SQLAlchemy ORM schema for the STAG deer sensor database.

class stag.database.orm.DeerInfo(**kwargs)[source]

Bases: Base

One row per (repetition, deer) combination.

Attributes:

deer_id: Primary key, autoincrementing. repetition_number: Identifier for the data-collection repetition. deer_number: Unique identifier for the deer. accelerometer_data: Reverse relationship to AccelerometerData. trajectory_data: Reverse relationship to TrajectoryData.

deer_id
repetition_number
deer_number
accelerometer_data
trajectory_data
class stag.database.orm.AccelerometerData(**kwargs)[source]

Bases: Base

50 Hz six-axis accelerometer stream (three head, three ear axes).

data_id
deer_id
NZ_DateTime
X_head
Y_head
Z_head
X_ear
Y_ear
Z_ear
deer_info
class stag.database.orm.TrajectoryData(**kwargs)[source]

Bases: Base

0.5 Hz GPS positions (WGS84 + NZMG) plus derived speed and tortuosity.

data_id
deer_id
pos_WGS84_lat
pos_WGS84_lon
pos_NZMG_x_meter
pos_NZMG_y_meter
pos_x_meter_filt
pos_y_meter_filt
abs_speed_mPs
tortuosity
deer_info
class stag.database.orm.VideoObservationReference(**kwargs)[source]

Bases: Base

Ground-truth video clip metadata.

The start_time / stop_time window is used to assign frame numbers to accelerometer samples via VideoAvailability.

id
original_file_path
repetition
deer
frame_count
start_time
stop_time
comment
class stag.database.orm.VideoAvailability(**kwargs)[source]

Bases: Base

Frame-level link between accelerometer samples and video observations.

A NULL video_observation_reference_id or frame indicates that the accelerometer sample has no synchronised video coverage.

id
accelerometer_data_id
video_observation_reference_id
frame
accelerometer_data
video_observation_reference
class stag.database.orm.ClusterLabels(**kwargs)[source]

Bases: Base

Per-sample k-means cluster assignment.

data_id
label
accelerometer_data

Database handler — ingestion and query methods for the deer schema.

class stag.database.handler.DeerDatabaseHandler(database_url)[source]

Bases: object

Engine-owning handler for the STAG deer sensor database.

Attributes:

engine: SQLAlchemy Engine bound to the configured database URL.

__init__(database_url)[source]

Bind a SQLAlchemy engine for database_url.

Args:
database_url: SQLAlchemy URL string (e.g.

"sqlite:////path/to/deer.db").

create_database()[source]

Create the database schema (idempotent — uses create_all).

Return type:

None

make_session()[source]

Return a new SQLAlchemy session bound to self.engine.

static read_h5_file(filename)[source]

Read the 'df' table from an HDF5 file.

Return type:

DataFrame

insert_data_from_directory(directory)[source]

Insert accelerometer rows from every *.h5 in directory.

Filenames are expected to follow the pattern *_R<rep>_D<deer>*.h5.

Return type:

None

insert_trajectory_data_from_h5(file_position, data)[source]

Insert trajectory rows for the deer identified by the filename.

Return type:

None

get_deer_id_from_filename(filename)[source]

Return deer_id for a filename of pattern *_R<rep>_D<deer>*.h5.

Return type:

int | None

write_trajectory_data_for_deer(deer_number, output_file=None)[source]

Write a deer’s trajectory rows to CSV (or stdout if output_file is None).

Return type:

None

insert_video_observation_data(csv_file_path, fps)[source]

Insert one VideoObservationReference row per CSV row.

Return type:

None

generate_video_availability_csv(deer_number, repetition_number, output_dir)[source]

Emit CSV linking accelerometer rows to video frames for one (deer, rep).

Return type:

None

import_video_availability_from_csv(csv_file_path)[source]

Load a video_availability CSV into the database.

Return type:

None

insert_cluster_labels_from_npy(npy_file_path)[source]

Load cluster labels from a .npy file and link them to accel rows.

Labels are assigned in data_id order; a length mismatch is logged but does not abort the insertion.

Return type:

None

calculate_statistics_for_cluster(label, column_name)[source]

Return (mean, SEM) of a TrajectoryData column for one cluster label.

Return type:

tuple[float, float] | None

Args:

label: Cluster label to filter on. column_name: Either 'abs_speed_mPs' or 'tortuosity'.

update_json_with_statistics(json_file_path, column_name)[source]

Annotate a centroid JSON file with mean/SEM for one column.

Return type:

None

DB → clustering-ready .npy feature matrix.

stag.database.make_cluster_data.open_session(database_url)[source]

Opens a session for the database.

stag.database.make_cluster_data.get_deer_ids(session)[source]

Returns a list of all deer_ids in the database.

stag.database.make_cluster_data.get_data_for_deer(session, deer_id)[source]

Fetches and concatenates accelerometer and trajectory data for a given deer_id.

stag.database.make_cluster_data.aggregate_all_data(session, deer_ids)[source]

Aggregates data for all deer_ids and interpolates over NaN values.

stag.database.make_cluster_data.save_data_to_npy(data, filename)[source]

Saves the data to a .npy file.

stag.gps — GPS trajectory analysis

Project WGS84 GPS to the New Zealand Map Grid, compute per-sample tortuosity and ground speed, and render trajectory + speed figures.

GPS trajectory processing and feature extraction.

Project WGS84 GPS samples to NZMG, fill linear gaps, apply a Gaussian filter to the projected positions, and compute per-sample tortuosity and ground speed. Driver block (main()) reads a single h5 trajectory file, processes it, and inserts the result into a deer-data SQLite database via stag.database.handler.DeerDatabaseHandler.

stag.gps.analysis.project_to_NZ_map_grid(lats, lons)[source]

Project (lat, lon) WGS84 coordinates to the New Zealand Map Grid.

Args:

lats: Iterable of latitude values (WGS84, EPSG:4326). lons: Iterable of longitude values (WGS84, EPSG:4326).

Returns:

Tuple (xx, yy) of NZMG x and y coordinates (EPSG:27200).

stag.gps.analysis.calculate_tortuosity_and_speed(pos_x, pos_y, fps=50)[source]

Compute tortuosity and absolute speed from a trajectory.

Args:

pos_x: Array of x coordinates (metres). pos_y: Array of y coordinates (metres). fps: Sample rate in Hz (default 50).

Returns:

Dict with "tortuosity" and "speed" lists, one entry per sample (boundary samples duplicated to match input length).

stag.gps.analysis.update_df_to_cartesian_positions(df)[source]

Add NZMG pos_x_meter and pos_y_meter columns to df.

Args:

df: DataFrame with location-lat and location-lon columns.

Returns:

The same DataFrame with the two new metre columns populated.

stag.gps.analysis.fill_linearly_df(df)[source]

Linearly interpolate then forward / backward-fill df.

Args:

df: DataFrame to fill in place.

stag.gps.analysis.gaussian_filter_column(df, columnstr, sigma=75)[source]

Apply a Gaussian filter to one column of a DataFrame.

Args:

df: DataFrame with the source column. columnstr: Name of the column to filter. sigma: Gaussian sigma (default 75).

Returns:

df with a new {columnstr}_filt column appended.

stag.gps.analysis.main(filelocation)[source]

Process one h5 trajectory file into a DataFrame ready for DB ingest.

Transforms (lat, lon) to NZMG metres, interpolates NaN gaps, Gaussian-smooths the positions, and computes per-sample tortuosity and ground speed.

Args:

filelocation: Path to the h5 trajectory file.

Returns:

DataFrame with the filtered positions, speed and tortuosity columns, ready for DeerDatabaseHandler. insert_trajectory_data_from_h5().

Arc-chord path-tortuosity measure.

stag.gps.tortuosity.calculate_tortuosity_and_speed(lat, lon, fps=0.5)[source]

Per-sample tortuosity and ground speed from (lat, lon) tracks.

Args:

lat: Iterable of WGS84 latitudes. lon: Iterable of WGS84 longitudes. fps: Sample period in seconds (default 0.5 = 2 Hz).

Returns:

Dict with "tortuosity" and "speed" lists, one entry per sample (boundary samples duplicated to match input length).

stag.gps.tortuosity.lat_lon_vec_to_meter_vec(lat1, lon1, lat2, lon2)[source]

Great-circle distance in metres between two WGS84 (lat, lon) points.

stag.gps.tortuosity.extract_tort_and_speed(saved_loc_and_tort_filepath)[source]

Compute tortuosity + speed for the trajectories stored in an h5 file.

Args:
saved_loc_and_tort_filepath: Path to a single-table h5 file with

location-lat and location-lon columns.

Returns:

DataFrame with the original lat/lon plus interpolated tortuosity and absolute_speed columns.

GPS trajectory and speed visualisation (pre-existing rot — see banner).

stag.gps.plotting.load_trajectory_data(csv_file)[source]

Load trajectory data from a CSV file.

Parameters: - csv_file: Path to the CSV file containing trajectory data.

Returns:

:
  • DataFrame with the trajectory data.

stag.gps.plotting.prepare_line_collection(df, x_col, y_col, start_idx=None, end_idx=None, cmap='Greys', full_length_color=None)[source]

Prepare a colour-coded LineCollection for plotting a trajectory.

Args:

df: DataFrame with the trajectory coordinates. x_col: Column name for the x axis. y_col: Column name for the y axis. start_idx: Optional slice start (default: full DataFrame). end_idx: Optional slice end. cmap: Matplotlib colormap name. full_length_color: Optional pre-computed colour array from the

full-length plot — pass this to keep the close-up consistent.

stag.gps.plotting.plot_trajectory(ax, line_collection, x_label, y_label)[source]

Plot a trajectory on a given axis.

Parameters: - ax: The axis on which to plot. - line_collection: The LineCollection object representing the trajectory. - x_label: Label for the x-axis. - y_label: Label for the y-axis.

stag.gps.plotting.highlight_area(ax, df, x_col, y_col, start_idx, end_idx, edgecolor='black')[source]

Highlight an area on the plot.

Parameters: - ax: The axis on which to plot. - df: DataFrame containing trajectory data. - x_col: Name of the column for x-axis values. - y_col: Name of the column for y-axis values. - start_idx: Start index of the area to highlight. - end_idx: End index of the area to highlight. - edgecolor: Color of the highlighting rectangle.

stag.gps.plotting.main_plot_with_closeup(df, start_idx, end_idx, coord_system, line_color='Greys')[source]

Render the full-trajectory panel plus a close-up of the highlighted range.

Args:

df: DataFrame with WGS84 / NZMG / filtered position columns. start_idx: Start index of the close-up window. end_idx: End index of the close-up window. coord_system: One of "WGS", "NZMG", "FILT". line_color: Matplotlib colormap for the trajectory line.

Returns:

Matplotlib Figure with the two side-by-side panels.

stag.gps.plotting.calculate_time_axis(df, fps=50)[source]

Calculate time axis for the DataFrame based on the FPS.

Parameters: - df: DataFrame containing the data. - fps: Frames per second (default: 50).

Returns:

:
  • A NumPy array representing the time in seconds.

stag.gps.plotting.plot_speed_and_tortuosity_with_highlight(df, start_idx, end_idx, fps=50)[source]

Plot speed and tortuosity vs time, highlighting one trajectory window.

Args:

df: DataFrame with abs_speed_mPs and tortuosity columns. start_idx: Start index of the highlight window. end_idx: End index of the highlight window. fps: Sample rate in Hz (default 50).

Returns:

Matplotlib Figure with side-by-side speed and tortuosity panels.

stag.clustering — k-means clustering and evaluation

GPU-accelerated k-means (cuML), meta-analysis across the leave-out sweep, internal cluster-quality metrics (Calinski–Harabasz, silhouette, Kneedle elbow, Hungarian-matched centroid stability), and centroid-radar dashboards.

GPU-accelerated k-means clustering with contiguous leave-out stability.

stag.clustering.kmeans.shrink_data(data, reduction_percent, cut_position_percent)[source]

Remove a contiguous block from the data for stability analysis.

Implements the circular leave-out scheme described in the paper: a block of reduction_percent % of the data starting at cut_position_percent % is excised, wrapping around if the block extends past the end of the array.

Parameters:
  • data (numpy.ndarray) – Feature matrix of shape (n_samples, n_features).

  • reduction_percent (float) – Percentage of data to remove (0–100).

  • cut_position_percent (float) – Starting position of the cut as a percentage of total length.

  • Returns

  • -------

  • numpy.ndarray – Reduced feature matrix.

stag.clustering.kmeans.generate_filename(parent_dir, tag, num_clusters, deletion_size, deletion_position)[source]

Build standardised output paths for centroids, labels, and metadata.

Parameters:
  • parent_dir (str) – Root directory for results.

  • tag (str) – Experiment tag (e.g. "deer8").

  • num_clusters (int) – Number of clusters (k).

  • deletion_size (int) – Deletion size percentage.

  • deletion_position (int) – Deletion position percentage.

  • Returns

  • -------

  • dict – Dictionary with keys 'centroids', 'labels', 'meta' mapping to their respective file paths.

stag.clustering.kmeans.save_output(centroids, labels, quality_score, inertia, data_file, reduction_percent, cut_position_percent, filenames, start_time, duration)[source]

Persist clustering results (centroids, labels, metadata JSON).

Args:

centroids: Cluster centroids, shape (k, n_features). labels: Per-sample cluster assignments. quality_score: Calinski–Harabasz index. inertia: Within-cluster sum of squared distances

(W(k) — the k-means objective). Required for the Elbow / Kneedle internal metric.

data_file: Path to the input data file. reduction_percent: Deletion size used. cut_position_percent: Deletion position used. filenames: Output paths from generate_filename(). start_time: Analysis start timestamp. duration: Wall-clock duration of the analysis.

stag.clustering.kmeans.get_quality(labels, data_gpu_scaled)[source]

Compute the Calinski–Harabasz index for a clustering solution.

Parameters:
  • labels (numpy.ndarray) – Cluster assignments.

  • data_gpu_scaled (cupy.ndarray or numpy.ndarray) – Standardised feature matrix (on GPU or CPU).

  • Returns

  • -------

  • float – Calinski–Harabasz score, or NaN if only one cluster is populated.

stag.clustering.kmeans.main(tag, n_clusters, deletion_size, deletion_position, random_state, data_file, save_dir, no_rescale=False)[source]

Run a single k-means clustering job.

Args:

tag: Experiment tag. n_clusters: Number of clusters. deletion_size: Percentage of contiguous data to leave out

(0 = full data).

deletion_position: Starting position of the leave-out block

(percentage).

random_state: Random seed for k-means initialisation. data_file: Path to the .npy feature matrix. save_dir: Root directory for output files. no_rescale: When True, skip the in-script StandardScaler

and feed the data through to cuML as-is. Use this when the input is already in a comparable per-column scale (e.g. the MaxAbs-normalised file produced by scripts/preprocess_clustering_data.py), reproducing the 2024 SLURM pipeline exactly.

Internal cluster-validation metrics: Silhouette, Inertia, Kneedle Elbow.

stag.clustering.internal_metrics.compute_inertia(X, centroids, labels, chunk_size=1000000)[source]

Within-cluster sum of squared distances — the k-means objective W(k).

Streams over the feature matrix in chunk_size rows so the residual array never exceeds chunk_size × n_features × 8 bytes of working memory (≈ 48 MB at the default). This matters for parallel back-fill across many workers, where the naïve X - centroids[labels] would materialise a 13 GB residual per worker.

Return type:

float

Args:

X: Feature matrix, (n_samples, n_features). centroids: Centroid matrix, (n_clusters, n_features). labels: Per-sample cluster assignment, (n_samples,). chunk_size: Rows per streaming chunk. Default 1e6.

Returns:

Scalar inertia. Matches sklearn.cluster.KMeans.inertia_ to floating-point tolerance when centroids and labels come from a converged fit.

stag.clustering.internal_metrics.compute_silhouette_stratified(X, labels, n_per_cluster=5000, n_repeats=50, rng=None)[source]

Mean silhouette score via stratified per-cluster subsampling.

Silhouette is \(\mathcal{O}(n^2)\) in pairwise distances, so on the full ~$8.6times10^{7}$ STAG sample it is infeasible. This helper draws n_per_cluster samples from every cluster, computes silhouette on that subsample, and repeats n_repeats times with different seeds to give a median ± IQR.

Stratification is essential: PM2/PM4/PM5 (the ear flicks) comprise < 1 % of the data. Uniform subsampling (as in sklearn.silhouette_score(..., sample_size=N)) would under- represent them and bias the metric.

Return type:

dict[str, float | ndarray]

Args:

X: Feature matrix, (n_samples, n_features). labels: Per-sample cluster assignment. n_per_cluster: Samples per cluster per repeat. Default 5 000. n_repeats: Number of independent subsamples. Default 50. rng: NumPy generator; created from default seed if None.

Returns:
Dict with keys:
"mean_silhouette": median across repeats of the mean

silhouette per subsample.

"iqr_silhouette": (lower, upper) quartiles across repeats. "per_repeat": 1-D array of per-repeat mean silhouettes. "per_cluster_mean": (n_clusters,) median silhouette per

cluster across repeats.

stag.clustering.internal_metrics.recompute_inertia_for_meta_dir(meta_dir, data_path, overwrite=False, workers=1, reduction_percents=None)[source]

Back-fill inertia for every metadata JSON under meta_dir.

The pre-Sprint-0 save_output() did not persist inertia_. This helper walks the JSON tree, locates the matching centroids and labels files, recomputes W(k) from the saved data, and optionally writes the value back into each JSON.

Return type:

DataFrame

Args:

meta_dir: Root directory containing the metadata JSON files. data_path: Path to the clust_data_*.npy feature matrix. overwrite: When True, edit each JSON in place to add the

"inertia" field. Default False (returns a DataFrame only).

workers: Process-pool size. Default 1 (sequential).

Higher values speed up the back-fill significantly when labels live on a local SSD. Workers share the feature matrix via mmap_mode="r" so total RSS stays bounded.

reduction_percents: Optional whitelist of reduction_percent

values. When None (default), every fit is back-filled. Restricting to a single value for a given run skips ~75 % of the fits and matches the silhouette pass’s filter.

Returns:

DataFrame indexed by JSON path with columns k_number, reduction_percent, cut_position_percent, inertia.

stag.clustering.internal_metrics.locate_elbow_kneedle(k_values, inertia, S=1.0, curve='convex', direction='decreasing')[source]

Locate the elbow on a W(k) curve via the Kneedle algorithm.

Wraps kneed.KneeLocator (Satopää et al. 2011) and returns the chosen k plus diagnostic fields useful for the Figure 2C annotation.

Return type:

dict[str, float | int | None]

Args:

k_values: Monotonic increasing sequence of cluster counts. inertia: W(k) values in the same order. Should be

monotonically non-increasing.

S: Sensitivity parameter (lower → more aggressive

elbow detection). Default 1.0.

curve: "convex" for inertia curves. Default. direction: "decreasing" for inertia curves. Default.

Returns:
Dict with:

"elbow_k" — chosen k, or None if no elbow found. "elbow_y" — inertia at the elbow. "elbow_index" — position of the elbow in k_values. "normalised_knee_distance" — Kneedle internal score

(larger ⇒ more pronounced elbow).

stag.clustering.internal_metrics.selection_summary(k_values, ch=None, ch_low=None, ch_high=None, instability=None, instability_low=None, instability_high=None, silhouette=None, silhouette_low=None, silhouette_high=None, inertia=None, inertia_low=None, inertia_high=None)[source]

Single per-k DataFrame ready for the revised Figure 2.

Each metric is accompanied by an optional metric_low and metric_high aligned to k_values that the plotter renders as a 95 % confidence band around the median line. Bounds are optional — missing bounds appear as NaN columns and the plotter silently skips the fill.

Return type:

DataFrame

Args:

k_values: Cluster counts in plotting order. ch: Calinski–Harabasz medians. ch_low: Lower bound (typically the 25th percentile). ch_high: Upper bound (typically the 75th percentile). instability: Hungarian-matched centroid-drift medians. instability_low: Lower bound for instability. instability_high: Upper bound for instability. silhouette: Stratified mean-silhouette medians. silhouette_low: Lower bound for silhouette (over repeats). silhouette_high: Upper bound for silhouette (over repeats). inertia: W(k) medians. inertia_low: Lower bound for inertia. inertia_high: Upper bound for inertia.

Returns:

DataFrame with the median columns plus matching _low / _high columns.

Hungarian-matched centroid stability and meta-analysis.

class stag.clustering.meta_analysis.ClusterMetaAnalysis(directory)[source]

Bases: object

Load clustering metadata and quantify centroid stability across runs.

Walks a directory of per-run metadata JSONs (one per k-means fit) and computes the Hungarian-matched centroid instability for every (k, reduction_percent) combination. The minimum-instability run in each group serves as the representative solution.

Attributes:

directory: Root directory containing the metadata JSON files. df: DataFrame of per-run metadata + instability values

(populated by analyze() or load_df()).

__init__(directory)[source]

Bind the analyser to the per-fit metadata directory.

Args:

directory: Root of the delSize_*/k_* sweep tree.

load_data()[source]

Walk self.directory and return one row per metadata JSON.

Return type:

DataFrame

Returns:

DataFrame with the JSON fields plus k_number (inferred from the centroid count) and file_path. The centroids field is dropped to keep memory usage small.

load_centroids_for_analysis(reduction_percent, k_number)[source]

Return a list of centroid arrays for one (reduction_percent, k) group.

Each entry is a (k, n_features) array re-loaded from disk.

Return type:

list[ndarray]

static calculate_instability(centroids_list)[source]

Pair-wise Hungarian-matched distances, ranked against the most-stable run.

For each pair of clustering attempts, builds the per-centroid Euclidean distance matrix, solves the linear-sum assignment problem, and totals the matched distances. The “most stable” run is the one whose total distance to every other run is minimal; the returned vector gives every run’s distance to that reference.

Return type:

ndarray

Args:
centroids_list: List of (k, n_features) arrays — one per

independent clustering run within the same group.

Returns:

1-D array of length len(centroids_list): distance from each run to the most-stable reference run.

calculate_and_assign_instability()[source]

Compute and assign per-run instability values back to self.df.

Return type:

None

analyze()[source]

Load all metadata and populate the instability column.

Return type:

None

save_df(save_path)[source]

Write self.df to CSV.

Return type:

None

load_df(load_path)[source]

Read self.df from CSV and return it.

Return type:

DataFrame

find_most_stable_centroids(k_number, reduction_percent)[source]

Return the centroid array of the minimum-instability run in a group.

On a tie, the first matching row is returned.

Return type:

tuple[ndarray, str]

static normalize_centroids(centroids)[source]

Normalise each feature to [-1, 1] by dividing by its absolute max.

Return type:

ndarray

class stag.clustering.meta_analysis.ClusterPlotter(dataframe)[source]

Bases: object

Plot quality / instability panels and centroid radar charts.

__init__(dataframe)[source]

Wrap a per-fit metadata DataFrame for plotting.

Args:
dataframe: Output of ClusterAnalyser.analyze() containing

k_number, reduction_percent, calinski_harabasz_score, instability, centroids columns.

plot_metric(y_metric, log_scale=False)[source]

Line plot of y_metric against k_number per reduction_percent.

Return type:

Figure

Args:

y_metric: Column name to plot on the y-axis. log_scale: Whether to log-scale the y-axis.

static plot_radar_charts(centroids, feature_labels, normalise=True)[source]

Plot one polar radar chart per cluster on a shared grid.

Return type:

Figure

Args:

centroids: (n_clusters, n_features) array. feature_labels: Axis labels (one per feature). normalise: If True, set y-limits to [-1, 1] and draw

a thicker zero ring.

Per-centroid dashboards and internal-metric figures.

class stag.clustering.plotting.CentroidDashboard(centroids_info_path)[source]

Bases: object

Render the per-centroid radar + metrics dashboard.

Reads a centroid_label_info.json produced by the meta-analysis pipeline and renders one radar chart per centroid showing the feature values plus a summary table of behavioural metrics (prevalence, mean bout duration, mean speed, etc.).

__init__(centroids_info_path)[source]

Load centroids information and additional metrics from a JSON file.

plot_radar_and_metrics(feature_set)[source]

Plots a radar chart for each centroid with additional metrics.

stag.clustering.plotting.plot_internal_metrics_panel(summary, elbow_k=None, chosen_k=8, figsize=(9.0, 7.2))[source]

Four-panel internal-metric figure for the revised Figure 2.

Return type:

Figure

Panels:
  1. Calinski–Harabasz index vs k.

  2. Instability (Hungarian-matched centroid drift) vs k.

  3. Mean stratified Silhouette vs k.

  4. Inertia W(k) vs k with the Kneedle elbow marked.

Args:
summary: Per-k DataFrame from

stag.clustering.internal_metrics.selection_summary(). Columns: k, calinski_harabasz, instability, silhouette, inertia.

elbow_k: k flagged by the Kneedle algorithm (drawn as a marker

on panel D). None to suppress.

chosen_k: k highlighted across all panels as the manuscript’s

selected solution. Default 8.

figsize: Figure size in inches.

Returns:

The figure handle (caller saves via stag.constants.save_figure()).

stag.analysis — Behavioural sequence analysis

Per-sample label processing, NaN repair, first-order Markov transition models, super-prototype detection with first-order Markov shuffles + BH-FDR significance flagging, circadian rate-by-hour, and per-animal time budgets.

Prevalence, bout, and Markov-transition analysis from cluster labels.

class stag.analysis.label_analysis.LabelAnalyser(file_path, fps=50)[source]

Bases: object

Analyse cluster labels for behavioural statistics and transitions.

Parameters:
  • file_path (str) – Path to a .npy file containing the integer label array.

  • fps (int) – Sampling rate in Hz, used to convert bout lengths to seconds. Default 50.

  • Attributes

  • ----------

  • IDX (numpy.ndarray) – The label array (modified in place by filterIDX()).

  • fps – Frames per second.

  • cen_num (int) – Number of unique cluster labels.

  • label_num (int) – Total number of time steps.

__init__(file_path, fps=50)[source]

Load a per-sample label .npy and cache deriveables.

Args:

file_path: Path to the per-sample cluster-label .npy. fps: Sample rate in Hz (default 50).

filterIDX(cutoff)[source]

Merge short label runs into neighbouring bouts.

Sequences shorter than cutoff frames are absorbed by the adjacent bout (previous or next) that is longer, following the approach of Braun & Geurten (2010).

Parameters:

cutoff (int) – Minimum bout length (in frames) to retain.

get_percentage()[source]

Compute the prevalence of each label as a percentage.

Returns:

: numpy.ndarray

Array of length cen_num with percentage values.

get_mean_durations()[source]

Compute mean bout duration and SEM for each label.

Returns:

: list of tuple of (float, float)

Each tuple is (mean_seconds, sem_seconds).

get_transitions()[source]

Build the first-order transition matrix.

Thin delegate around stag.analysis.markov.build_transition_matrix() — the canonical Markov-transition helper. Returned for backward compatibility with the LabelAnalyser API; new code should call the free function directly.

Returns:

: numpy.ndarray

Square matrix of shape (cen_num, cen_num) where entry (i, j) counts transitions from label i to label j.

save_results_to_json(file_path, durations, percentages, transitions)[source]

Write analysis results to a JSON file.

Parameters:
main(cutoff, save_path)[source]

Run the full label analysis pipeline.

Parameters:
  • cutoff (int) – Minimum bout length for filtering (in frames).

  • save_path (str) – Path for the output JSON file.

NaN detection and linear interpolation for sensor data.

stag.analysis.nan_handler.load_data(filename)[source]

Loads data from a file. Handles various potential file formats.

stag.analysis.nan_handler.find_nan_sequences(arr)[source]

Finds sequences of NaN values within each column.

stag.analysis.nan_handler.interpolate_nan_sequences(arr, nan_sequences)[source]

Interpolates NaN sequences linearly in each column.

First-order Markov transition structure for prototype label sequences.

stag.analysis.markov.build_transition_matrix(IDX, n_states=None, smoothing=0.0, treat_as_single_chain=True)[source]

Count first-order transitions in a label sequence.

Return type:

ndarray

Args:

IDX: Integer label array of shape (n_samples,). n_states: Number of distinct labels; inferred from

IDX.max() + 1 when None.

smoothing: Pseudo-count added to every cell before

returning (Laplace smoothing). Default 0.0 (raw counts, matching the manuscript).

treat_as_single_chain: When True (default), the array is treated

as one continuous 48-h chain. Reserved for future segmented-chain handling.

Returns:

(n_states, n_states) matrix where entry (i, j) counts transitions from state i to state j.

Notes:

Reviewer R3 asked for explicit smoothing handling. The manuscript keeps raw counts (no pseudocounts) and shows zero-transition cells as blanks on the logarithmic colour scale in Figure 4A.

stag.analysis.markov.marginal_rates(IDX, n_states=None)[source]

Per-state marginal probability (base rate) over the full sequence.

Return type:

ndarray

Args:

IDX: Integer label array. n_states: Number of distinct labels; inferred when None.

Returns:

1-D array of length n_states summing to 1.0.

stag.analysis.markov.conditional_transition_matrix(transitions)[source]

Row-normalise a raw transition-count matrix.

Return type:

ndarray

Args:

transitions: (n_states, n_states) raw counts.

Returns:

Same shape; each row sums to 1.0. Rows whose source state never appears are left as zeros (rather than NaN-filled) so downstream log-scale plotting does not break.

stag.analysis.markov.flag_above_below_baseline(conditional, marginals, factor=1.0)[source]

Compare each transition probability to its target state’s base rate.

For every cell (i, j), the value P(j | i) is compared to factor * P(j). This is the operation that draws the up- and down-pointing triangles in Figure 4A of the manuscript.

Return type:

ndarray

Args:
conditional: Row-normalised transition matrix from

conditional_transition_matrix().

marginals: Per-state base rates from marginal_rates(). factor: Multiplier applied to P(j) before comparison.

Default 1.0.

Returns:
Integer array same shape as conditional:

+1 where P(j|i) > factor * P(j) (above baseline), -1 where P(j|i) < factor * P(j) (below baseline), 0 otherwise (equal, or both zero).

stag.analysis.markov.expected_bout_duration(conditional, fps=50)[source]

Geometric expectation of bout duration from self-transition probabilities.

Under a first-order Markov assumption with self-transition probability p = P(i | i), bout length is geometrically distributed with mean 1 / (1 - p) time steps. This helper converts those expected durations to seconds using fps.

Return type:

ndarray

Args:

conditional: Row-normalised transition matrix. fps: Sampling rate in Hz (default stag.constants.FPS).

Returns:

1-D array of length n_states giving expected bout duration in seconds. States with p == 1.0 return np.inf.

Shuffle nulls and n-gram frequency significance.

stag.analysis.null_models.shuffle_marginal(idx, rng)[source]

Random permutation of idx.

Preserves P(state); destroys all temporal structure including the transition matrix. Cheap and useful as a sanity comparator — but for the super-prototype claim use shuffle_first_order().

Return type:

ndarray

stag.analysis.null_models.shuffle_first_order(idx, rng, n_states=None)[source]

Sample a Markov chain matching idx’s empirical transitions.

Estimates the empirical transition matrix and the empirical starting distribution from idx, then samples a new chain of the same length. Preserves P(state) and P(j|i); destroys only higher-order memory.

Return type:

ndarray

Args:

idx: Per-sample state IDs in observed order, (n,). rng: NumPy generator (use one per replicate for

independent shuffles).

n_states: Total number of states. Inferred from idx if

None. Specify when idx is a slice that may not contain every state.

Returns:

Synthetic chain of shape (n,), dtype matching idx.

stag.analysis.null_models.ngram_frequencies(idx, n=3, n_states=None)[source]

Count contiguous n-grams in idx.

Return type:

dict[tuple[int, ...], int]

Args:

idx: Per-sample state IDs. n: n-gram length (default 3 = triplets). n_states: Optional cap; ngrams containing states ≥ n_states

are dropped. Useful for restricting to the canonical k=8 alphabet when idx has any noise.

Returns:

Dict mapping (s0, s1, …, s_{n-1}) to its count in idx. Sliding window with stride 1.

stag.analysis.null_models.triplet_frequencies(idx, n_states=None)[source]

Convenience wrapper: 3-gram frequencies.

Return type:

dict[tuple[int, int, int], int]

stag.analysis.null_models.null_distribution(idx, n=3, n_shuffles=1000, null_kind='first_order', rng=None, n_states=None, desc=None)[source]

Build the null frequency distribution for every observed n-gram.

Runs n_shuffles shuffles of the requested null and records each candidate n-gram’s count per shuffle. The result is a dict {ngram: shuffle_counts} aligned in iteration order; the union of n-grams seen in any shuffle plus the empirical n-grams is what we test in flag_significant_ngrams().

Return type:

dict[tuple[int, ...], ndarray]

Args:

idx: Observed state sequence. n: n-gram length. n_shuffles: Number of shuffle replicates. null_kind: "first_order" or "marginal". rng: Seeded numpy.random.Generator. None → fresh. n_states: See shuffle_first_order(). desc: Optional tqdm description.

Returns:

Dict from ngram tuple to a (n_shuffles,) int array of per-shuffle counts.

stag.analysis.null_models.flag_significant_ngrams(observed, null, percentile=99.9, fdr_alpha=0.05)[source]

Identify n-grams whose observed count exceeds the null tail.

Return type:

list[dict]

For each n-gram seen empirically:
  • percentile cutoff — observed > np.quantile(null, percentile/100)

  • empirical p-value — (1 + Σ𝟙[null observed]) / (1 + n_shuffles)

  • BH-FDR q-value — Benjamini–Hochberg across all tested n-grams.

A “super-prototype” call requires both the percentile flag and the FDR flag (Boolean AND). Returns a list of dicts ordered by ascending q (most significant first).

Args:

observed: ngram observed_count. null: ngram (n_shuffles,) array of null counts. percentile: Percentile cutoff for the percentile flag. fdr_alpha: Family-wise FDR control level.

Returns:

List of dicts with keys: ngram, observed, null_median, null_q975, null_qP, percentile_flag, p_empirical, q_bh, fdr_flag, super_prototype (= percentile_flag & fdr_flag).

stag.analysis.null_models.top_n_super_prototypes(idx, n_gram=3, n_shuffles=1000, percentile=99.9, fdr_alpha=0.05, rng=None, n_states=None, desc=None)[source]

One-shot driver: empirical n-grams → null → significance.

Equivalent to:

observed = ngram_frequencies(idx, n_gram, n_states)
null     = null_distribution(idx, n_gram, n_shuffles, "first_order", rng, n_states)
return flag_significant_ngrams(observed, null, percentile, fdr_alpha)
Return type:

list[dict]

Bout-level n-grams: run-length encode IDX, then super-prototype detection.

class stag.analysis.super_prototypes.BoutStream(labels, lengths, start_index)[source]

Bases: object

Run-length-encoded label sequence.

Attributes:

labels: Per-bout cluster label, shape (n_bouts,). lengths: Per-bout duration in samples, shape (n_bouts,). start_index: Per-bout starting sample index in the original

IDX, shape (n_bouts,).

labels: ndarray
lengths: ndarray
start_index: ndarray
property n_bouts: int

Number of bouts in the stream (equals labels.size).

stag.analysis.super_prototypes.bout_stream(idx)[source]

Run-length-encode idx into bouts.

A bout is a maximal contiguous run of the same label. Length-1 bouts (single-sample flickers between two longer dwells) are kept — filtering them is an interpretation-layer decision, not something this module should hide.

Return type:

BoutStream

Args:

idx: Per-sample label sequence, (n,) integer.

Returns:

BoutStream with three arrays of length n_bouts.

stag.analysis.super_prototypes.per_animal_bout_streams(idx, deer_ids)[source]

Build one BoutStream per unique deer_id.

Bouts are computed within an animal’s recording and never cross between animals — important when the saved per-sample arrays concatenate multiple animals’ tracks.

Return type:

dict[int, BoutStream]

stag.analysis.super_prototypes.extract_n_grams(bouts, n=3, n_states=None)[source]

Count bout-level n-grams.

Accepts either a BoutStream (uses its labels) or a 1-D array of bout labels. Sliding-window stride 1 on the bout sequence — so a triplet (a, b, c) is one bout of a followed by one bout of b followed by one bout of c.

Return type:

dict[tuple[int, ...], int]

stag.analysis.super_prototypes.extract_triplets(bouts, n_states=None)[source]

Convenience wrapper for n = 3.

Return type:

dict[tuple[int, int, int], int]

stag.analysis.super_prototypes.identify_super_prototypes(idx, deer_ids=None, n_gram=3, n_shuffles=1000, percentile=99.9, fdr_alpha=0.05, null_kind='first_order', rng=None, n_states=None, desc=None)[source]

Identify bout-level n-grams that beat the first-order Markov null.

Return type:

list[dict]

Pipeline:
  1. Build the bout stream (per-animal if deer_ids given, single global stream otherwise).

  2. Concatenate per-animal bout-label streams into a single sequence for the null test. Sentinel breaks are not inserted — the null is applied to the concatenated bout sequence, which preserves the empirical bout-to-bout transition matrix that is the whole point of the first- order null. Inter-animal “transitions” are a tiny minority (n_animals − 1 of ~10 M bout-pairs in the cohort) and do not bias the percentile cutoff.

  3. Run null_distribution() and flag_significant_ngrams() on the bout sequence.

Args:

idx: Per-sample cluster IDs, (n,). deer_ids: Per-sample deer_id, (n,) — required when

idx spans multiple animals so bouts do not cross animal boundaries.

n_gram: n-gram length (default 3 = triplets). n_shuffles: Replicates for the null (manuscript: 1000). percentile: Percentile cutoff for the percentile flag. fdr_alpha: BH-FDR control level. null_kind: "first_order" (default) or "marginal". rng: Seeded numpy.random.Generator. None → fresh. n_states: Optional state-count override. desc: Optional tqdm description for the null pass.

Returns:

List of dicts as in flag_significant_ngrams(), sorted by ascending q-value (most significant first). Each dict has the super_prototype boolean.

stag.analysis.super_prototypes.bout_duration_stats(bouts, fps)[source]

Per-PM bout-duration summary (median, IQR, count) in seconds.

Operates on a SINGLE BoutStream and pools all of its bouts. For across-animal summaries use per_animal_pm_duration_stats() followed by aggregate_durations_across_animals() — pooling across animals at this level pseudo-replicates (the animal with most bouts dominates the moments and SEMs).

Return type:

dict[int, dict[str, float]]

stag.analysis.super_prototypes.per_animal_pm_duration_stats(streams, fps)[source]

Per-(animal, PM) bout-duration summary.

The animal is the unit of observation here. This function emits one row per non-empty (deer_id, pm) cell; callers feed the result to aggregate_durations_across_animals() to get cohort-level summaries without pseudo-replicating across the millions of pooled bouts.

Return type:

DataFrame

Args:
streams: deer_id BoutStream mapping (typically the output of

per_animal_bout_streams()).

fps: Sample rate in Hz so durations come out in seconds.

Returns:

DataFrame with columns deer_id, pm, n_bouts, mean_s, median_s, q25_s, q75_s.

stag.analysis.super_prototypes.aggregate_durations_across_animals(per_animal)[source]

Cohort-level summary of per-(animal, PM) duration values.

Two complementary aggregations per PM:

  • mean-of-means + SEM (matches the manuscript’s existing duration style; SEM is across animals, n = n_animals, not across bouts).

  • median-of-medians + IQR (robust to the heavy right-skew of bout durations; IQR is taken across the per-animal medians).

Args:

per_animal: Output of per_animal_pm_duration_stats().

Returns:

DataFrame with columns pm, n_animals, mean_of_means_s, sem_s, median_of_medians_s, q25_of_medians_s, q75_of_medians_s, total_bouts.

Return type:

DataFrame

Circadian summaries, day/night Wilcoxon, and per-animal time budgets.

stag.analysis.circadian.classify_day_night(timestamps_ns, site=LocationInfo(name='Tokanui_area', region='Waikato, NZ', timezone='Pacific/Auckland', latitude=-38.11015, longitude=175.49841), crepuscular_margin_minutes=15.0)[source]

Per-sample 0/1/-1 = night / day / crepuscular for timestamps_ns.

Return type:

ndarray

Args:
timestamps_ns: int64 nanoseconds since the Unix

epoch, NZ local time (what the cache writes).

site: Astral LocationInfo (default Waikato). crepuscular_margin_minutes: Minutes excluded around sunrise /

sunset. Default 15 min.

Returns:
int8 array, same length as timestamps_ns:
  • 1 = day (between sunrise+margin and sunset-margin)

  • 0 = night

  • -1 = crepuscular (excluded from day/night comparisons)

stag.analysis.circadian.hourly_proportions(idx, timestamps_ns, pm_ids)[source]

Per-hour proportion of each PM across the cohort.

Return type:

DataFrame

Args:

idx: Per-sample cluster IDs. timestamps_ns: int64 ns timestamps aligned with idx. pm_ids: PMs to report (columns of the result).

Returns:

DataFrame indexed by hour_of_day (0..23) with one column per requested PM, plus n_samples. Rows sum to 1 across PM columns when all PMs in idx are listed.

stag.analysis.circadian.split_by_day(timestamps_ns, deer_ids=None)[source]

Per-sample “recording day” index (0 = first 24 h, 1 = next, …).

Day boundaries are local-time midnights of the first timestamp in the array (per animal, when deer_ids is provided). This is the index reviewers asked for in the day-1 vs day-2 panel (R1 + R3 want to see the second day replicates the first).

Return type:

ndarray

Args:

timestamps_ns: int64 ns timestamps. deer_ids: Optional per-sample deer_id; when given,

day-indexing restarts at each animal’s first timestamp.

Returns:

int8 array of day indices (0, 1, 2, …), length matching input.

stag.analysis.circadian.ear_flick_day_night_test(idx, timestamps_ns, deer_ids, ear_flick_pms, activity_pms, site=LocationInfo(name='Tokanui_area', region='Waikato, NZ', timezone='Pacific/Auckland', latitude=-38.11015, longitude=175.49841), crepuscular_margin_minutes=15.0)[source]

Paired Wilcoxon test of ear-flick rate, day vs night, per animal.

R1 #10 — “test whether ear flicks are diurnal”. The rate is normalised to overall activity (any PM in activity_pms) so a higher rate during the day is not just driven by the animal being awake.

Return type:

dict

For each animal we compute:

rate_day = (# ear-flick samples during day) / (# activity samples during day) rate_night = (# ear-flick samples during night) / (# activity samples during night)

and run a paired Wilcoxon signed-rank test on (rate_day, rate_night) across animals. Crepuscular samples are excluded.

Returns:
Dict with the per-animal table and the test statistics:

per_animal: DataFrame with rate_day / rate_night. W: Wilcoxon test statistic. p_value: Two-sided p. median_ratio_day_over_night: across animals. q025_ratio, q975_ratio: 2.5/97.5 bootstrap of ratio.

stag.analysis.circadian.per_animal_time_budget(idx, deer_ids, pm_ids)[source]

Per-animal proportion of time in each PM (rows = animal, cols = PM).

Used by the R2 #8 individual-variability supplementary figure (stacked bar of PM proportions per stag, ordered by inactive proportion).

Return type:

DataFrame

stag.embedded — Bare-metal Q4.12 nearest-centroid classifier

Q4.12 fixed-point export of the manuscript’s k = 8 centroids into a single C header consumable by every MCU in the Sprint 4 benchmark matrix. The C implementation itself lives in stag/embedded/nearest_centroid.{h,c}.

Python centroids → C header (Q4.12 fixed-point + float).

class stag.embedded.export_centroids.QFormat(int_bits, frac_bits)[source]

Bases: object

Fixed-point format descriptor.

int_bits: int
frac_bits: int
property total_bits: int

Sum of integer and fractional bits (excluding sign storage).

property scale: int

Integer scale factor 2 ** frac_bits mapping float to fixed-point.

property max_signed: int

Largest signed integer representable in total_bits bits.

property min_signed: int

Smallest (most-negative) signed integer in total_bits bits.

property range_float: tuple[float, float]

Representable float range (min, max) after fixed-point encoding.

stag.embedded.export_centroids.encode_q_format(x, q)[source]

Quantise float values to a Q-format signed integer representation.

Values outside the representable range are clipped (the caller is expected to have scaled inputs to fit; we clip rather than crash because the float centroids might be a hair over ±1 from floating- point round-off, and we want a deterministic encoding).

Return type:

ndarray

stag.embedded.export_centroids.decode_q_format(x_int, q)[source]

Inverse of encode_q_format() — returns the float value.

Return type:

ndarray

stag.embedded.export_centroids.verify_round_trip(centroids_float, centroids_q, q, n_test=100000, tolerance=0.001, rng=None)[source]

Quantify Q-format quantisation loss for nearest-centroid classification.

Generates n_test synthetic feature vectors uniformly in the centroid bounding box, runs the nearest-centroid classifier under both the float and Q-format representations, and reports the mismatch fraction.

Return type:

dict

Args:

centroids_float: Original (K, D) float centroids. centroids_q: (K, D) Q-format encoded centroids

(int32 already scaled).

q: The Q format used. n_test: Test-vector count. tolerance: Acceptable disagreement fraction. rng: Seeded generator. None → fresh.

Returns:
Dict: mismatch_fraction, max_abs_quant_error,

n_test, passed.

stag.embedded.export_centroids.centroids_to_c_header(centroids, maxabs_divisors, out_path, feature_labels=('Head_X', 'Head_Y', 'Head_Z', 'Ear_X', 'Ear_Y', 'Ear_Z'), q_frac_bits=12, source_files=None)[source]

Emit a self-contained C header with Q-format + float centroids.

The generated header is consumed by stag/embedded/nearest_centroid.c (Sprint 4 Phase 2) and by the per-MCU benchmark binaries.

Return type:

dict

Args:
centroids: (K, D) float centroids in MaxAbs-scaled

coordinates (i.e. each axis already divided by its absolute maximum).

maxabs_divisors: (D,) per-axis MaxAbs divisors used to

produce the scaled inputs at training time. The on-MCU classifier multiplies its raw sensor input by the inverse of these divisors before the distance loop.

out_path: Where to write centroids.h. feature_labels: Per-axis names (for comments in the header). q_frac_bits: Fractional bits in the Q format. Default 12

→ Q4.12 in 16-bit storage.

source_files: Optional dict of provenance fields written

into the header comment block (e.g. {"centroids": Path("…/k_8/centroids/…npy")}).

Returns:

Verification dict from verify_round_trip().

stag.embedded.export_centroids.parse_args()[source]

Parse CLI arguments for the centroid-export driver.

Return type:

Namespace

stag.embedded.export_centroids.main()[source]

Export the k=8 centroids as a C header (driver entry point).

Return type:

None

stag.utils — Cross-cutting helpers

Filename generators, ASCII-banner generators, and other small utilities used across the package.

Box-drawing banner generators for STAG module headers.

stag.utils.banners.double_banner(package, module, tagline, description_lines, width=66)[source]

Generate a double-line module banner with perfect alignment.

Return type:

str

Args:

package: Package name (e.g. “STAG”). module: Module name (e.g. “clustering.internal_metrics”). tagline: Short « tagline » caption. description_lines: Body lines of the banner. width: Inner width in characters (default 66).

Returns:

Multi-line string ready to paste as a comment block.

stag.utils.banners.section_banner(title, tagline, width=60)[source]

Generate a single-line section header with perfect alignment.

Return type:

str

stag.utils.banners.thin_rule(title, width=65)[source]

Generate a thin section separator.

Return type:

str

CSV-formatted log handler for Python logging.

class stag.utils.csv_formatter.CsvFormatter[source]

Bases: Formatter

logging.Formatter that emits each record as one CSV row.

Each row is "<LEVELNAME>,<message>" with both fields fully quoted so embedded commas in the message are preserved. Useful for log files that downstream pandas / Excel readers want to consume directly.

__init__()[source]

Create the StringIO-backed CSV writer.

format(record)[source]

Render one logging.LogRecord as a CSV-formatted string.

Standardised output path generation for clustering results.

stag.utils.filename_generator.generate_filename(parent_dir, tag, num_clusters, deletion_size, deletion_position)[source]

Build the (centroids, labels, meta) path triplet for one fit.

Args:

parent_dir: Root directory of the sweep (e.g. cluster_results/). tag: Sweep tag (e.g. "deer6raw"). num_clusters: k value. deletion_size: Leave-out percentage (delSize_*). deletion_position: Leave-out start position percentage (delPosP*).

Returns:

Dict with "centroids", "labels", "meta" keys mapped to absolute file paths.