#!/usr/bin/env python3
"""
W.I.N.G.S. — Δp vs p figure (complementary strategies).
Extracts per-interval infection frequency change (Δp) from ABM time
series and overlays analytical models.
Key design choices
------------------
- **First-passage extraction**: for each replicate, only uses the FIRST
time the trajectory passes through each p-bin (ascending), avoiding
contamination from equilibrium wobble or stalled runs.
- **Asymmetric CI model**: A·p^α·(1-p)^γ allows the peak to shift right,
capturing spatial/growth effects the mean-field Turelli model misses.
- **Diminishing-returns ER model**: s_0·(1-p)^β captures mate-finding
saturation.
Outputs (PNG 300dpi + SVG with editable text):
delta_p_vs_p.{png,svg} — 3-panel (CI, ER, CI+ER)
delta_p_overlay.{png,svg} — single-panel with crossing point
Usage:
python plot_delta_p.py --input data/combined_delta_p.csv
python plot_delta_p.py --input data/combined_delta_p.csv --dt 14 --ascending-only
"""
import argparse
import os
import sys
import warnings
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
warnings.filterwarnings("ignore", category=FutureWarning)
# ======================================================================
# Style
# ======================================================================
plt.rcParams.update({
"figure.dpi": 200,
"savefig.dpi": 300,
"savefig.bbox": "tight",
"font.family": "sans-serif",
"font.sans-serif": ["Arial", "DejaVu Sans"],
"font.size": 10,
"axes.titlesize": 12,
"axes.labelsize": 11,
"xtick.labelsize": 9,
"ytick.labelsize": 9,
"legend.fontsize": 9,
"legend.framealpha": 0.85,
"legend.edgecolor": "0.8",
"axes.spines.top": False,
"axes.spines.right": False,
"figure.facecolor": "white",
"svg.fonttype": "none", # editable text in SVG
})
# Paul Tol colours
COL_CI = "#4477AA"
COL_ER = "#228833"
COL_CIER = "#009988"
COL_GREY = "#BBBBBB"
PHENO_STYLE = {
"CI": {"color": COL_CI, "label": "CI only"},
"ER": {"color": COL_ER, "label": "ER only"},
"CI_ER": {"color": COL_CIER, "label": "CI + ER"},
}
EGG_HATCH_DAY = 23
# ======================================================================
# Column detection
# ======================================================================
[docs]
def detect_columns(df):
"""Auto-detect the time and infection rate column names.
Searches for common variants (``Day``, ``Days``, ``Time``, etc.)
case-insensitively.
Args:
df (pandas.DataFrame): Input data.
Returns:
tuple[str, str]: ``(time_column_name, infection_column_name)``.
Raises:
SystemExit: If either column cannot be found.
"""
cols_lower = {c.lower().strip(): c for c in df.columns}
time_col = None
for cand in ['day', 'days', 'time', 'hour', 'hours', 'generation', 'step']:
if cand in cols_lower:
time_col = cols_lower[cand]
break
if time_col is None:
for c in df.columns:
if any(k in c.lower() for k in ['day', 'time', 'generation']):
time_col = c
break
if time_col is None:
print(f" ERROR: No time column found. Columns: {list(df.columns)}")
sys.exit(1)
inf_col = None
for cand in ['infection rate', 'infection_rate', 'infectionrate']:
if cand in cols_lower:
inf_col = cols_lower[cand]
break
if inf_col is None:
for c in df.columns:
if 'infection' in c.lower():
inf_col = c
break
if inf_col is None:
print(f" ERROR: No infection rate column found. Columns: {list(df.columns)}")
sys.exit(1)
return time_col, inf_col
# ======================================================================
# Analytical models
# ======================================================================
[docs]
def model_ci_asymmetric(p, A, alpha, gamma):
"""Asymmetric CI selection coefficient: ``A · p^α · (1-p)^γ``.
Generalises the Turelli (1994) model ``p(1-p) · s_h / (1 - p·s_h)``
by allowing different exponents on the left (α) and right (γ)
sides. When α > γ the peak shifts right of 0.5, capturing
spatial clustering and population growth effects.
Args:
p (numpy.ndarray): Infection frequency array.
A (float): Amplitude parameter.
alpha (float): Left exponent (rise rate at low p).
gamma (float): Right exponent (decay rate at high p).
Returns:
numpy.ndarray: Expected Δp per generation.
"""
return A * np.power(p, alpha) * np.power(1 - p, gamma)
[docs]
def model_er(p, s_0, beta):
"""ER selection coefficient with diminishing returns: ``s_0 · (1-p)^β``.
The mate-finding advantage of increased exploration decays as
infected females become common and the mating landscape saturates.
Args:
p (numpy.ndarray): Infection frequency array.
s_0 (float): Base advantage at p → 0.
beta (float): Decay exponent (higher = faster saturation).
Returns:
numpy.ndarray: Expected Δp per generation from ER.
"""
return s_0 * np.power(1 - p, beta)
[docs]
def model_ci_er(p, A, alpha, gamma, s_0, beta):
"""Combined CI + ER selection (additive).
Args:
p (numpy.ndarray): Infection frequency.
A (float): CI amplitude.
alpha (float): CI left exponent.
gamma (float): CI right exponent.
s_0 (float): ER base advantage.
beta (float): ER decay exponent.
Returns:
numpy.ndarray: Combined expected Δp.
"""
return model_ci_asymmetric(p, A, alpha, gamma) + model_er(p, s_0, beta)
# ======================================================================
# Δp extraction
# ======================================================================
[docs]
def bin_delta_p(dp_df, n_bins=20):
"""Bin ``(p, Δp)`` data by infection frequency and compute statistics.
Args:
dp_df (pandas.DataFrame): Raw Δp data from extraction.
n_bins (int): Number of equal-width bins over [0, 1].
Defaults to ``20``.
Returns:
pandas.DataFrame: Per-bin summary with ``Phenotype``,
``p_mid``, ``dp_median``, ``dp_q25``, ``dp_q75``,
``dp_mean``, ``n``.
"""
records = []
bin_edges = np.linspace(0, 1, n_bins + 1)
bin_mids = (bin_edges[:-1] + bin_edges[1:]) / 2
for pheno in sorted(dp_df['Phenotype'].unique()):
sub = dp_df[dp_df['Phenotype'] == pheno]
for i in range(n_bins):
lo, hi = bin_edges[i], bin_edges[i + 1]
in_bin = sub[(sub['p'] >= lo) & (sub['p'] < hi)]
if len(in_bin) < 3:
continue
records.append({
'Phenotype': pheno,
'p_mid': bin_mids[i],
'dp_median': in_bin['delta_p'].median(),
'dp_q25': in_bin['delta_p'].quantile(0.25),
'dp_q75': in_bin['delta_p'].quantile(0.75),
'dp_mean': in_bin['delta_p'].mean(),
'n': len(in_bin),
})
return pd.DataFrame(records)
# ======================================================================
# Curve fitting
# ======================================================================
[docs]
def fit_analytical(binned_df):
"""Fit the asymmetric CI and diminishing-returns ER models.
Uses ``scipy.optimize.curve_fit`` on the binned median Δp data.
Falls back to default parameters if fitting fails or data is
insufficient.
Args:
binned_df (pandas.DataFrame): Binned Δp data.
Returns:
dict: Fitted parameters ``{A, alpha, gamma, s_0, beta}``.
"""
params = {}
# --- CI: A · p^α · (1-p)^γ ---
ci_data = binned_df[binned_df['Phenotype'] == 'CI']
if len(ci_data) >= 4:
try:
popt, _ = curve_fit(
model_ci_asymmetric,
ci_data['p_mid'].values,
ci_data['dp_median'].values,
p0=[0.01, 1.5, 0.8],
bounds=([0, 0.1, 0.1], [1, 5, 5]),
maxfev=10000,
)
params['A'] = float(popt[0])
params['alpha'] = float(popt[1])
params['gamma'] = float(popt[2])
print(f" CI fit: A={popt[0]:.5f}, α={popt[1]:.2f}, γ={popt[2]:.2f}")
if popt[1] > popt[2]:
print(f" → peak shifted RIGHT (α > γ): spatial/growth effects")
except Exception as e:
print(f" CI fit failed ({e}), using defaults")
params['A'] = 0.008
params['alpha'] = 1.5
params['gamma'] = 0.8
else:
print(f" CI: {len(ci_data)} bins, using defaults")
params['A'] = 0.008
params['alpha'] = 1.5
params['gamma'] = 0.8
# --- ER: s_0 · (1-p)^β ---
er_data = binned_df[binned_df['Phenotype'] == 'ER']
if len(er_data) >= 3:
try:
popt, _ = curve_fit(
model_er,
er_data['p_mid'].values,
er_data['dp_median'].values,
p0=[0.001, 1.0],
bounds=([0, 0.1], [0.1, 10]),
maxfev=10000,
)
params['s_0'] = float(popt[0])
params['beta'] = float(popt[1])
print(f" ER fit: s_0={popt[0]:.6f}, β={popt[1]:.2f}")
except Exception as e:
print(f" ER fit failed ({e}), using defaults")
params['s_0'] = 0.0005
params['beta'] = 1.5
else:
print(f" ER: {len(er_data)} bins, using defaults")
params['s_0'] = 0.0005
params['beta'] = 1.5
return params
# ======================================================================
# Figure saving
# ======================================================================
[docs]
def save_fig(fig, path_stem):
"""Save a figure as PNG (300 dpi) and SVG (editable text).
Args:
fig (matplotlib.figure.Figure): Figure to save.
path_stem (str): Output path without extension.
"""
fig.savefig(f"{path_stem}.png")
fig.savefig(f"{path_stem}.svg")
plt.close(fig)
print(f" ✓ {Path(path_stem).name}.{{png,svg}}")
# ======================================================================
# 3-panel figure
# ======================================================================
# ======================================================================
# Overlay figure
# ======================================================================
# ======================================================================
# CLI
# ======================================================================
[docs]
def main():
"""CLI entry point for the Δp figure generator.
Supports four modes via ``--mode``:
- ``all``: Full trajectory averaging (default).
- ``ascending``: First-passage, Δp > 0 only.
- ``initial``: One Δp per replicate (Turelli-correct).
- ``compare``: Run all three and produce comparison figure.
"""
parser = argparse.ArgumentParser(
description="W.I.N.G.S. — Δp vs p figure generator")
parser.add_argument('--input', required=True,
help='Combined CSV from ingest_delta_p.py')
parser.add_argument('--outdir', default='figures_delta_p',
help='Output directory')
parser.add_argument('--dt', type=int, default=7,
help='Δt in days (default: 7)')
parser.add_argument('--n-bins', type=int, default=20,
help='Number of frequency bins (default: 20)')
parser.add_argument('--ascending-only', action='store_true',
help='Only use ascending (Δp > 0) first-passage data')
# Manual overrides
parser.add_argument('--A', type=float, default=None, help='CI amplitude')
parser.add_argument('--alpha', type=float, default=None, help='CI left exponent')
parser.add_argument('--gamma', type=float, default=None, help='CI right exponent')
parser.add_argument('--s-0', type=float, default=None, help='ER base advantage')
parser.add_argument('--beta', type=float, default=None, help='ER decay exponent')
parser.add_argument('--exclude', default=None,
help='Comma-separated Wolbachia mechanics to exclude. '
'Valid: CI, MK, ER, IE. Phenotypes using excluded '
'mechanics are removed from all figures. '
'Example: --exclude MK or --exclude MK,IE')
args = parser.parse_args()
# Parse exclusion list
excluded = set()
if args.exclude:
excluded = {s.strip().upper() for s in args.exclude.split(",")}
valid = {"CI", "MK", "ER", "IE"}
unknown = excluded - valid
if unknown:
print(f" ERROR: Unknown mechanic(s): {unknown}")
print(f" Valid options: {', '.join(sorted(valid))}")
sys.exit(1)
os.makedirs(args.outdir, exist_ok=True)
print("=" * 56)
print(" W.I.N.G.S. — Δp Figure Generator")
print("=" * 56)
print(f" Input: {args.input}")
print(f" Output: {args.outdir}/")
print(f" Δt: {args.dt} days")
print(f" Bins: {args.n_bins}")
print(f" Ascending: {args.ascending_only}")
if excluded:
print(f" Exclude: {', '.join(sorted(excluded))}")
df = pd.read_csv(args.input)
time_col, inf_col = detect_columns(df)
print(f" Columns: time='{time_col}', infection='{inf_col}'")
print(f" Rows: {len(df):,}")
# Filter out phenotypes that use excluded mechanics
if excluded:
# Map mechanic abbreviations to phenotype label components
# Phenotype labels in sweep data: CI, ER, CI_ER
def _pheno_uses_excluded(pheno_label):
parts = set(pheno_label.split('_'))
return bool(parts & excluded)
before = len(df['Phenotype'].unique())
keep_phenos = [p for p in df['Phenotype'].unique()
if not _pheno_uses_excluded(p)]
df = df[df['Phenotype'].isin(keep_phenos)]
after = len(keep_phenos)
print(f" Filtered: {before} → {after} phenotypes "
f"(kept: {', '.join(sorted(keep_phenos)) or 'none'})")
if len(df) == 0:
print("\n ERROR: All phenotypes excluded — nothing to plot!")
sys.exit(1)
phenotypes = sorted(df['Phenotype'].unique())
fractions = sorted(df['Infected Fraction'].unique())
print(f" Phenotypes: {', '.join(phenotypes)}")
print(f" Fractions: {len(fractions)} ({min(fractions):.2f}–{max(fractions):.2f})")
print()
# --- Extract Δp ---
print(" Extracting Δp...")
dp_df = extract_delta_p(df, time_col, inf_col,
dt=args.dt, ascending_only=args.ascending_only)
print(f" → {len(dp_df):,} (p, Δp) data points")
if len(dp_df) == 0:
print("\n ERROR: No valid Δp data! Check time series have data "
"after day 23 with 0.01 < p < 0.99")
sys.exit(1)
# --- Bin ---
print(" Binning...")
binned = bin_delta_p(dp_df, n_bins=args.n_bins)
for pheno in phenotypes:
n = len(binned[binned['Phenotype'] == pheno])
print(f" {pheno}: {n} bins")
# --- Fit ---
print("\n Fitting analytical models...")
params = fit_analytical(binned)
# Overrides
for key, arg_val in [('A', args.A), ('alpha', args.alpha), ('gamma', args.gamma),
('s_0', args.s_0), ('beta', args.beta)]:
if arg_val is not None:
params[key] = arg_val
print(f" [override] {key} = {arg_val}")
# --- Save intermediates ---
dp_df.to_csv(os.path.join(args.outdir, 'raw_delta_p.csv'), index=False)
binned.to_csv(os.path.join(args.outdir, 'binned_delta_p.csv'), index=False)
with open(os.path.join(args.outdir, 'fitted_params.txt'), 'w') as f:
for k, v in sorted(params.items()):
f.write(f"{k} = {v}\n")
print(f"\n Saved: raw_delta_p.csv, binned_delta_p.csv, fitted_params.txt")
# --- Plot ---
print("\n Generating figures (PNG + SVG)...")
plot_delta_p_figure(binned, params, args.outdir, args.dt)
plot_overlay_figure(binned, params, args.outdir, args.dt)
print(f"\n Done — figures in {args.outdir}/")
print("=" * 56)
if __name__ == '__main__':
main()
#Example usage: python -m wings.analysis.plot_delta_p --input data/combined_delta_p.csv --dt 24