# ╔══════════════════════════════════════════════════════════════════╗
# ║ STAG — analysis.circadian ║
# ║ « hourly proportions, day/night tests, per-animal time budgets»║
# ╠══════════════════════════════════════════════════════════════════╣
# ║ Sprint 3 deliverables that depend on the per-sample (deer_id, ║
# ║ timestamp) cache: ║
# ║ ║
# ║ - hourly_proportions(idx, ts) — diurnal panel data ║
# ║ - split_by_day(idx, ts) — day-1 vs day-2 consistency panel ║
# ║ (R1 + R3 asked for the second 24 h to be analysed too) ║
# ║ - ear_flick_day_night_test(idx, ts, deer_ids, ear_pms, ║
# ║ activity_pms) — R1 #10: paired Wilcoxon per animal, ║
# ║ day-rate / night-rate, normalised to overall activity ║
# ║ - per_animal_time_budget(idx, deer_ids) — R2 #8 individual ║
# ║ variability table ║
# ║ ║
# ║ Day/night is astral-based (sunrise/sunset for the Waikato ║
# ║ recording site on 2018-11-12 → 2018-12-01), not a fixed 06/18 ║
# ║ clock cut. Crepuscular margins are excluded by default (15 ║
# ║ min on each side of the solar event) — adjustable. ║
# ╚══════════════════════════════════════════════════════════════════╝
"""Circadian summaries, day/night Wilcoxon, and per-animal time budgets."""
from __future__ import annotations
from collections.abc import Sequence
from datetime import date as _date
import numpy as np
import pandas as pd
from astral import LocationInfo
from astral.sun import sun
from scipy.stats import wilcoxon
# ─────────────────────────────────────────────────────────────────
# Recording site (from per-animal-means median GPS, see Sprint 3
# cache step — Tokanui-area paddock, Waikato, NZ).
# ─────────────────────────────────────────────────────────────────
RECORDING_SITE: LocationInfo = LocationInfo(
name="Tokanui_area",
region="Waikato, NZ",
timezone="Pacific/Auckland",
latitude=-38.11015,
longitude=175.49841,
)
# ─────────────────────────────────────────────────────────────────
# Day/night classification
# ─────────────────────────────────────────────────────────────────
def _solar_events_table(
dates: Sequence[_date],
site: LocationInfo = RECORDING_SITE,
crepuscular_margin_minutes: float = 15.0,
) -> pd.DataFrame:
"""Return per-date sunrise / sunset plus padded day-window edges.
The padded edges shrink the "day" window by ``crepuscular_margin_minutes``
on each side so the test compares well-resolved day to well-resolved
night and ignores the ambiguous twilight periods. Pass ``0.0`` to
use the raw solar events.
"""
rows = []
margin_ns = int(crepuscular_margin_minutes * 60 * 1e9)
for d in dates:
s = sun(site.observer, date=d, tzinfo=site.timezone)
# The cache step writes NZ_DateTime as naive wall-clock
# nanoseconds — "12:00 NZDT" is stored as the int value of
# "12:00 UTC". Match that convention here by stripping the
# tz info from astral's tz-aware events (keeps the wall-clock
# numbers, treats them as UTC for int conversion).
sunrise_ns = pd.Timestamp(s["sunrise"].replace(tzinfo=None)).value
sunset_ns = pd.Timestamp(s["sunset"].replace(tzinfo=None)).value
rows.append({
"date": d,
"sunrise_ns": sunrise_ns,
"sunset_ns": sunset_ns,
"day_start_ns": sunrise_ns + margin_ns,
"day_end_ns": sunset_ns - margin_ns,
})
return pd.DataFrame(rows)
[docs]
def classify_day_night(
timestamps_ns: np.ndarray,
site: LocationInfo = RECORDING_SITE,
crepuscular_margin_minutes: float = 15.0,
) -> np.ndarray:
"""Per-sample 0/1/-1 = night / day / crepuscular for ``timestamps_ns``.
Args:
timestamps_ns: int64 nanoseconds since the Unix
epoch, NZ local time (what the
cache writes).
site: Astral LocationInfo (default Waikato).
crepuscular_margin_minutes: Minutes excluded around sunrise /
sunset. Default 15 min.
Returns:
int8 array, same length as ``timestamps_ns``:
- 1 = day (between sunrise+margin and sunset-margin)
- 0 = night
- -1 = crepuscular (excluded from day/night comparisons)
"""
ts = np.asarray(timestamps_ns, dtype=np.int64)
if ts.size == 0:
return np.zeros(0, dtype=np.int8)
dates = pd.to_datetime(ts).date
unique_dates = sorted(set(dates))
table = _solar_events_table(unique_dates, site=site,
crepuscular_margin_minutes=crepuscular_margin_minutes)
by_date = {row["date"]: row for _, row in table.iterrows()}
out = np.zeros(ts.size, dtype=np.int8) # default night
# Vectorise per-date using contiguous spans — much faster than
# per-sample lookup on 200 M rows.
df = pd.DataFrame({"ts": ts, "date": dates})
for d, group in df.groupby("date", sort=False):
ev = by_date[d]
block = group.index.to_numpy()
sub_ts = ts[block]
is_day = (sub_ts >= ev["day_start_ns"]) & (sub_ts < ev["day_end_ns"])
# Crepuscular margins: [sunrise, day_start) ∪ [day_end, sunset)
is_crep = (
((sub_ts >= ev["sunrise_ns"]) & (sub_ts < ev["day_start_ns"]))
|
((sub_ts >= ev["day_end_ns"]) & (sub_ts < ev["sunset_ns"]))
)
out[block[is_day]] = 1
out[block[is_crep]] = -1
return out
# ─────────────────────────────────────────────────────────────────
# Hourly proportions
# ─────────────────────────────────────────────────────────────────
[docs]
def hourly_proportions(
idx: np.ndarray,
timestamps_ns: np.ndarray,
pm_ids: Sequence[int],
) -> pd.DataFrame:
"""Per-hour proportion of each PM across the cohort.
Args:
idx: Per-sample cluster IDs.
timestamps_ns: int64 ns timestamps aligned with ``idx``.
pm_ids: PMs to report (columns of the result).
Returns:
``DataFrame`` indexed by ``hour_of_day`` (0..23) with one
column per requested PM, plus ``n_samples``. Rows sum to 1
across PM columns when all PMs in ``idx`` are listed.
"""
idx = np.asarray(idx)
ts = pd.to_datetime(np.asarray(timestamps_ns, dtype=np.int64))
hour = ts.hour
df = pd.DataFrame({"hour": hour, "pm": idx})
counts = df.groupby("hour")["pm"].value_counts().unstack(fill_value=0)
counts = counts.reindex(columns=list(pm_ids), fill_value=0)
n_per_hour = counts.sum(axis=1).rename("n_samples")
proportions = counts.div(n_per_hour, axis=0)
proportions["n_samples"] = n_per_hour
return proportions
# ─────────────────────────────────────────────────────────────────
# Day-split for replication panels
# ─────────────────────────────────────────────────────────────────
[docs]
def split_by_day(
timestamps_ns: np.ndarray,
deer_ids: np.ndarray | None = None,
) -> np.ndarray:
"""Per-sample "recording day" index (0 = first 24 h, 1 = next, …).
Day boundaries are local-time midnights of the *first* timestamp
in the array (per animal, when ``deer_ids`` is provided). This
is the index reviewers asked for in the day-1 vs day-2 panel
(R1 + R3 want to see the second day replicates the first).
Args:
timestamps_ns: int64 ns timestamps.
deer_ids: Optional per-sample deer_id; when given,
day-indexing restarts at each animal's first
timestamp.
Returns:
int8 array of day indices (0, 1, 2, …), length matching input.
"""
ts = pd.to_datetime(np.asarray(timestamps_ns, dtype=np.int64))
out = np.zeros(ts.size, dtype=np.int8)
if deer_ids is None:
groups = [(None, np.arange(ts.size))]
else:
deer_ids = np.asarray(deer_ids)
groups = [
(int(d), np.flatnonzero(deer_ids == d))
for d in np.unique(deer_ids)
]
for _, idx_block in groups:
if idx_block.size == 0:
continue
block_ts = ts[idx_block]
start = block_ts[0].floor("D")
out[idx_block] = ((block_ts - start) // pd.Timedelta(days=1)).astype("int8").to_numpy()
return out
# ─────────────────────────────────────────────────────────────────
# Ear-flick day/night rate test
# ─────────────────────────────────────────────────────────────────
[docs]
def ear_flick_day_night_test(
idx: np.ndarray,
timestamps_ns: np.ndarray,
deer_ids: np.ndarray,
ear_flick_pms: Sequence[int],
activity_pms: Sequence[int],
site: LocationInfo = RECORDING_SITE,
crepuscular_margin_minutes: float = 15.0,
) -> dict:
"""Paired Wilcoxon test of ear-flick rate, day vs night, per animal.
R1 #10 — "test whether ear flicks are diurnal". The rate is
normalised to overall activity (any PM in ``activity_pms``) so
a higher rate during the day is not just driven by the animal
being awake.
For each animal we compute:
rate_day = (# ear-flick samples during day) / (# activity samples during day)
rate_night = (# ear-flick samples during night) / (# activity samples during night)
and run a paired Wilcoxon signed-rank test on ``(rate_day, rate_night)``
across animals. Crepuscular samples are excluded.
Returns:
Dict with the per-animal table and the test statistics:
``per_animal``: ``DataFrame`` with rate_day / rate_night.
``W``: Wilcoxon test statistic.
``p_value``: Two-sided p.
``median_ratio_day_over_night``: across animals.
``q025_ratio``, ``q975_ratio``: 2.5/97.5 bootstrap of ratio.
"""
idx = np.asarray(idx)
deer_ids = np.asarray(deer_ids)
day_label = classify_day_night(
timestamps_ns, site=site,
crepuscular_margin_minutes=crepuscular_margin_minutes,
)
ear_set = np.isin(idx, list(ear_flick_pms))
act_set = np.isin(idx, list(activity_pms))
rows = []
for d in np.unique(deer_ids):
mask = deer_ids == d
if not mask.any():
continue
ear = ear_set[mask]
act = act_set[mask]
dl = day_label[mask]
day_act = int(((dl == 1) & act).sum())
night_act = int(((dl == 0) & act).sum())
day_ear = int(((dl == 1) & ear).sum())
night_ear = int(((dl == 0) & ear).sum())
rows.append({
"deer_id": int(d),
"day_act": day_act,
"night_act": night_act,
"day_ear": day_ear,
"night_ear": night_ear,
"rate_day": day_ear / day_act if day_act > 0 else float("nan"),
"rate_night": night_ear / night_act if night_act > 0 else float("nan"),
})
per_animal = pd.DataFrame(rows).set_index("deer_id")
finite = per_animal.dropna(subset=["rate_day", "rate_night"])
if len(finite) < 6:
return {
"per_animal": per_animal,
"n_animals_in_test": int(len(finite)),
"W": float("nan"), "p_value": float("nan"),
"median_ratio_day_over_night": float("nan"),
"q025_ratio": float("nan"), "q975_ratio": float("nan"),
}
W, p = wilcoxon(finite["rate_day"].to_numpy(),
finite["rate_night"].to_numpy())
ratios = finite["rate_day"] / finite["rate_night"].replace(0, np.nan)
ratios = ratios.replace([np.inf, -np.inf], np.nan).dropna()
return {
"per_animal": per_animal,
"n_animals_in_test": int(len(finite)),
"W": float(W),
"p_value": float(p),
"median_ratio_day_over_night": float(ratios.median()),
"q025_ratio": float(ratios.quantile(0.025)) if len(ratios) else float("nan"),
"q975_ratio": float(ratios.quantile(0.975)) if len(ratios) else float("nan"),
}
# ─────────────────────────────────────────────────────────────────
# Per-animal time budget
# ─────────────────────────────────────────────────────────────────
[docs]
def per_animal_time_budget(
idx: np.ndarray,
deer_ids: np.ndarray,
pm_ids: Sequence[int],
) -> pd.DataFrame:
"""Per-animal proportion of time in each PM (rows = animal, cols = PM).
Used by the R2 #8 individual-variability supplementary figure
(stacked bar of PM proportions per stag, ordered by inactive
proportion).
"""
idx = np.asarray(idx)
deer_ids = np.asarray(deer_ids)
df = pd.DataFrame({"deer_id": deer_ids, "pm": idx})
counts = df.groupby("deer_id")["pm"].value_counts().unstack(fill_value=0)
counts = counts.reindex(columns=list(pm_ids), fill_value=0)
n_per_animal = counts.sum(axis=1).rename("n_samples")
proportions = counts.div(n_per_animal, axis=0)
proportions["n_samples"] = n_per_animal
return proportions