Source code for simplicity.population

# This file is part of SIMPLICITY
# Copyright (C) 2025 Pietro Gerletti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Created on Tue Jun  6 13:13:14 2023

@author: pietro
"""
import simplicity.intra_host_model    as h
import simplicity.evolution.reference as ref
from   simplicity.random_gen          import randomgen
import pandas as pd
import numpy as np
import scipy.stats

[docs] class Population: ''' The class defines a population for the SIMPLICITY simulations. It contains the data about every individual as well as their intra-host model. '''
[docs] def __init__(self, size,I_0, ih_model_parameters, rng3,rng4,rng5,rng6, NSR_long, long_shedders_ratio=0, sequence_long_shedders=False, reservoir=100000): # random number generator self.rng3 = rng3 # for intra-host model states update self.rng4 = rng4 # for electing individuals|lineages when reactions happen self.rng5 = rng5 # for mutation model self.rng6 = rng6 # for synthetic sequencing data self.size = size self.long_shedders_ratio = long_shedders_ratio # compartments and population groups --------------------------------- self.susceptibles = size - I_0 # number of susceptible individuals self.infected = I_0 # number of infected individuals self.diagnosed = 0 # number of diagnosed individuals self.diagnosed_standard = 0 # self.diagnosed_long = 0 # self.recovered = 0 # number of recovered individuals self.long_shedders = 0 # number of long shedders self.infectious_standard = 0 self.infectious_long = 0 self.infectious = 0 # number of infectious individuals self.detectables_standard = 0 self.detectables_long = 0 self.detectables = 0 # number of detectable individuals self.reservoir = reservoir # size of total population (not everyone is # susceptible at the beginning, when # individuals get removed from the system # new ones from the reservoir become # susceptible) # individuals --------------------------------------------------------- self.individuals = {} # store individuals data self.reservoir_i = set() # set of indices of individuals in the reservoir self.susceptibles_i = set() # set of susceptible individuals indices self.infected_i = set() # set of infected individuals indices self.long_shedder_i = set() # set of infected long shedder individuals self.diagnosed_i = set() # set of diagnosed individuals indices self.diagnosed_standard_i = set() # self.diagnosed_long_i = set() # self.recovered_i = set() # set of recovered individuals indices self.infectious_standard_i = set() self.infectious_long_i = set() self.infectious_i = set() # set of infectious individuals indices self.detectables_standard_i = set() self.detectables_long_i = set() self.detectables_i = set() # set of detectable individuals indices self.exclude_i = set() # set to store the newly infected individual (excludes it from states update at time of infection) # set attributes for evolutionary model =============================== self.ref_genome = ref.get_reference() # sequence of reference genome self.L_lin = len(self.ref_genome) # lenght of reference genome self.active_lineages_n = I_0 # number of IH viruses in the # infected population self.NSR_long = NSR_long # long shedders substitution rate # Phylogenetic tree data ---------------------------------------------- self.sequence_long_shedders = sequence_long_shedders self.phylogenetic_data = [{ 'Time_emergence' : 0, 'Lineage_name' : 'wt', 'Lineage_parent' : None, 'Genome' : [], 'Host_type' : 'standard', 'Total_infections': 0 }] self._phylo_name_map = { row['Lineage_name']: row for row in self.phylogenetic_data } self.phylodots = [] # needed to name lineages self.sequencing_data = [] # to store sequencing data # --------------------------------------------------------------------- self.lineage_frequency = [] # count lineage frequency in the population # ------------------------------------------------------------------------- self.consensus_snapshot = [[[],1,0]] # [sequence, lineage_frequency, w_t(t_sim)] self.consensus_sequences_t = [[[],0]] # list to store consensus everytime is calculated in a simulation [consensus,t] # system trajectory --------------------------------------------------- self.time = 0 self.trajectory = [[self.time, self.infected, self.diagnosed, self.recovered, self.infectious, self.detectables, self.susceptibles, self.long_shedders ]] self.last_infection = {} # tracks the information about the last infection event # that happened in the simulaiton, used for # R_effective calculations self.R_effective_trajectory = [] self.fitness_trajectory = [] # ih model ------------------------------------------------------------- self.update_ih_mode = 'matrix' self.ih_model_parameters = ih_model_parameters self.host_model = {'standard': h.Host(tau_1=ih_model_parameters["tau_1"], tau_2=ih_model_parameters["tau_2"], tau_3=ih_model_parameters["tau_3"], tau_4=ih_model_parameters["tau_4"], update_mode = self.update_ih_mode) , # intra host model for standard individuals 'long_shedder': h.Host(tau_1=ih_model_parameters["tau_1"], tau_2=ih_model_parameters["tau_2"], tau_3=ih_model_parameters["tau_3_long"], tau_4=ih_model_parameters["tau_4"], update_mode = self.update_ih_mode) # intra host model for long-shedders } # ------------------------------------------------------------------------- # counter for inf reactions self.count_infections = 0 # self.count_infections_from_long_shedders = 0 # dictionary with all individuals data self.individuals = self._init_individuals(size,I_0)
# ------------------------------------------------------------------------- # ------------------------------------------------------------------------- def _init_individuals(self,size,I_0): ''' Create dictionary with all individuals in the simulation Parameters ---------- size : int Size of the population. I_0 : int Number of infected individuals at the beginning of the simulation. Returns ------- dic : dict Dictionary containing the info of all individuals in the population. ''' dic = {} # create an entry in the dictionary for each individual (0 to number of # total individuals in the population (reservoir)) for i in range(self.reservoir): dic[i] = { 't_infection' : None, 't_not_infected': None, 't_infectious': None, 't_not_infectious': None, 'type' : 'standard', 'state_t' : 0, 't_next_state': None, 'state' : 'susceptible', 'parent' : None, 'inherited_lineage': None, 'new_infections' : [], 'IH_lineages' : [], 'IH_unique_lineages_number': 0, #1 'IH_lineages_number' : 0, #1 'IH_lineages_max': self.rng3.integers(1,5), 'IH_lineages_fitness_score' : [], #1 'IH_lineages_trajectory': {}, # lineage name : [ih_time_start, ih_time_end] 'time_last_weight_event': 0, # time since infection or last mutation 'fitness_score' : 1e-6 # fitness floor (individual fitness) } # add index of individuals to either susceptibles indices or to # reservoir indices (the simulaiton starts with a pool of susceptibles) # that is replenished from the reservoir every time individuals # are removed from the system if i < size: self.susceptibles_i.add(i) else: self.reservoir_i.add(i) # set individuals infected at the beginning of the simulation for i in range(I_0): # update data of individuals infected at the beginning of the simulation dic[i]['parent'] = 'root' dic[i]['t_infection'] = 0 dic[i]['t_infectious'] = None dic[i]['state_t'] = 0 # sample next jump time from exp dist. state_t = dic[i]['state_t'] rate = - self.host_model['standard'].A[state_t][state_t] dic[i]['t_next_state'] = self.rng3.exponential(scale=1/rate) dic[i]['state'] = 'infected' dic[i]['IH_lineages'] = ['wt'] dic[i]['IH_lineages_fitness_score'] = [1e-6] dic[i]['IH_unique_lineages_number'] = 1 dic[i]['IH_lineages_number'] = 1 dic[i]['inherited_lineage'] = 'wt' dic[i]['IH_lineages_trajectory']['wt'] = {'ih_birth':None,'ih_death':None} self.susceptibles_i.remove(i) self.infected_i.add(i) # update lineage_frequency self.lineage_frequency.append({'Lineage_name' :'wt', 'Time_sampling' :0, 'Frequency_at_t' :1, 'Individuals_infected_at_t' : I_0 }) # return dictionary containing all individuals data (self.individuals) return dic # -------------------------------------------------------------------------
[docs] def get_lineage_genome(self, lineage_name): ''' Fetch lineage genome from lineage name ''' genome = next((d['Genome'] for d in self.phylogenetic_data if d['Lineage_name'] == lineage_name)) # print(f'{lineage_name} Genome: ', genome) return genome
# ------------------------------------------------------------------------- # Updates # -------------------------------------------------------------------------
[docs] def update_time(self,time): # update the time self.time = time
def _update_states_matrix(self, delta_t, individual_type): """ General intra-host state updater for a pop group using its host_model. """ if individual_type == 'standard': infected_to_update = [i for i in self.infected_i if i not in self.exclude_i and i not in self.long_shedder_i] elif individual_type == 'long_shedder': infected_to_update = [i for i in self.infected_i if i not in self.exclude_i and i in self.long_shedder_i] else: raise ValueError('Invalid individual type!') # compute transition probabilitiy vectors host_model = self.host_model[individual_type] all_probabilities = host_model.compute_all_probabilities(delta_t) # draw random variables for each infected individual in the subpopulation taus = self.rng3.uniform(size=len(infected_to_update)) for idx, i in enumerate(infected_to_update): ind = self.individuals[i] state = ind['state_t'] prob = all_probabilities[state] new_state = host_model.update_state(prob, taus[idx]) ind['state_t'] = new_state # Recovery check if new_state == 20: ind['state'] = 'recovered' if ind['t_not_infectious'] is None: ind['t_not_infectious'] = self.time if ind['t_not_infected'] is None: ind['t_not_infected'] = self.time else: raise ValueError('Individual already recovered!!') self.infected_i.remove(i) self.infectious_i.discard(i) self.detectables_i.discard(i) self.recovered_i.add(i) self.long_shedder_i.discard(i) self.infected -= 1 self.recovered += 1 self.susceptibles += 1 self.active_lineages_n -= ind['IH_lineages_number'] new_susceptible = self.reservoir_i.pop() self.susceptibles_i.add(new_susceptible) continue elif new_state <= 4: continue # Detectable update (5–19) if 4 < new_state < 20: self.detectables_i.add(i) else: self.detectables_i.discard(i) # Infectious update (5–18 for standard individuals) if 4 < new_state < 19: if i not in self.infectious_i: self.infectious_i.add(i) if ind['t_infectious'] is None: ind['t_infectious'] = self.time else: raise ValueError(f'Individual {i} t_infectious already set!!') else: if new_state != 19: raise ValueError('State here should be 19 only') if i in self.infectious_i: self.infectious_i.remove(i) if ind['t_not_infectious'] is None: ind['t_not_infectious'] = self.time else: raise ValueError(f'Individual {i} t_not_infectious already set!!') # Post-processing self.exclude_i = set() # Update compartments self.infectious_standard_i = sorted(self.infectious_i - self.long_shedder_i) self.infectious_long_i = sorted(self.infectious_i & self.long_shedder_i) self.detectables_standard_i = sorted(self.detectables_i - self.long_shedder_i) self.detectables_long_i = sorted(self.detectables_i & self.long_shedder_i) self.infectious = len(self.infectious_i) self.infectious_standard = len(self.infectious_standard_i) self.infectious_long = len(self.infectious_long_i) self.detectables = len(self.detectables_i) self.detectables_long = len(self.detectables_long_i) self.detectables_standard = len(self.detectables_standard_i) self.long_shedders = len(self.long_shedder_i)
[docs] def update_states(self, delta_t): if self.update_ih_mode == "jump": self._update_states_jump() elif self.update_ih_mode == "matrix": self._update_states_matrix(delta_t,'standard') self._update_states_matrix(delta_t, 'long_shedder') else: raise ValueError(f"Unknown update_mode: {self.update_mode}")
[docs] def update_trajectory(self): # update the system trajectory self.trajectory.append([self.time, self.infected, self.diagnosed, self.recovered, self.infectious, self.detectables, self.susceptibles, self.long_shedders ])
[docs] def update_fitness_trajectory(self): fitness_scores = [self.individuals[i]['fitness_score'] for i in self.infected_i] if not fitness_scores: self.fitness_trajectory.append([self.time, 0, 0, 0]) return mean_fitness = np.mean(fitness_scores) std_fitness = np.std(fitness_scores) # Entropy of normalized fitness total = sum(fitness_scores) normed = [f / total for f in fitness_scores] entropy = scipy.stats.entropy(normed) if total > 0 else 0.0 self.fitness_trajectory.append({ 'Time': self.time, 'Mean': mean_fitness, 'Std': std_fitness, 'Entropy': entropy })
[docs] def update_lineage_frequency_t(self, t): ''' Count how many individuals are infected by each lineage at time t in the population and store the following information: time, lineage_name, frequency. ''' # store lineages here: {'lineage name': number of infected individuals} count_lineages_t = {} # Loop through the infected individuals and get a list of unique IH_virus names (lineage) for individual_index in set(self.infected_i) - set(self.long_shedder_i): unique_lineages = set(self.individuals[individual_index]['IH_lineages']) for lineage_name in unique_lineages: count_lineages_t[lineage_name] = count_lineages_t.get(lineage_name, 0) + 1 # Calculate the total count for normalization infected_individuals_at_t_total = sum(count_lineages_t.values()) # note that if an individual is infected by more than one lineage it will count double # Store: [time, lineage_name, frequency, infected_individuals_at_t] for lineage_name in count_lineages_t: # Compute frequency as the relative count infected_individuals_lin = count_lineages_t[lineage_name] frequency = infected_individuals_lin / infected_individuals_at_t_total # store frequency data dic = {'Lineage_name' :lineage_name, 'Time_sampling' :t, 'Frequency_at_t' :frequency, 'Individuals_infected_at_t' :infected_individuals_lin } self.lineage_frequency.append(dic) for lineage_name in count_lineages_t: frequency = count_lineages_t[lineage_name] / infected_individuals_at_t_total self.consensus_snapshot.append([self.get_lineage_genome(lineage_name), frequency, t])
# -----------------------------------------------------------------------------
[docs] def update_ih_lineages_trajectories(self): individuals_data = pd.DataFrame(self.individuals).transpose() for idx, row in individuals_data.iterrows(): lineage_traj_dic = row['IH_lineages_trajectory'] for lineage in lineage_traj_dic: if lineage_traj_dic[lineage].get('ih_birth') is None: lineage_traj_dic[lineage]['ih_birth'] = row['t_infectious'] if lineage_traj_dic[lineage].get('ih_death') is None: lineage_traj_dic[lineage]['ih_death'] = row['t_not_infectious'] individuals_data.at[idx, 'IH_lineages_trajectory'] = lineage_traj_dic return individuals_data
[docs] def individuals_data_to_df(self): # return population dictionary as data frame df = self.update_ih_lineages_trajectories() filtered_df = df[~df['state'].str.contains('susceptible')] filtered_df = filtered_df.drop('t_next_state', axis=1) return filtered_df
[docs] def phylogenetic_data_to_df(self): # return phylogeny dictionary as data frame return pd.DataFrame(self.phylogenetic_data)
[docs] def lineage_frequency_to_df(self): # return lineage_frequency as data frame return pd.DataFrame(self.lineage_frequency)
[docs] def fitness_trajectory_to_df(self): return pd.DataFrame(self.fitness_trajectory)
# ----------------------------------------------------------------------------- # ============================================================================= # -----------------------------------------------------------------------------
[docs] def create_population(parameters): ''' Create population instance from parameters file and return it. ''' # population parameters pop_size = parameters['population_size'] I_0 = parameters['infected_individuals_at_start'] seed = parameters['seed'] long_shedders_ratio = parameters['long_shedders_ratio'] M_nsr_long = parameters['M_nsr_long'] sequence_long_shedders = parameters['sequence_long_shedders'] NSR_long = parameters['nucleotide_substitution_rate'] * M_nsr_long ih_model_parameters = { 'tau_1': parameters['tau_1'], 'tau_2': parameters['tau_2'], 'tau_3': parameters['tau_3'], 'tau_3_long': parameters['tau_3_long'], 'tau_4': parameters['tau_4'], } # create random number generators seeds_generator=randomgen(seed+10000) # add to the seed so that rng3 and 4 differ from rng1 and 2 in Simplicity class # random number generators for population rng3 = randomgen(seeds_generator.integers(0,10000)) # for intra-host model states update rng4 = randomgen(seeds_generator.integers(0,10000)) # for electing individuals|lineages when reactions happen rng5 = randomgen(seeds_generator.integers(0,10000)) # for mutation model rng6 = randomgen(seeds_generator.integers(0,10000)) # for synthetic sequencing data # create population pop = Population(pop_size, I_0, ih_model_parameters, rng3,rng4,rng5,rng6, NSR_long, long_shedders_ratio, sequence_long_shedders) return pop