Source code for simplicity.evolution.mutations

# This file is part of SIMPLICITY
# Copyright (C) 2025 Pietro Gerletti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 1 11:15:10 2023

@author: pietro
"""
import simplicity.phenotype.update    as pheno
import numpy as np
import copy

[docs] def sub_events(rng, NSR, L, dt, IH_lineages): ''' Choose the number of substitutions that happen in a time step dt, drawing random number sampled from Poisson distribution. Parameters ---------- rng : numpy method Random number generator. NSR : float nucleotide substitution rate. dt : float delta t - extrande time step. IH_lineages : int number of active IH_lineages that can mutate in time step dt Returns ------- int number of substitutions that happen in a time step dt. ''' lambda_pop = NSR*dt*IH_lineages*L events = rng.poisson(lambda_pop) return events
[docs] def select_positions(population, L, NSR, dt): ''' Select genome positions to mutate using two parallel Poisson processes. Returns: positions : array of pooled genome indices to mutate mutating_lineages : list of [individual_id, lineage_id] per mutation sub_number : number of mutations this time step ''' rng = population.rng5 # define Groups ids_long = population.long_shedder_i ids_standard = population.infected_i - population.long_shedder_i # define Rates rate_standard = NSR rate_long = population.NSR_long # build Pools pool_standard = [] # sorted for reproducibility for i in sorted(ids_standard): n_lin = population.individuals[i]['IH_lineages_number'] for j in range(n_lin): pool_standard.append([i, j]) pool_long = [] for i in sorted(ids_long): n_lin = population.individuals[i]['IH_lineages_number'] for j in range(n_lin): pool_long.append([i, j]) # poisson draws total_lineages_standard = len(pool_standard) total_lineages_long = len(pool_long) sub_number_standard = sub_events(rng, rate_standard, L, dt, total_lineages_standard) if total_lineages_standard > 0 else 0 sub_number_long = sub_events(rng, rate_long, L, dt, total_lineages_long) if total_lineages_long > 0 else 0 sub_number = sub_number_standard + sub_number_long if sub_number == 0: return 'No substitutions' # select Targets and merge mutating_lineages = [] if sub_number_standard > 0: # uniform draw with replacement selected_indices_standard = rng.choice(total_lineages_standard, size=sub_number_standard, replace=True) for idx in selected_indices_standard: mutating_lineages.append(pool_standard[idx]) if sub_number_long > 0: # uniform draw with replacement selected_indices_long = rng.choice(total_lineages_long, size=sub_number_long, replace=True) for idx in selected_indices_long: mutating_lineages.append(pool_long[idx]) # generate pooled positions positions = [] for k in range(sub_number): genome_pos = rng.integers(0, L) pooled_pos = k * L + genome_pos positions.append(pooled_pos) return np.array(positions), mutating_lineages, sub_number
[docs] def map_to_lin(positions,L): ''' Maps positions to be mutated onto active_lineages (lineages). Parameters ---------- positions: array positions that will undergo substitution L: int lenght of lineage genome Returns ------- sub_coord_linmap : tuple lineages mutation coordinates: (LIST index of active_lineage to mutate, position in the genome to mutate). the index here does not refer to the index of the population.individuals dic ''' return [[pos // L, pos % L] for pos in positions]
[docs] def map_to_dic(mutating_lineages, positions, L_lin): ''' Map mutation positions onto dictionary of population.individuals Parameters ---------- mutating_lineages : list list of selected lineages that will mutate (subset of active lineages( coord map onto individuals positions: array positions that will undergo substitution Returns ------- sub_coord_dicmap : list coordinates of substitution: index of individual (in population.individuals) that hosts the lineage index of lineage inside the host position in the genome of lineage to be mutated ''' sub_coord_dicmap = [] sub_coord_linmap = map_to_lin(positions, L_lin) for i in sub_coord_linmap: # i[0] --> index of active_lineages that refers to lineages to mutate lin_coord = mutating_lineages[i[0]] # lin_coord[0] --> index of individual (in population.individuals) that hosts the lineage # lin_coord[1] --> index of lineage inside the host # i[1] --> position in the genome of lineage to be mutated sub_coord_dicmap.append([lin_coord[0], lin_coord[1], i[1]]) return sub_coord_dicmap
[docs] def fetch_bases(population, sub_coord_dicmap): ''' Add base to be substituted to subst_coord. First it checks if the position is already mutated, if so it takes the mutated base. Otherwise fetches the base from the reference genome. Parameters ---------- population : instance of population class sub_coord_dicmap : list coordinates of substitution Returns ------- sub_coord_dicmap: list coordinates of substitution: index of individual (in population.individuals) that hosts the lineage index of lineage inside the host position in the genome of lineage mutated nitrogenous base of the lineage at that position ''' # loop over list of coordinates to fetch the corresponding base for coord in sub_coord_dicmap: # lineage --> list of mutations that a lineage holds (property of individual) lineage_name = population.individuals[coord[0]]['IH_lineages'][coord[1]] lineage_genome = population.get_lineage_genome(lineage_name) # get mutation if the lineage already have one at position mut_coord = [mut for mut in lineage_genome if coord[2] in mut] # if so, fetch the base from lineage if mut_coord: coord.append(mut_coord[0][1]) # else, fetch it from reference genome else: coord.append(population.ref_genome[coord[2]]) return sub_coord_dicmap
[docs] def get_subst_numbers(sub_coord_dicmap, sub_number): ''' Returns count of how many of each nitrogenous base will undergo substitution Parameters ---------- sub_coord_dicmap : list coordinates of substitutions sub_number : int number of substitution events Raises ------ ValueError Raise error if the list of coordinates is not the same size as the number of substitution events Returns ------- TYPE dic that counts the number of bases to be substituted. e.g. 4 A, 3 T, 0 C, 5 G ''' if sub_number != len(sub_coord_dicmap): raise ValueError('subst_coord must have sub_number lenght') A = sum('A' in coord for coord in sub_coord_dicmap) T = sum('T' in coord for coord in sub_coord_dicmap) C = sum('C' in coord for coord in sub_coord_dicmap) G = sum('G' in coord for coord in sub_coord_dicmap) # print("bases count", {'A':A,'T':T,'C':C,'G':G}) return {'A':A,'T':T,'C':C,'G':G} # bases_count
[docs] def sub_matrix(): ''' Define substitution matrix. If you want to specify a different matrix, change the code here. ''' # row A pAA = 0 pAT = 1/3 pAC = 1/3 pAG = 1/3 # row T pTA = 1/3 pTT = 0 pTC = 1/3 pTG = 1/3 # row C pCA = 1/3 pCT = 1/3 pCC = 0 pCG = 1/3 # row G pGA = 1/3 pGT = 1/3 pGC = 1/3 pGG = 0 M = { 'A': [pAA,pAT,pAC,pAG], 'T': [pTA,pTT,pTC,pTG], 'C': [pCA,pCT,pCC,pCG], 'G': [pGA,pGT,pGC,pGG], } return M
[docs] def substitution(M,bases_count,rng): ''' Generate substitutions for each base based on substitution matrix. These substitutions are not assigned to any genome position yet, the function returns a dict with a list of substitutions for each nitrogenous base. Parameters ---------- M : substitution matrix bases_count : dic dictionary with count of bases to substitute Returns ------- dic dictionary that contain a list of substitutions for each base type. e.g A = [T,T,G,C,G,T,T,C,C,G] ''' NB = ['A','T','C','G'] A = rng.choice(NB,bases_count['A'],p=M['A']) T = rng.choice(NB,bases_count['T'],p=M['T']) C = rng.choice(NB,bases_count['C'],p=M['C']) G = rng.choice(NB,bases_count['G'],p=M['G']) # print("unassigned subst", {'A':A,'T':T,'C':C,'G':G}) return {'A':A,'T':T,'C':C,'G':G} # unassigned_subst
[docs] def assign_sub(unassigned_subst,sub_coord_dicmap): ''' Update the substitution coordinates to include the substitutions that will be included (assign the substitutions to lineages with the positions in their genome) Parameters ---------- unassigned_subst : dic dict with a list of substitutions for each nitrogenous base. sub_coord_dicmap : coordinates of the substitutions that will be introduced Returns ------- sub_coord_dicmap: list UPDATED coordinates of substitution: index of individual (in population.individuals) that hosts the lineage index of lineage inside the host position in the genome of lineage mutated nitrogenous base that will be introduced at that position ''' i_A = 0 i_T = 0 i_C = 0 i_G = 0 for coord in sub_coord_dicmap: if coord[3] == 'A': coord[3] = unassigned_subst['A'][i_A] i_A +=1 elif coord[3] == 'T': coord[3] = unassigned_subst['T'][i_T] i_T +=1 elif coord[3] == 'C': coord[3] = unassigned_subst['C'][i_C] i_C +=1 elif coord[3] == 'G': coord[3] = unassigned_subst['G'][i_G] i_G +=1 # print(i_A) # print(i_T) # print(i_C) # print(i_G) return sub_coord_dicmap
[docs] def get_lineage_name(phylodots,lineage_name): # function to name the lineages during the simulation n_dots = lineage_name.count('.') try: phylodots[n_dots] += 1 lineage_name = lineage_name + '.' + str(phylodots[n_dots]) except: phylodots.append(0) lineage_name = lineage_name + '.0' return phylodots, lineage_name
[docs] def update_lineages(population, sub_coord_dicmap): ''' Update lineages genomes in population with the selected mutations from the coordinates. Parameters ---------- population : instance of population class sub_coord_dicmap : substituions coordinates ''' # mutate every individual selected and update nodes info for coord in sub_coord_dicmap: individual = population.individuals[coord[0]] # parent of the phylogenetic tree node parent_lineage_name = individual['IH_lineages'][coord[1]] # rename the IH lineage after new mutation introduced (in individual) population.phylodots, new_lineage_name = get_lineage_name(population.phylodots,parent_lineage_name) individual['IH_lineages'][coord[1]] = new_lineage_name # update lineage ih trajectories individual['IH_lineages_trajectory'][new_lineage_name] = {'ih_birth':population.time, 'ih_death': None } if parent_lineage_name not in individual['IH_lineages']: individual['IH_lineages_trajectory'][parent_lineage_name]['ih_death'] = population.time # update lineages count inside individual individual['IH_unique_lineages_number'] = len( set(individual['IH_lineages'])) parent_lineage_genome = population.get_lineage_genome(parent_lineage_name) new_lineage_genome = copy.deepcopy(parent_lineage_genome) new_lineage_genome.append([int(coord[2]), str(coord[3])]) new_lineage_genome.sort(key=lambda x: x[0]) # sort by position new_row_dict = {'Time_emergence' : population.time, 'Lineage_name' : new_lineage_name, 'Lineage_parent' : parent_lineage_name, 'Genome' : new_lineage_genome, 'Host_type' : individual['type'], 'Total_infections': 0 } population.phylogenetic_data.append(new_row_dict) population._phylo_name_map[new_lineage_name] = new_row_dict # print(f'Phylo data: {population.phylogenetic_data}') mutated_individuals = {coord[0] for coord in sub_coord_dicmap} # update individuals that mutated for i in mutated_individuals: individual = population.individuals[i] combined = sorted(zip(individual['IH_lineages'], individual['IH_lineages_fitness_score'])) individual['IH_lineages'], individual['IH_lineages_fitness_score'] = map(list, zip(*combined)) individual['time_last_weight_event'] = population.time # reset time counter for weight
[docs] def get_individuals_to_update(subst_coord): # get indices of individuals that mutated to update their fitness scores mutated_individuals = [i[0] for i in subst_coord] mutated_individuals = sorted(set(mutated_individuals)) return mutated_individuals
[docs] def mutate(population, NSR, L, dt, phenotype_model, *args): ''' Mutation model, mutates the viruses in the population. Parameters ---------- NSR : float nucleotide substitution rate. dt : float delta t - extrande time step (in years) ''' # select number of substitutions and positions in pooled genome select_pos = select_positions(population, L, NSR, dt) # if mutations are happening if select_pos != 'No substitutions': # positions in pooled genome positions = select_pos[0] # vector of active lineages mutating_lineages = select_pos[1] # number of substitutions happening subst_number = select_pos[2] # map substitutions to index of lineages subst_coord = map_to_dic(mutating_lineages, positions, L) # fetch the bases that will undergo substituion subst_coord = fetch_bases(population, subst_coord) # count number of each nitrogenous base to be mutated (for bulk update) bases_count = get_subst_numbers(subst_coord, subst_number) # use substitution matrix to select mutations that will happen unassigned_subst = substitution(sub_matrix(), bases_count,population.rng5) # assign the mutations to their relative genome subst_coord = assign_sub(unassigned_subst, subst_coord) # update lineages in simulation with the corresponding substitution update_lineages(population, subst_coord) # update fitness score of individuals and lineages # print('--X----X----X----X----X----X--') # print('Substitution coordinates: ', subst_coord) individuals_to_update = get_individuals_to_update(subst_coord) # print('Individuals to be updated: ', individuals_to_update) update_fitness = pheno.update_fitness_factory(phenotype_model) if args: consensus = args[0] update_fitness(population, individuals_to_update, consensus) else: update_fitness(population, individuals_to_update)