# This file is part of SIMPLICITY
# Copyright (C) 2025 Pietro Gerletti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 28 12:51:33 2025
@author: pietro
"""
import numpy as np
[docs]
def SIDR_propensities(population, beta_standard, beta_long, k_ds, k_dl, k_v, seq_rate):
propensities_params = [beta_standard, beta_long, k_ds, k_dl, k_v]
# system propensities
# reaction_id, reaction_rate, action
propensities = [
('infection_standard', beta_standard * population.infectious_standard, lambda: infection(population, from_long_shedder=False)),
('infection_long', beta_long * population.infectious_long , lambda: infection(population, from_long_shedder=True)),
('diagnosis_standard', k_ds * population.detectables_standard, lambda: diagnosis(population, from_long_shedder=False, seq_rate=seq_rate)),
('diagnosis_long', k_dl * population.detectables_long, lambda: diagnosis(population, from_long_shedder=True, seq_rate=seq_rate)),
('add_ih_lineage', k_v * population.infected, lambda: add_lineage(population))
]
# print('a1:',beta * population.infectious)
# print('a2:',k_d * population.detectables)
# print('a3:',k_v * population.infected)
return propensities, propensities_params
[docs]
def diagnosis(population, from_long_shedder, seq_rate=0):
'''
Select an infected (and detectable) individual at random and tags it
as "diagnosed". Update the infected and diagnosed compartments.
Update detectable_i, infectious_i, infected_i and diagnosed_i.
'''
population.infected -= 1
if from_long_shedder:
# select random patient to be diagnosed
diagnosed_individual_i = population.rng4.choice(population.detectables_long_i)
else:
# select random patient to be diagnosed
diagnosed_individual_i = population.rng4.choice(population.detectables_standard_i)
if population.rng6.uniform(0, 1) < seq_rate:
# store sequencing data
# patient_id, time, genome, subst number, patient type, infection duration, ih lineage number
i = 0
for lineage_name in population.individuals[diagnosed_individual_i]['IH_lineages']:
genome = population.get_lineage_genome(lineage_name)
population.sequencing_data.append({
'individual_index': diagnosed_individual_i,
'sequencing_time' : population.time,
'lineage_name' : lineage_name,
'sequence' : genome,
'sequence_lenght' : len(genome),
'individual_type' : population.individuals[diagnosed_individual_i]['type'],
'infection_duration' : population.time - population.individuals[diagnosed_individual_i]['t_infection'],
'intra-host_lineage_index' :i
})
i += 1
# update the active lineages number
population.active_lineages_n -= population.individuals[diagnosed_individual_i]['IH_lineages_number']
population.diagnosed += 1
if from_long_shedder:
population.diagnosed_long += 1
else:
population.diagnosed_standard += 1
# set patient as diagnosed
population.individuals[diagnosed_individual_i]['t_not_infectious'] = population.time
population.individuals[diagnosed_individual_i]['state'] = 'diagnosed'
if population.individuals[diagnosed_individual_i]['t_not_infected'] is None:
population.individuals[diagnosed_individual_i]['t_not_infected'] = population.time
else:
raise ValueError('Individual already recovered!!')
# remove individual index from detectable_i, infectious_i, infected_i and add to diagnosed_i
if diagnosed_individual_i in population.infectious_i:
population.infectious_i.remove(diagnosed_individual_i)
population.infectious -= 1
if from_long_shedder:
population.infectious_long -= 1
population.infectious_long_i.remove(diagnosed_individual_i)
else:
population.infectious_standard -= 1
population.infectious_standard_i.remove(diagnosed_individual_i)
# update detectables
population.detectables_i.remove(diagnosed_individual_i)
population.detectables -= 1
if from_long_shedder:
population.detectables_long -= 1
population.detectables_long_i.remove(diagnosed_individual_i)
else:
population.detectables_standard -= 1
population.detectables_standard_i.remove(diagnosed_individual_i)
# update diagnosed and infected
population.infected_i.discard(diagnosed_individual_i)
population.diagnosed_i.add(diagnosed_individual_i)
# update long shedders
if from_long_shedder:
population.long_shedder_i.discard(diagnosed_individual_i)
population.long_shedders -= 1
population.diagnosed_long_i.add(diagnosed_individual_i)
else:
population.diagnosed_standard_i.add(diagnosed_individual_i)
# add a susceptible back in from reservoir
population.susceptibles += 1
new_susceptible_index = population.reservoir_i.pop()
population.susceptibles_i.add(new_susceptible_index)
[docs]
def infect_long_shedder(population, new_infected_index):
min_time = 0
min_infected_n = 0
max_long_shedders = population.size * population.long_shedders_ratio
if (population.time > min_time and
population.infected > min_infected_n and
population.long_shedders < max_long_shedders and
population.rng4.uniform() < 0.01
):
population.long_shedder_i.add(new_infected_index)
population.long_shedders += 1
individual_type = 'long_shedder'
else:
individual_type = 'standard'
return individual_type
[docs]
def infection(population, from_long_shedder=False):
'''
Select a random susceptible individual to be infected and tags it as
such. Update compartments and infected_i
'''
# Convert sets to lists for random sampling
if from_long_shedder:
# Infectious AND long-shedder
infectious_i_list = sorted(population.infectious_i & population.long_shedder_i)
else:
# Infectious but NOT long-shedder
infectious_i_list = sorted(population.infectious_i - population.long_shedder_i)
infection_fitness = [population.individuals[i]['fitness_score'] for i in infectious_i_list]
fitness_sum = sum(infection_fitness)
if fitness_sum > 0:
weights = [f / fitness_sum for f in infection_fitness]
else:
# fallback to uniform probabilities if all fitness scores are zero
weights = [1 / len(infectious_i_list)] * len(infectious_i_list)
parent = population.rng4.choice(infectious_i_list, p=weights)
susceptibles_list = sorted(population.susceptibles_i)
# select random patient to be infected
new_infected_index = population.rng4.choice(susceptibles_list)
population.exclude_i = {new_infected_index}
new_inf = population.individuals[new_infected_index]
# Move individual from susceptibles to infected
population.susceptibles_i.discard(new_infected_index)
population.infected_i.add(new_infected_index)
# time of infection
new_inf['t_infection'] = population.time
# state
new_inf['state'] = 'infected'
# parent
new_inf['parent'] = parent
# Select random lineage from parent
index = population.rng4.integers(0, population.individuals[parent]['IH_lineages_number'])
transmitted_lineage = population.individuals[parent]['IH_lineages'][index]
transmitted_fitness = population.individuals[parent]['IH_lineages_fitness_score'][index]
# count lineage infection
population._phylo_name_map[transmitted_lineage]['Total_infections'] = population._phylo_name_map[transmitted_lineage].get('Total_infections', 0) + 1
# assign transmitted lineage
new_inf['inherited_lineage'] = transmitted_lineage
new_inf['IH_lineages_number'] += 1
# update lineage trajectory
new_inf['IH_lineages_trajectory'][transmitted_lineage] = {'ih_birth':None,'ih_death':None}
# update mutation weight event timer
new_inf['time_last_weight_event'] = population.time
# update the active lineages number
population.active_lineages_n += new_inf['IH_lineages_number']
# Append transmitted lineage + fitness
new_lineages = new_inf['IH_lineages']
new_fitness = new_inf['IH_lineages_fitness_score']
new_lineages.append(transmitted_lineage)
new_fitness.append(transmitted_fitness)
# Sort lineage and fitness together
combined = sorted(zip(new_lineages, new_fitness))
lineages_sorted, fitness_sorted = zip(*combined)
new_inf['IH_lineages'] = list(lineages_sorted)
new_inf['IH_lineages_fitness_score'] = list(fitness_sorted)
# Update individual fitness (average of lineage's fitness)
new_inf['fitness_score'] = round(np.average(fitness_sorted), 4)
# store infection info for R effective
population.individuals[parent]['new_infections'].append({
'time_infection': population.time,
'transmitted_lineage': transmitted_lineage,
'individual_infected': new_infected_index
})
# set the individual to be a long shedder if conditions apply
new_inf['type'] = infect_long_shedder(population, new_infected_index)
if new_inf['type'] == 'long_shedder':
new_inf['IH_lineages_max'] = population.rng3.integers(5,16)
# update susceptibles and infected
population.susceptibles -= 1
population.infected += 1
[docs]
def add_lineage(population):
"""Adds a new lineage to a randomly selected infected individual,
duplicating an existing lineage or replacing one if at max capacity."""
# Convert set to list for sampling
infected_list = sorted(population.infected_i)
individual_index = population.rng4.choice(infected_list)
individual = population.individuals[individual_index]
IH_lineage_n = individual['IH_lineages_number']
IH_lineages_max = individual['IH_lineages_max']
# Add a duplicate lineage if under max capacity
if IH_lineage_n < IH_lineages_max:
idx = population.rng4.integers(0, IH_lineage_n)
duplicated_lineage = individual['IH_lineages'][idx]
individual['IH_lineages'].append(duplicated_lineage)
individual['IH_lineages_fitness_score'].append(individual['IH_lineages_fitness_score'][idx])
individual['IH_lineages_number'] += 1
population.active_lineages_n += 1
# Replace one lineage (delete + duplicate) if at max
elif IH_lineage_n == IH_lineages_max and IH_lineages_max > 1:
# Randomly delete one
delete_idx = population.rng4.integers(0, IH_lineage_n)
deleted_lineage = individual['IH_lineages'].pop(delete_idx)
individual['IH_lineages_fitness_score'].pop(delete_idx)
# update lineage trajectory
if deleted_lineage not in individual['IH_lineages']:
individual['IH_lineages_trajectory'][deleted_lineage]['ih_death'] = population.time
# Duplicate another
idx = population.rng4.integers(0, IH_lineage_n - 1) # now virus_n - 1 after deletion
individual['IH_lineages'].append(individual['IH_lineages'][idx])
individual['IH_lineages_fitness_score'].append(individual['IH_lineages_fitness_score'][idx])
# Keep IH_lineages and IH_lineages_fitness_score sorted
combined = sorted(zip(individual['IH_lineages'], individual['IH_lineages_fitness_score']))
individual['IH_lineages'], individual['IH_lineages_fitness_score'] = map(list, zip(*combined))
# update individual fitness
individual['fitness_score'] = round(np.average(individual['IH_lineages_fitness_score']), 4)