# ╔══════════════════════════════════════════════════════════════════╗
# ║ reRandomStats — dose_response ║
# ║ « breakpoint + sigmoid dose-response fitting » ║
# ╠══════════════════════════════════════════════════════════════════╣
# ║ Threshold-detection toolkit for one-predictor / one-response ║
# ║ data where the relationship is suspected to be non-linear: ║
# ║ ║
# ║ broken_stick_fit — two-segment piecewise linear fit, with ║
# ║ profile-RSS 95 % CI on the breakpoint ║
# ║ (Muggeo 2003 segmented regression) ║
# ║ davies_test — Davies (1987 / 2002) test for breakpoint ║
# ║ existence ║
# ║ pscore_test — Muggeo (2016) Pseudo-Score test for ║
# ║ breakpoint existence; typically more ║
# ║ powerful than Davies for a single ║
# ║ changepoint ║
# ║ hill_fit — 4-parameter logistic / Hill dose-response ║
# ║ fit (Hill 1910; Goutelle et al. 2008) ║
# ║ with Sebaugh & McCray (2003) lower-bend ║
# ║ point as a sigmoid-equivalent of the ║
# ║ broken-stick breakpoint ║
# ║ per_subject_segmented — iterate any of the above fitters ║
# ║ over a panel of subjects; pickle-safe ║
# ║ for ProcessPoolExecutor parallelism ║
# ║ ║
# ║ Generalised from the DigiMuh dairy-cow heat-stress pipeline. ║
# ╚══════════════════════════════════════════════════════════════════╝
"""Breakpoint and sigmoid dose-response fitting for two-variable data.
All five functions in this module take ``x`` / ``y`` arrays (or a
DataFrame for :func:`per_subject_segmented`) and return either a
``dict`` (the four primary fitters) or a ``DataFrame`` (the per-subject
iterator). No I/O, no side effects, no project-specific assumptions.
**Skip-marker convention.** When the input is too small to fit
(``n < 30`` for the primary fitters, ``n < min_n`` for the per-subject
iterator), the result dict carries ``converged=False`` and ``n=<actual
sample size>``, plus the primary parameter (e.g. ``breakpoint`` or
``ec50``) as ``np.nan``. This is the DigiMuh-derived convention;
callers should test ``converged`` before accessing fit-specific fields.
**References.**
- Davies RB (1987) Biometrika 74:33-43.
- Davies RB (2002) Biometrika 89:484-489.
- Goutelle S, Maurin M, Rougier F, Barbaut X, Bourguignon L, Ducher M,
Maire P (2008) Fundamental & Clinical Pharmacology 22:633-648.
- Hill AV (1910) Journal of Physiology 40:iv-vii.
- Muggeo VMR (2003) Statistics in Medicine 22:3055-3071.
- Muggeo VMR (2016) Journal of Statistical Computation and Simulation
86:3059-3067.
- Sebaugh JL, McCray PD (2003) Pharmaceutical Statistics 2:167-174.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any
import numpy as np
import pandas as pd
from scipy.optimize import minimize_scalar
log = logging.getLogger("rerandomstats.dose_response")
# ┌────────────────────────────────────────────────────────────┐
# │ Broken-stick (segmented) regression with breakpoint CI │
# └────────────────────────────────────────────────────────────┘
[docs]
def broken_stick_fit(
x: np.ndarray,
y: np.ndarray,
x_range: tuple[float, float] | None = None,
n_grid: int = 200,
) -> dict:
"""Fit a two-segment piecewise-linear model with one breakpoint.
Model: ``y = α_below + β_below·x`` for ``x ≤ ψ``; ``y = α_above +
β_above·x`` for ``x > ψ``. Biological constraint enforced:
``slope_above > slope_below`` and ``slope_above > 0`` (the
"flat-ish below, steeper above" pattern typical of threshold
physiological responses). Fits that violate this constraint are
returned with ``converged=False`` and a ``rejected_reason`` string.
The breakpoint is located by grid search over ``n_grid`` candidates
in ``x_range``, refined by ``scipy.optimize.minimize_scalar`` on
the residual sum of squares. A **profile-RSS 95 % confidence
interval** on the breakpoint is computed from the grid-evaluated
RSS profile using the F-distribution threshold for a
one-parameter constraint::
CI = {ψ : RSS(ψ) ≤ RSS_min · (1 + F_{1, df_resid; 0.95} / df_resid)}
Threshold crossings are linearly interpolated between adjacent
grid points; CIs that hit the search bounds are flagged with
``breakpoint_ci_truncated=True``.
Args:
x: Predictor array. NaN entries dropped.
y: Response array. Must align with ``x``.
x_range: Search bounds for the breakpoint. Default
``(percentile_10, percentile_90)`` of ``x``.
n_grid: Grid resolution for the initial breakpoint search.
Default 200.
Returns:
Dict with the breakpoint, both segment slopes and intercepts,
``r_squared``, ``n``, ``n_below``, ``n_above``, ``converged``,
``breakpoint_ci_lo``, ``breakpoint_ci_hi``, ``breakpoint_se``
(derived from half-CI-width / 1.96), ``breakpoint_ci_truncated``,
and ``rejected_reason`` (``None`` if the fit converged).
References:
Muggeo VMR (2003) Estimating regression models with unknown
break-points. Statistics in Medicine 22:3055-3071.
"""
from scipy.stats import f as _f_dist
mask = np.isfinite(x) & np.isfinite(y)
x, y = x[mask], y[mask]
n = len(x)
if n < 30:
return {"breakpoint": np.nan, "converged": False, "n": n}
if x_range is None:
x_range = (np.percentile(x, 10), np.percentile(x, 90))
if x_range[0] >= x_range[1]:
return {"breakpoint": np.nan, "converged": False, "n": n}
def rss_at_bp(bp: float) -> float:
left, right = x <= bp, x > bp
if left.sum() < 10 or right.sum() < 10:
return np.inf
try:
Xl = np.column_stack([np.ones(left.sum()), x[left]])
cl, *_ = np.linalg.lstsq(Xl, y[left], rcond=None)
Xr = np.column_stack([np.ones(right.sum()), x[right]])
cr, *_ = np.linalg.lstsq(Xr, y[right], rcond=None)
except np.linalg.LinAlgError:
return np.inf
return np.sum((y[left] - Xl @ cl) ** 2) + np.sum((y[right] - Xr @ cr) ** 2)
grid = np.linspace(x_range[0], x_range[1], n_grid)
rss_vals = np.array([rss_at_bp(bp) for bp in grid])
best_idx = int(np.argmin(rss_vals))
if not np.isfinite(rss_vals[best_idx]):
return {"breakpoint": np.nan, "converged": False, "n": n}
lo = grid[max(0, best_idx - 2)]
hi = grid[min(len(grid) - 1, best_idx + 2)]
result = minimize_scalar(rss_at_bp, bounds=(lo, hi), method="bounded")
bp = float(result.x)
final_rss = float(result.fun)
left, right = x <= bp, x > bp
Xl = np.column_stack([np.ones(left.sum()), x[left]])
cl, *_ = np.linalg.lstsq(Xl, y[left], rcond=None)
Xr = np.column_stack([np.ones(right.sum()), x[right]])
cr, *_ = np.linalg.lstsq(Xr, y[right], rcond=None)
ss_tot = float(np.sum((y - np.mean(y)) ** 2))
r_sq = 1.0 - final_rss / ss_tot if ss_tot > 0 else np.nan
slope_below, slope_above = float(cl[1]), float(cr[1])
valid = slope_above > slope_below and slope_above > 0
# ── Profile-RSS 95 % CI on the breakpoint ───────────────────
p_linear = 4 # two intercepts, two slopes
df_resid = n - p_linear
ci_lo = ci_hi = bp_se = np.nan
truncated = False
if df_resid > 0 and final_rss > 0 and valid:
f_crit = float(_f_dist.ppf(0.95, 1, df_resid))
rss_thresh = final_rss * (1.0 + f_crit / df_resid)
ok = rss_vals <= rss_thresh
if ok.any():
lo_idx = int(np.argmax(ok))
hi_idx = int(len(ok) - 1 - np.argmax(ok[::-1]))
def _interp_cross(i_out: int, i_in: int) -> float:
ri, rj = rss_vals[i_out], rss_vals[i_in]
if not (np.isfinite(ri) and np.isfinite(rj)) or ri == rj:
return float(grid[i_in])
t = (rss_thresh - ri) / (rj - ri)
return float(grid[i_out] + t * (grid[i_in] - grid[i_out]))
if lo_idx > 0:
ci_lo = _interp_cross(lo_idx - 1, lo_idx)
else:
ci_lo = float(grid[0])
truncated = True
if hi_idx < len(grid) - 1:
ci_hi = _interp_cross(hi_idx + 1, hi_idx)
else:
ci_hi = float(grid[-1])
truncated = True
ci_lo, ci_hi = min(ci_lo, ci_hi), max(ci_lo, ci_hi)
bp_se = (ci_hi - ci_lo) / (2.0 * 1.96)
return {
"breakpoint": bp if valid else np.nan,
"slope_below": slope_below,
"intercept_below": float(cl[0]),
"slope_above": slope_above,
"intercept_above": float(cr[0]),
"r_squared": float(r_sq) if np.isfinite(r_sq) else np.nan,
"n": n,
"n_below": int(left.sum()),
"n_above": int(right.sum()),
"converged": bool(valid),
"breakpoint_ci_lo": float(ci_lo) if np.isfinite(ci_lo) else np.nan,
"breakpoint_ci_hi": float(ci_hi) if np.isfinite(ci_hi) else np.nan,
"breakpoint_se": float(bp_se) if np.isfinite(bp_se) else np.nan,
"breakpoint_ci_truncated": bool(truncated),
"rejected_reason": (
None if valid
else f"slope_above ({slope_above:.4f}) <= slope_below ({slope_below:.4f})"
),
}
# ┌────────────────────────────────────────────────────────────┐
# │ Davies (1987 / 2002) test for breakpoint existence │
# └────────────────────────────────────────────────────────────┘
[docs]
def davies_test(
x: np.ndarray,
y: np.ndarray,
k: int = 10,
x_range: tuple[float, float] | None = None,
) -> dict:
"""Davies (1987 / 2002) test for the existence of a breakpoint.
Tests ``H0`` (the relationship is a single straight line, no
breakpoint) vs ``H1`` (there is an unknown breakpoint where the
slope changes). The procedure evaluates ``k`` candidate
breakpoints, computes a naive t-statistic for the
slope-difference at each, takes the maximum, and corrects the
p-value for the search using Davies' upper bound::
p ≤ Pr(|T| > M) + V · φ(M)
where ``V`` is the number of sign changes in the t-statistic
sequence and ``φ`` is the standard-normal (or t) density.
Args:
x: Predictor array. NaN entries dropped.
y: Response array. Must align with ``x``.
k: Number of evaluation points. Default 10 (matches the
R ``segmented`` package default).
x_range: Search range for candidate breakpoints. Default
``(percentile_5, percentile_95)`` of ``x``.
Returns:
Dict with ``pvalue``, ``best_at`` (candidate with strongest
signal), ``max_statistic``, ``n_sign_changes``, and ``n``.
References:
Davies RB (1987) Biometrika 74:33-43.
Davies RB (2002) Biometrika 89:484-489.
"""
from scipy.stats import norm
from scipy.stats import t as t_dist
mask = np.isfinite(x) & np.isfinite(y)
x, y = x[mask], y[mask]
n = len(x)
if n < 30:
return {"pvalue": np.nan, "best_at": np.nan, "max_statistic": np.nan, "n": n}
if x_range is None:
x_range = (np.percentile(x, 5), np.percentile(x, 95))
psi_candidates = np.linspace(x_range[0], x_range[1], k)
t_stats = np.full(k, np.nan)
for j, psi in enumerate(psi_candidates):
seg_var = np.maximum(x - psi, 0.0)
X_design = np.column_stack([np.ones(n), x, seg_var])
try:
beta, _residuals, rank, _sv = np.linalg.lstsq(X_design, y, rcond=None)
except np.linalg.LinAlgError:
continue
if rank < 3:
continue
y_hat = X_design @ beta
resid = y - y_hat
df = n - 3
if df <= 0:
continue
sigma2 = float(np.sum(resid ** 2)) / df
try:
XtX_inv = np.linalg.inv(X_design.T @ X_design)
except np.linalg.LinAlgError:
continue
se_delta = float(np.sqrt(max(sigma2 * XtX_inv[2, 2], 0.0)))
if se_delta <= 0:
continue
t_stats[j] = beta[2] / se_delta
valid_t = t_stats[np.isfinite(t_stats)]
if len(valid_t) < 2:
return {"pvalue": np.nan, "best_at": np.nan, "max_statistic": np.nan, "n": n}
abs_t = np.abs(valid_t)
M = float(np.max(abs_t))
best_idx = int(np.nanargmax(np.abs(t_stats)))
best_psi = float(psi_candidates[best_idx])
signs = np.sign(valid_t)
sign_changes = int(np.sum(np.abs(np.diff(signs)) > 0))
df = n - 3
if df > 300:
# Large-sample normal approximation (exact gamma overflows).
p_naive = 2.0 * (1.0 - norm.cdf(M))
phi_M = norm.pdf(M)
else:
p_naive = 2.0 * (1.0 - t_dist.cdf(M, df))
phi_M = t_dist.pdf(M, df)
p_davies = float(min(p_naive + sign_changes * phi_M, 1.0))
return {
"pvalue": p_davies,
"best_at": best_psi,
"max_statistic": M,
"n_sign_changes": sign_changes,
"n": n,
}
# ┌────────────────────────────────────────────────────────────┐
# │ Pseudo-Score test (Muggeo 2016) │
# └────────────────────────────────────────────────────────────┘
[docs]
def pscore_test(
x: np.ndarray,
y: np.ndarray,
k: int = 10,
x_range: tuple[float, float] | None = None,
) -> dict:
"""Pseudo-Score test for breakpoint existence (Muggeo 2016).
Typically **more powerful than the Davies test** for detecting a
single changepoint. Averages the segmented variable over ``k``
candidate breakpoints::
φ̄(x) = (1/k) · Σ_j max(x - ψ_j, 0)
and tests whether ``γ`` is significant in the augmented linear
model ``y = α + β·x + γ·φ̄``. The key insight: under H0 (no
breakpoint) ``γ = 0``; under H1 ``γ`` picks up the slope change
regardless of the true breakpoint location, sidestepping the
nuisance-parameter problem that Davies' procedure handles via
upper-bound correction.
Args:
x: Predictor array. NaN entries dropped.
y: Response array. Must align with ``x``.
k: Number of evaluation points. Default 10.
x_range: Search range for candidate breakpoints. Default
``(percentile_5, percentile_95)`` of ``x``.
Returns:
Dict with ``pvalue``, ``t_statistic``, ``best_at`` (candidate
ψ with the strongest INDIVIDUAL signal, for reporting only —
not the test statistic itself), ``df``, and ``n``.
References:
Muggeo VMR (2016) Testing with a nuisance parameter present
only under the alternative: a score-based approach with
application to segmented modelling. J Stat Comput Simul
86:3059-3067.
"""
from scipy.stats import t as t_dist
mask = np.isfinite(x) & np.isfinite(y)
x, y = x[mask], y[mask]
n = len(x)
if n < 30:
return {"pvalue": np.nan, "t_statistic": np.nan, "n": n}
if x_range is None:
x_range = (np.percentile(x, 5), np.percentile(x, 95))
psi_candidates = np.linspace(x_range[0], x_range[1], k)
# Averaged segmented variable.
phi_bar = np.zeros(n)
for psi in psi_candidates:
phi_bar += np.maximum(x - psi, 0.0)
phi_bar /= k
X_design = np.column_stack([np.ones(n), x, phi_bar])
try:
beta, _residuals, rank, _sv = np.linalg.lstsq(X_design, y, rcond=None)
except np.linalg.LinAlgError:
return {"pvalue": np.nan, "t_statistic": np.nan, "n": n}
if rank < 3:
return {"pvalue": np.nan, "t_statistic": np.nan, "n": n}
y_hat = X_design @ beta
resid = y - y_hat
df = n - 3
if df <= 0:
return {"pvalue": np.nan, "t_statistic": np.nan, "n": n}
sigma2 = float(np.sum(resid ** 2)) / df
try:
XtX_inv = np.linalg.inv(X_design.T @ X_design)
except np.linalg.LinAlgError:
return {"pvalue": np.nan, "t_statistic": np.nan, "n": n}
se_gamma = float(np.sqrt(max(sigma2 * XtX_inv[2, 2], 0.0)))
if se_gamma <= 0:
return {"pvalue": np.nan, "t_statistic": np.nan, "n": n}
t_stat = float(beta[2] / se_gamma)
pvalue = float(2.0 * (1.0 - t_dist.cdf(abs(t_stat), df)))
# Reporting-only: which individual candidate ψ gave the strongest signal.
best_t = 0.0
best_psi = np.nan
for psi in psi_candidates:
seg = np.maximum(x - psi, 0.0)
Xd = np.column_stack([np.ones(n), x, seg])
try:
b, *_ = np.linalg.lstsq(Xd, y, rcond=None)
yh = Xd @ b
r = y - yh
s2 = float(np.sum(r ** 2)) / (n - 3)
inv_xx = np.linalg.inv(Xd.T @ Xd)
se = float(np.sqrt(max(s2 * inv_xx[2, 2], 0.0)))
if se > 0 and abs(b[2] / se) > abs(best_t):
best_t = b[2] / se
best_psi = float(psi)
except (np.linalg.LinAlgError, ValueError):
continue
return {
"pvalue": pvalue,
"t_statistic": t_stat,
"best_at": best_psi,
"df": df,
"n": n,
}
# ┌────────────────────────────────────────────────────────────┐
# │ Hill / 4-parameter logistic fit « sigmoid dose-response » │
# └────────────────────────────────────────────────────────────┘
[docs]
def hill_fit(
x: np.ndarray,
y: np.ndarray,
x_range: tuple[float, float] | None = None,
) -> dict:
"""Fit a four-parameter logistic (Hill) dose-response model.
Model::
y = y_min + (y_max - y_min) / (1 + (EC50 / x)^n)
EC50 is the predictor value at half-maximal response. The Hill
coefficient ``n`` (``hill_n`` in the return dict) captures
steepness: large ``n`` = sharp threshold-like switch, small ``n`` =
gradual transition.
In addition to EC50, the **lower bend point** following
Sebaugh & McCray (2003) is returned: this is the x-value where the
second derivative of the Hill curve equals zero on the lower
side, marking the transition from the baseline plateau into the
rising phase::
x_bend_lower = EC50 · ((n - 1) / (n + 1))^(1/n) for n > 1
For ``n ≤ 1`` (no inflection in the lower portion), falls back to
EC10 as a conventional onset marker. The lower bend is directly
comparable to a broken-stick breakpoint — both mark the onset of
the rising regime — and is the recommended single-number "where
does the response start" summary for sigmoid dose-response data.
Args:
x: Predictor array. NaN entries dropped.
y: Response array. Must align with ``x``.
x_range: Plausible range for the EC50 initial guess and
bounds. Default ``(percentile_10, percentile_90)``
of ``x``.
Returns:
Dict with ``ec50``, ``hill_n``, ``y_min``, ``y_max``,
``lower_bend`` (``np.nan`` if outside the data range),
``r_squared``, ``aic``, ``n``, ``converged``, and
``bend_plausible``.
References:
Goutelle S et al. (2008) The Hill equation: a review of its
capabilities in pharmacological modelling. Fundamental &
Clinical Pharmacology 22:633-648.
Hill AV (1910) The possible effects of the aggregation of the
molecules of haemoglobin on its dissociation curves. Journal
of Physiology 40:iv-vii.
Sebaugh JL, McCray PD (2003) Defining the linear portion of a
sigmoid-shaped curve: bend points. Pharmaceutical Statistics
2:167-174.
"""
from scipy.optimize import curve_fit
mask = np.isfinite(x) & np.isfinite(y)
x, y = x[mask], y[mask]
n = len(x)
if n < 30:
return {"ec50": np.nan, "converged": False, "n": n}
if x_range is None:
x_range = (np.percentile(x, 10), np.percentile(x, 90))
def hill_func(xv, y_min, y_max, ec50, hill_n):
ratio = np.clip(ec50 / np.maximum(xv, 1e-10), 1e-10, 1e10)
return y_min + (y_max - y_min) / (1.0 + np.power(ratio, hill_n))
y_lo = float(np.percentile(y, 10))
y_hi = float(np.percentile(y, 90))
ec50_init = float(np.mean(x_range))
bounds_lo = [np.min(y) - 5, np.median(y), x_range[0] * 0.5, 0.5]
bounds_hi = [np.median(y), np.max(y) + 5, x_range[1] * 1.5, 30.0]
try:
popt, _pcov = curve_fit(
hill_func, x, y,
p0=[y_lo, y_hi, ec50_init, 2.0],
bounds=(bounds_lo, bounds_hi),
maxfev=10000,
)
except (RuntimeError, ValueError):
return {"ec50": np.nan, "converged": False, "n": n}
y_min_fit, y_max_fit, ec50, hill_n = (float(p) for p in popt)
y_pred = hill_func(x, *popt)
ss_res = float(np.sum((y - y_pred) ** 2))
ss_tot = float(np.sum((y - np.mean(y)) ** 2))
r_sq = 1.0 - ss_res / ss_tot if ss_tot > 0 else np.nan
k_params = 5 # 4 Hill params + sigma
aic = n * np.log(ss_res / n) + 2 * k_params if n > k_params else np.nan
if hill_n > 1.0:
lower_bend = ec50 * ((hill_n - 1.0) / (hill_n + 1.0)) ** (1.0 / hill_n)
else:
lower_bend = ec50 * (0.10 / 0.90) ** (1.0 / hill_n)
x_lo, x_hi = float(np.min(x)), float(np.max(x))
plausible = bool((x_lo - 10) <= lower_bend <= x_hi)
return {
"ec50": ec50,
"hill_n": hill_n,
"y_min": y_min_fit,
"y_max": y_max_fit,
"lower_bend": float(lower_bend) if plausible else np.nan,
"r_squared": float(r_sq) if np.isfinite(r_sq) else np.nan,
"aic": float(aic) if np.isfinite(aic) else np.nan,
"n": n,
"converged": True,
"bend_plausible": plausible,
}
# ┌────────────────────────────────────────────────────────────┐
# │ Per-subject iterator « DigiMuh-style cohort fitting » │
# └────────────────────────────────────────────────────────────┘
[docs]
def per_subject_segmented(
df: pd.DataFrame,
subject_col: str | list[str],
x_col: str,
y_col: str,
*,
model: Callable[..., dict] = broken_stick_fit,
min_n: int = 50,
model_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame:
"""Apply a dose-response fitter to each subject's data.
For every unique value (or tuple, for composite keys) in
``df[subject_col]``, fit ``model(x, y, **model_kwargs)`` where
``x`` and ``y`` are the subject's rows of ``df[x_col]`` and
``df[y_col]``. Subjects with fewer than ``min_n`` observations
are returned with the model's standard skip marker
(``converged=False`` plus the primary parameter as ``np.nan``).
Pickle-safe: the ``model`` callable must be a module-level
function so this iterator can be wrapped in a
``concurrent.futures.ProcessPoolExecutor`` for parallel fitting
over large panels. All four primary fitters in this module
(``broken_stick_fit``, ``davies_test``, ``pscore_test``,
``hill_fit``) qualify.
Args:
df: Long-format DataFrame with one row per
observation.
subject_col: Column name (or list of column names for a
composite key) identifying subjects.
x_col: Column holding the predictor.
y_col: Column holding the response.
model: Fitter callable taking ``(x, y, **kwargs)`` and
returning a result dict. Defaults to
:func:`broken_stick_fit`.
min_n: Minimum observations per subject. Subjects
below this threshold get a skip-marker row.
Default 50.
model_kwargs: Extra keyword arguments forwarded to ``model``.
Returns:
DataFrame with one row per subject. Columns: the subject key
column(s) plus all keys returned by ``model``. Subjects with
``n < min_n`` carry ``converged=False`` and ``n=<actual>``.
"""
if model_kwargs is None:
model_kwargs = {}
if isinstance(subject_col, str):
group_keys = [subject_col]
else:
group_keys = list(subject_col)
rows: list[dict] = []
for key, grp in df.groupby(group_keys, dropna=False):
x = grp[x_col].to_numpy(dtype=float)
y = grp[y_col].to_numpy(dtype=float)
# pandas 2.x: groupby on a LIST of columns always yields a tuple key
# (even when the list has length 1). Normalising once handles both
# the single-column and composite-key cases uniformly.
key_tuple = key if isinstance(key, tuple) else (key,)
subject_id = dict(zip(group_keys, key_tuple))
if len(x) < min_n:
row = {**subject_id, "n": len(x), "converged": False}
else:
result = model(x, y, **model_kwargs)
row = {**subject_id, **result}
rows.append(row)
return pd.DataFrame(rows)
__all__ = [
"broken_stick_fit",
"davies_test",
"pscore_test",
"hill_fit",
"per_subject_segmented",
]