Source code for wings.models.wfm

#!/usr/bin/env python3
"""
W.I.N.G.S. — Fixed-size discrete-generation Wolbachia spread model.

Wright-Fisher style:  N adults are sampled each generation from the
offspring pool.  This decouples infection-frequency dynamics from
population-size dynamics, providing a clean baseline for measuring
the effect of CI, male-killing, increased exploration, and increased
eggs on Wolbachia invasion.

Biology
-------
Tribolium castaneum generation time ≈ 30 days at 30 °C (Pointer et al.
2021, Heredity).  One year ≈ 12 generations.

Mapping spatial effects to well-mixed
--------------------------------------
The ABM's "increased exploration rate" gives infected ♀ a 1.4× mating
distance.  In a well-mixed cage of 50 beetles, the spatial advantage
is translated as a *mating-probability advantage*: infected ♀ mate
with probability 1.0 while uninfected ♀ mate with probability
1 / exploration_rate_boost ≈ 0.714.  This represents the empirical
observation that more-active females encounter males more frequently
even in small arenas (Pai & Bhatt 1995, J. Stored Prod. Res.).

Expected CI dynamics (sanity check)
------------------------------------
With full CI (strength 1.0) and initial freq p = 0.5:
  Infected ♀ (freq p) mate with any ♂  → offspring all infected.
  Uninfected ♀ (freq 1-p) × uninfected ♂ (freq 1-p) → offspring uninfected.
  Uninfected ♀ × infected ♂ → NO offspring (CI).

  New p ≈ p / (p + (1-p)²)

  Gen 0: p=0.50 → Gen 1: p≈0.67 → Gen 2: p≈0.86 → Gen 3: p≈0.97 → fixation.
  Expect fixation within 4–6 generations with stochastic variation.

Usage
-----
  # Single run
  python fixed_generation_sim.py --ci --seed 42 --output result.csv

  # All 16 combos × 200 reps (takes ~30 seconds on one CPU core)
  python fixed_generation_sim.py --run-all --nreps 200

  # Quick test
  python fixed_generation_sim.py --run-all --nreps 2 --outdir ./test_fixed
"""

import argparse
import csv
import os
import sys
import time
from itertools import product as iterproduct

import numpy as np


# ======================================================================
#  Core simulation
# ======================================================================

