# This file is part of SIMPLICITY
# Copyright (C) 2025 Pietro Gerletti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Created on Fri Mar 28 13:31:54 2025
@author: pietro
"""
import time
import math
import numpy as np
import simplicity.population_model as SIDR
import simplicity.evolution.mutations as evo
import simplicity.evolution.reference as ref
import simplicity.phenotype.consensus as c
import simplicity.phenotype.update as pheno
import simplicity.tuning.diagnosis_rate as dr
from tqdm import tqdm
[docs]
class ProgressReporter:
[docs]
def __init__(self, total_time, simulation_id):
self.total_time = total_time
self.current_time = 0
self.step_size = 0
self.leap_counter = 0
self.reactions_counter = 0
self.thinning_counter = 0
self.reaction_id = None
self.start_wall = time.time()
self.pbar = tqdm(
total=total_time,
desc=f"RUNNING SIMULATION: {simulation_id[:40]}",
unit="step",
bar_format="{l_bar}{bar}| {n:.2f}/{total_fmt} {unit} {postfix}",
position=1,
leave=False
)
[docs]
def update(self, population, delta_t, reaction_id=None, event_type=None):
self.step_size = delta_t
self.reaction_id = reaction_id
self.current_time += delta_t
self.infected = population.infected
# Count thinning vs accepted events
if event_type == "leap":
self.leap_counter += 1
elif event_type == "reaction":
self.reactions_counter += 1
if reaction_id == "thinning":
self.thinning_counter += 1
# Compute leap and thinning percentage
total_events = self.leap_counter + self.reactions_counter
leap_pct = 100 * self.leap_counter / total_events if total_events > 0 else 0
thinning_pct = 100 * self.thinning_counter / self.reactions_counter if self.reactions_counter > 0 else 0
# Elapsed CPU time
cpu_time = time.time() - self.start_wall
# Update tqdm
self.pbar.update(delta_t)
self.pbar.set_postfix({
"time": f"{self.current_time:.2f}",
"step": f"{self.step_size:.2e}",
"leap%": f"{leap_pct:.1f}%",
"thin%": f"{thinning_pct:.1f}%",
"last react": self.reaction_id if self.reaction_id is not None else "-",
"infected": self.infected,
"CPU(s)": f"{cpu_time:.1f}",
"Time left": f"{(cpu_time / self.current_time * (self.total_time - self.current_time)):.0f}s"
})
[docs]
def close(self):
self.pbar.close()
[docs]
def get_helpers(phenotype_model, parameters, rng1, rng2):
"""
Returns helper functions for extrande (SIMPLICITY engine).
"""
# model parameters
tau_inf_standard = parameters['tau_2'] + parameters['tau_3']
beta_standard = parameters["R"] / tau_inf_standard
tau_inf_long = parameters['tau_2'] + parameters['tau_3_long']
beta_long = parameters['R_long']/ tau_inf_long
# print(f'R: {R} Beta: {beta}')
k_ds = dr.get_k_d_from_diagnosis_rate(
parameters["diagnosis_rate_standard"],
parameters["tau_1"],
parameters["tau_2"],
parameters["tau_3"],
parameters["tau_4"]
)
k_dl = dr.get_k_d_from_diagnosis_rate(
parameters["diagnosis_rate_long"],
parameters["tau_1"],
parameters["tau_2"],
parameters["tau_3_long"],
parameters["tau_4"]
)
k_v = parameters["IH_virus_emergence_rate"]
NSR = parameters["nucleotide_substitution_rate"]
L = len(ref.get_reference())
seq_rate = parameters["sequencing_rate"]
max_runtime = parameters["max_runtime"]
# for fitness update
update_all_fitness = pheno.update_fitness_factory(phenotype_model)
use_consensus = phenotype_model == "immune_waning"
last_consensus_snapshot = {"t_snapshot": 0, "consensus": []} if use_consensus else None
def compute_upperbound(population):
propensities, params = SIDR.SIDR_propensities(population, beta_standard, beta_long, k_ds, k_dl, k_v, seq_rate)
# a0 = sum(rate for rate, _ in propensities)
B = np.sum(params) * population.infected
# print(f"B = {B}")
return B
def look_ahead(t, final_time):
'''
We clamp the look ahead to 1 day: if extrande does a step bigger than a day
we need to update the ih model to make sure not to miss host transitions
and skewer their intra-host trajectory
'''
# L = min(final_time - t, population.get_next_ih_transition() - t)
# return max(L, 0.0417) # either next IH state update or 1/24 = 1 hour
return min(1, final_time - t)
def draw_delta_t(B):
tau = rng1.uniform(0, 1)
delta_t = -np.log(tau) / B
delta_t = round(delta_t, 10)
return delta_t
def update_step(population, delta_t):
population.update_states(delta_t)
def update_fitness_step(population):
t = population.time
individuals_to_update = sorted(population.infectious_i)
# calculate consensus every 5 simulation days and use it for fitness score (immune waning model)
if use_consensus:
if np.floor(t) > last_consensus_snapshot["t_snapshot"]:
last_consensus_snapshot["t_snapshot"] += 5
consensus = c.get_consensus(population.consensus_snapshot, t)
population.consensus_sequences_t.append([consensus, t])
last_consensus_snapshot['consensus'] = consensus
else:
consensus = last_consensus_snapshot['consensus']
update_all_fitness(population, individuals_to_update, consensus)
# fitness update for linear model
else:
update_all_fitness(population, individuals_to_update)
population.update_fitness_trajectory()
def mutation_step(population, delta_t):
delta_t_y = delta_t / 365.25 # time in years
if use_consensus:
consensus = last_consensus_snapshot['consensus']
evo.mutate(population, NSR, L, delta_t_y, phenotype_model, consensus)
else:
evo.mutate(population, NSR, L, delta_t_y, phenotype_model)
def fire_reaction(population, propensities, tau_2):
a0 = sum(rate for _, rate, _ in propensities)
if tau_2 > a0:
# print('thinning')
return 'thinning' # thinning
threshold = 0
for reaction_id, rate, action in propensities:
threshold += rate
if tau_2 <= threshold:
# print(reaction_id)
action()
return reaction_id
def reaction_step(population, B):
propensities, _ = SIDR.SIDR_propensities(population, beta_standard, beta_long, k_ds, k_dl, k_v, seq_rate)
tau_2 = rng2.uniform(0, B)
return fire_reaction(population, propensities, tau_2)
def check_stop_conditions(population, t, start_time):
if population.infected == 0:
return "No infected left - ending simulation"
if population.susceptibles == 0:
return "No susceptibles left - ending simulation"
if t > 60.0 and population.infectious == 0:
return "No infectious left - ending simulation"
if len(population.reservoir_i) < 1000:
return "Reservoir depleted - ending simulation"
if time.time() - start_time > max_runtime:
return "Max runtime exceeded"
return None
return {
"compute_upperbound": compute_upperbound,
"look_ahead": look_ahead,
"draw_delta_t": draw_delta_t,
"check_stop_conditions": check_stop_conditions,
"update_step": update_step,
"update_fitness_step": update_fitness_step,
"mutation_step": mutation_step,
"reaction_step": reaction_step,
}
[docs]
def extrande_core_loop(parameters, population, helpers, sim_id):
"""
Core extrande loop.
"""
print(f'{sim_id}')
t = parameters['t_0']
final_time = parameters['final_time']
start_time = time.time()
t_day = 0
reporter = ProgressReporter(total_time=final_time, simulation_id=sim_id)
min_update_threshold = 0.5 # minimum dt for intra-host update (12h step)
dt_accumulated = 0 # initialize accumulator
while t < final_time:
L = helpers["look_ahead"](t, final_time)
B = helpers["compute_upperbound"](population)
delta_t = helpers["draw_delta_t"](B)
reaction_id = None
if delta_t > L:
event_type = 'leap'
# update time
delta_t = L
t += delta_t
t = round(t, 10)
population.update_time(t)
dt_accumulated += delta_t
# update system (IH host model states)
# Only update the intra-host state if accumulated dt exceeds threshold:
if dt_accumulated >= min_update_threshold:
helpers["update_step"](population, dt_accumulated)
# population.DEBUG_update_ih.append([t,dt_accumulated,False])
# mutations
helpers["mutation_step"](population, dt_accumulated)
dt_accumulated = 0 # reset accumulator
else:
event_type = 'reaction'
# update time
t += delta_t
t = round(t, 10)
population.update_time(t)
dt_accumulated += delta_t
# update system (IH host model states)
# Only update the intra-host state if accumulated dt exceeds threshold:
if dt_accumulated >= min_update_threshold:
# print('DEBUG: UPDATE STEP')
helpers["update_step"](population, dt_accumulated)
# population.DEBUG_update_ih.append([t,dt_accumulated,False])
# mutations
helpers["mutation_step"](population, dt_accumulated)
dt_accumulated = 0 # reset accumulator
# update fitness
helpers["update_fitness_step"](population)
# reactions
reaction_id = helpers["reaction_step"](population, B)
# update progress bar
reporter.update(population, delta_t, reaction_id, event_type)
# update system trajectory
population.update_trajectory()
if math.floor(t) > t_day:
t_day += 1
population.update_lineage_frequency_t(t)
# break if conditions met
reason = helpers["check_stop_conditions"](population, t, start_time)
if reason:
break
reporter.close()
if reason:
tqdm.write(f"\n {reason}")
tqdm.write("\n----------------------------------------")
tqdm.write("SIMPLICITY SIMULATION COMPLETED")
tqdm.write("----------------------------------------\n")
tqdm.write('')
return population
[docs]
def extrande_factory(phenotype_model, parameters, sim_id, rng1, rng2):
def extrande_generic(population):
helpers = get_helpers(phenotype_model, parameters, rng1, rng2)
return extrande_core_loop(parameters, population, helpers, sim_id)
return extrande_generic