[docs] def simulate( N: int = 50, max_generations: int = 12, seed: int = 42, ci: bool = False, mk: bool = False, er: bool = False, ie: bool = False, ci_strength: float = 1.0, egg_laying_max: int = 15, male_offspring_rate: float = 0.1, fecundity_increase_factor: float = 1.35, er_mating_advantage: float = 1.4, initial_infection_freq: float = 0.5, ): """ Run one fixed-size discrete-generation simulation. Parameters ---------- N : int Fixed adult population size per generation. max_generations : int Maximum generations to simulate (≈ 1 year at 12). seed : int RNG seed for reproducibility. ci, mk, er, ie : bool Wolbachia effect toggles. ci_strength : float Probability that each egg from ♂I × ♀U cross is killed (0–1). egg_laying_max : int Maximum clutch size per mating (uniform draw from 1..max). male_offspring_rate : float Probability offspring is male when MK active and mother infected. fecundity_increase_factor : float Clutch multiplier for infected mothers when IE active. er_mating_advantage : float Mating probability ratio (infected / uninfected) when ER active. initial_infection_freq : float Starting Wolbachia frequency (0–1). Returns ------- list of (population_size: int, infection_rate: float) One entry per generation, padded to max_generations + 1. """ rng = np.random.default_rng(seed) # --- Initialise population --- n_infected_init = int(round(N * initial_infection_freq)) infected = np.zeros(N, dtype=bool) infected[:n_infected_init] = True rng.shuffle(infected) is_male = np.zeros(N, dtype=bool) is_male[: N // 2] = True rng.shuffle(is_male) history = [] # (pop_size, infection_rate) for gen in range(max_generations + 1): # --- Record state --- n_alive = len(infected) n_inf = int(infected.sum()) inf_rate = n_inf / n_alive if n_alive > 0 else 0.0 history.append((n_alive, inf_rate)) # --- Stopping conditions --- if inf_rate >= 1.0: # Fixation — pad remaining generations for _ in range(max_generations - gen): history.append((N, 1.0)) break if inf_rate <= 0.0 and gen > 0: # Wolbachia lost — pad remaining generations for _ in range(max_generations - gen): history.append((N, 0.0)) break if gen == max_generations: break # --- Identify sexes --- females = np.where(~is_male)[0] males = np.where(is_male)[0] if len(females) == 0 or len(males) == 0: # All one sex — population cannot reproduce; pad with current state for _ in range(max_generations - gen): history.append((N, inf_rate)) break # --- Mating probability (ER effect) --- if er: # Infected ♀ mate with prob 1.0; uninfected ♀ at reduced rate mate_prob = np.where( infected[females], 1.0, 1.0 / er_mating_advantage ) else: mate_prob = np.ones(len(females)) do_mate = rng.random(len(females)) < mate_prob mating_females = females[do_mate] if len(mating_females) == 0: for _ in range(max_generations - gen): history.append((N, inf_rate)) break # --- Pair each mating ♀ with a random ♂ --- chosen_males = rng.choice(males, size=len(mating_females), replace=True) # --- Offspring production (vectorised) --- f_inf = infected[mating_females] m_inf = infected[chosen_males] # Base clutch sizes (Uniform 1..egg_laying_max) eggs = rng.integers(1, egg_laying_max + 1, size=len(mating_females)) # IE: infected mothers produce more eggs if ie: boost_mask = f_inf eggs = np.where( boost_mask, np.round(eggs * fecundity_increase_factor).astype(int), eggs, ) # CI: infected ♂ × uninfected ♀ → eggs killed if ci: ci_mask = m_inf & ~f_inf if ci_strength >= 1.0: eggs[ci_mask] = 0 else: # Partial CI: each egg survives independently ci_idx = np.where(ci_mask)[0] for idx in ci_idx: eggs[idx] = rng.binomial(eggs[idx], 1.0 - ci_strength) # --- Expand eggs into individual offspring --- # Mothers with 0 eggs contribute nothing valid = eggs > 0 if not valid.any(): for _ in range(max_generations - gen): history.append((N, inf_rate)) break eggs_v = eggs[valid] f_inf_v = f_inf[valid] total_offspring = int(eggs_v.sum()) # Infection: inherited from mother offspring_infected = np.repeat(f_inf_v, eggs_v) # Sex determination if mk: # Infected mothers → mostly female offspring prob_male = np.where( offspring_infected, male_offspring_rate, 0.5, ) offspring_male = rng.random(total_offspring) < prob_male else: offspring_male = rng.random(total_offspring) < 0.5 # --- Wright-Fisher sampling: draw N adults for next generation --- if total_offspring == 0: for _ in range(max_generations - gen): history.append((N, inf_rate)) break if total_offspring >= N: idx = rng.choice(total_offspring, size=N, replace=False) else: # Fewer offspring than N — sample with replacement (bottleneck) idx = rng.choice(total_offspring, size=N, replace=True) infected = offspring_infected[idx] is_male = offspring_male[idx] return history
# ====================================================================== # I/O helpers # ======================================================================
[docs] def save_csv(history, path): """Save simulation history as CSV matching gpu_simulation.py format.""" os.makedirs(os.path.dirname(path) or ".", exist_ok=True) with open(path, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["Population Size", "Infection Rate"]) for pop, inf in history: writer.writerow([pop, f"{inf:.6f}"])
[docs] def combo_id(ci, mk, er, ie): """Compute the 4-bit combo index consistent with the GPU submit script.""" return (int(ci) << 3) | (int(mk) << 2) | (int(er) << 1) | int(ie)
[docs] def combo_label(ci, mk, er, ie): parts = [] if ci: parts.append("CI") if mk: parts.append("MK") if er: parts.append("ER") if ie: parts.append("IE") return "+".join(parts) if parts else "None"
[docs] def make_filename(ci, mk, er, ie, rep): """Build filename matching the original ABM format for ingest compatibility.""" return ( f"cytoplasmic_incompatibility_{ci}" f"_male_killing_{mk}" f"_increased_exploration_rate_{er}" f"_increased_eggs_{ie}" f"_{rep}.csv" )
# ====================================================================== # Batch mode # ======================================================================
[docs] def run_all(args): """Run all 16 combos × nreps replicates.""" outdir = args.outdir os.makedirs(outdir, exist_ok=True) combos = list(iterproduct([False, True], repeat=4)) # CI, MK, ER, IE total = len(combos) * args.nreps done = 0 skipped = 0 t0 = time.time() print("=" * 56) print(" W.I.N.G.S. — Fixed-size discrete-generation model") print("=" * 56) print(f" Population: {args.population} adults / generation") print(f" Generations: {args.max_generations} (≈ {args.max_generations * 30} days)") print(f" Init. freq: {args.initial_infection_freq:.0%}") print(f" Combinations: {len(combos)}") print(f" Replicates: {args.nreps}") print(f" Total runs: {total}") print(f" Output: {outdir}") print("=" * 56) print() # --- Track per-combo fixation statistics --- fixation_stats = {} for ci, mk, er, ie in combos: label = combo_label(ci, mk, er, ie) fixation_gens = [] combo_skipped = 0 for rep in range(args.nreps): fname = make_filename(ci, mk, er, ie, rep) fpath = os.path.join(outdir, fname) if os.path.exists(fpath): skipped += 1 combo_skipped += 1 done += 1 continue cid = combo_id(ci, mk, er, ie) seed = 42 + cid * 1000 + rep history = simulate( N=args.population, max_generations=args.max_generations, seed=seed, ci=ci, mk=mk, er=er, ie=ie, ci_strength=args.ci_strength, initial_infection_freq=args.initial_infection_freq, ) save_csv(history, fpath) # Track fixation for gen_i, (_, inf) in enumerate(history): if inf >= 1.0: fixation_gens.append(gen_i) break done += 1 # Report per-combo progress n_ran = args.nreps - combo_skipped n_fixed = len(fixation_gens) if n_ran > 0: pct = 100 * n_fixed / n_ran avg_gen = f"{np.mean(fixation_gens):.1f}" if n_fixed > 0 else "—" print( f" {label:20s} " f"ran={n_ran:3d} " f"fixation={n_fixed:3d}/{n_ran:3d} ({pct:5.1f}%) " f"avg_gen={avg_gen}" ) else: print(f" {label:20s} (all skipped)") fixation_stats[label] = fixation_gens elapsed = time.time() - t0 print() print("=" * 56) print(f" Complete: {done} runs ({skipped} skipped)") print(f" Wall time: {elapsed:.1f}s") print(f" Output: {outdir}") print("=" * 56) return fixation_stats
# ====================================================================== # CLI # ======================================================================
[docs] def main(): """CLI entry point for the Wright-Fisher model. Supports single-run mode and ``--run-all`` for the full 16-combination sweep with multiple replicates. """ parser = argparse.ArgumentParser( description="W.I.N.G.S. — Fixed-size discrete-generation Wolbachia model", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Single run with CI enabled python fixed_generation_sim.py --ci --seed 42 --output result.csv # All 3200 runs (16 combos × 200 reps), takes ~30 s on one CPU core python fixed_generation_sim.py --run-all --nreps 200 # Quick sanity check (2 reps per combo, local output) python fixed_generation_sim.py --run-all --nreps 2 --outdir ./test_fixed """, ) # --- Population parameters --- parser.add_argument( "--population", type=int, default=50, help="Fixed adult population size per generation (default: 50)", ) parser.add_argument( "--max-generations", type=int, default=12, help="Max generations to simulate; 12 ≈ 1 year (default: 12)", ) parser.add_argument( "--initial-infection-freq", type=float, default=0.5, help="Starting Wolbachia frequency (default: 0.5)", ) # --- Wolbachia effect toggles --- parser.add_argument("--ci", action="store_true", help="Enable cytoplasmic incompatibility") parser.add_argument("--mk", action="store_true", help="Enable male-killing") parser.add_argument("--er", action="store_true", help="Enable increased exploration rate") parser.add_argument("--ie", action="store_true", help="Enable increased eggs (fecundity boost)") # --- Effect strengths --- parser.add_argument("--ci-strength", type=float, default=1.0, help="CI strength: P(egg killed) for ♂I×♀U (default: 1.0)") parser.add_argument("--egg-laying-max", type=int, default=15, help="Max eggs per mating (uniform 1..max, default: 15)") parser.add_argument("--male-offspring-rate", type=float, default=0.1, help="P(male) for infected mothers when MK active (default: 0.1)") parser.add_argument("--fecundity-increase-factor", type=float, default=1.35, help="Clutch multiplier for IE (default: 1.35)") parser.add_argument("--er-mating-advantage", type=float, default=1.4, help="Mating probability ratio infected/uninfected for ER (default: 1.4)") # --- Single-run output --- parser.add_argument("--seed", type=int, default=42) parser.add_argument("--output", type=str, default=None, help="Output CSV path (single-run mode)") # --- Batch mode --- parser.add_argument("--run-all", action="store_true", help="Run all 16 combos × nreps replicates") parser.add_argument("--nreps", type=int, default=200, help="Replicates per combo in batch mode (default: 200)") parser.add_argument( "--outdir", type=str, default="/projects/sciences/zoology/geurten_lab/" "wolbachia_spread_model/gpu_results_50beetles", help="Output directory for batch mode", ) args = parser.parse_args() if args.run_all: run_all(args) else: history = simulate( N=args.population, max_generations=args.max_generations, seed=args.seed, ci=args.ci, mk=args.mk, er=args.er, ie=args.ie, ci_strength=args.ci_strength, egg_laying_max=args.egg_laying_max, male_offspring_rate=args.male_offspring_rate, fecundity_increase_factor=args.fecundity_increase_factor, er_mating_advantage=args.er_mating_advantage, initial_infection_freq=args.initial_infection_freq, ) outpath = args.output or "result.csv" save_csv(history, outpath) # Print summary final_pop, final_inf = history[-1] n_gens = len(history) - 1 fixation_gen = None for i, (_, inf) in enumerate(history): if inf >= 1.0: fixation_gen = i break flags = [] if args.ci: flags.append("CI") if args.mk: flags.append("MK") if args.er: flags.append("ER") if args.ie: flags.append("IE") label = "+".join(flags) if flags else "None" print(f" Effects: {label}") print(f" Generations: {n_gens}") print(f" Final freq: {final_inf:.3f}") if fixation_gen is not None: print(f" Fixation: generation {fixation_gen}") else: print(f" Fixation: not reached") print(f" Saved: {outpath}")
if __name__ == "__main__": main()