Source code for simplicity.output_manager

# This file is part of SIMPLICITY
# Copyright (C) 2025 Pietro Gerletti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 29 13:56:20 2024

@author: pietro
"""

import simplicity.dir_manager as dm
import simplicity.settings_manager as sm
import os
import shutil
import tarfile
import csv
from simplicity.evolution.decoder import decode_genome
import simplicity.phenotype.distance  as dis
import pandas as pd
import simplicity.tuning.evolutionary_rate as er
import glob
import ast
import numpy as np
from tqdm import tqdm
import warnings
from pathlib import Path

[docs] def setup_output_directory(experiment_name, seeded_simulation_parameters_path): """ Sets up the output directory structure for a simulation. This function constructs the output directory path based on the provided seeded simulation parameters file path and the experiment name. Args: seeded_simulation_parameters_path (str): The file path to the seeded simulation parameters. experiment_name (str): The name of the experiment for which the output directory is being set up. Returns: str: The path to the main output directory created for this experiment. Example: If the `seeded_simulation_parameters_path` is '/path/to/seed/files/seed_file.txt' and the `experiment_name` is 'Experiment_1', the function will create and return the path: '/data_dir/Experiment_1/04_Output/files/seed_file' """ # Split the path into components path_components = os.path.split(seeded_simulation_parameters_path) # Extract the necessary components root_folder_of_seed_file = os.path.split(path_components[0])[-1] # Get the seed file name without the extension seed_file_name = os.path.splitext(path_components[-1])[0] # Construct the output directory path output_dir = os.path.join(dm.get_experiment_output_dir(experiment_name), root_folder_of_seed_file, seed_file_name) try: os.makedirs(output_dir) except: raise RuntimeError('You already run an experiment with the same name!') # Return the output directory path return output_dir
[docs] def archive_experiment(experiment_name): """ Archives the specified experiment folder as a .tar.gz file and deletes the original folder. This script: 1. Checks if the 'Archive' directory exists within the data directory, and creates it if not. 2. Compresses the experiment folder into a .tar.gz archive that is compatible with Windows. 3. Deletes the original experiment folder after archiving. Args: experiment_name (str): The name of the experiment to be archived. Raises: FileNotFoundError: If the experiment folder does not exist. """ # Get the data directory from the dm data_dir = dm.get_data_dir() # Define the path to the experiment folder experiment_folder_path = os.path.join(data_dir, experiment_name) # Check if the experiment folder exists if not os.path.exists(experiment_folder_path) or experiment_name=='': raise FileNotFoundError(f"The experiment folder '{experiment_folder_path}' does not exist.") # Define the path to the Archive directory archive_dir = os.path.join(data_dir, "Archive") # Create the Archive directory if it does not exist if not os.path.exists(archive_dir): os.makedirs(archive_dir) print(f"Created archive directory: {archive_dir}") # Define the path for the tar.gz archive file archive_file_path = os.path.join(archive_dir, f"{experiment_name}.tar.gz") # Create the tar.gz archive with tarfile.open(archive_file_path, "w:gz") as tar: tar.add(experiment_folder_path, arcname=os.path.basename(experiment_folder_path)) print(f"Archived '{experiment_folder_path}' to '{archive_file_path}'") # Delete the original experiment folder shutil.rmtree(experiment_folder_path) print(f"Deleted the original experiment folder: {experiment_folder_path}")
# ----------------------------------------------------------------------------- # read and write simulation output # -----------------------------------------------------------------------------
[docs] def save_sequencing_dataset(simulation_output, output_path, sequence_long_shedders=False): """ Writes the simulated sequencing data to FASTA files and a regression CSV. Parameters: - simulation_output: The object containing simulation results. - output_path: Directory to save files. - sequence_long_shedders (bool): If True, extracts and saves all lineages from long shedders (at end of infection/sim) to a separate FASTA and includes them in the regression CSV. Outputs: 1. sequencing_data.fasta: Random surveillance sequences 2. sequencing_data_long.fasta: All lineages from long-shedders 3. sequencing_data_regression.csv: Combined metrics for both groups. 4. sequencing_data.csv: Raw surveillance metadata """ # Ensure output directory exists if not os.path.exists(output_path): os.makedirs(output_path) try: # 1. Setup Paths fasta_file_path = os.path.join(output_path, 'sequencing_data.fasta') fasta_long_path = os.path.join(output_path, 'sequencing_data_long.fasta') data_regression_file_path = os.path.join(output_path, 'sequencing_data_regression.csv') data_file_path = os.path.join(output_path, 'sequencing_data.csv') # List to collect data for the regression CSV (combine surveillance + long shedders) sequencing_data_regression_dic = [] # ============================================================================== # Process Random Surveillance Data # ============================================================================== sequencing_data = simulation_output.sequencing_data if sequencing_data: # Open standard FASTA with open(fasta_file_path, 'w') as fasta_file: for individual_sequencing_data in sequencing_data: # Extract details ind_index = individual_sequencing_data['individual_index'] ih_lin_index = individual_sequencing_data['intra-host_lineage_index'] seq_time = individual_sequencing_data['sequencing_time'] lin_name = individual_sequencing_data['lineage_name'] sequence = individual_sequencing_data['sequence'] ind_type = individual_sequencing_data['individual_type'] # Write to FASTA sequences_id = f'>{ind_index}_{ih_lin_index}_time_{seq_time:.2f}_lin_{lin_name}' fasta_file.write(f"{sequences_id}\n") decoded_genome = decode_genome(sequence) genome_str = decoded_genome.replace("'", "") fasta_file.write(f"{genome_str}\n") # Add to Regression Dict sequencing_data_regression_dic.append({ 'Sequencing_time': round(seq_time / 365.25, 4), # Years 'Distance_from_root': dis.hamming(sequence) / len(dis.reference), 'individual_type': ind_type }) # Save Sequencing Metadata CSV with open(data_file_path, mode='w', newline='') as file: fieldnames = sequencing_data[0].keys() writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() writer.writerows(sequencing_data) # ============================================================================== # Process All Long Shedders (Optional) # ============================================================================== long_shedder_entries = [] # Buffer if sequence_long_shedders: # Iterate through all individuals to find long shedders for i, ind_data in simulation_output.individuals.items(): # Filter: Only Long Shedders who were infected and completed their infection course before the end of a simulation if ind_data['type'] == 'long_shedder' and ind_data['state'] == 'recovered': # 1. Determine Sequencing Time if ind_data['t_not_infectious'] is not None: seq_time = ind_data['t_not_infectious'] else: seq_time = simulation_output.time # 2. Iterate through ALL lineages if not ind_data['IH_lineages']: continue for lineage_name in ind_data['IH_lineages']: # Retrieve the genome sequence sequence_list = simulation_output.get_lineage_genome(lineage_name) # Prepare FASTA entry string header = f">{i}_long_shedder_time_{seq_time:.2f}_lin_{lineage_name}" decoded_genome = decode_genome(sequence_list) genome_str = decoded_genome.replace("'", "") # Store in buffer long_shedder_entries.append(f"{header}\n{genome_str}\n") # Add to Regression Dict sequencing_data_regression_dic.append({ 'Sequencing_time': round(seq_time / 365.25, 4), 'Distance_from_root': dis.hamming(sequence_list) / len(dis.reference), 'individual_type': 'long_shedder' }) # Only write the Long Shedder FASTA file if we actually found data if long_shedder_entries: with open(fasta_long_path, 'w') as fasta_long: fasta_long.writelines(long_shedder_entries) # ============================================================================== # PART C: Save Regression CSV (Combined) # ============================================================================== if sequencing_data_regression_dic: with open(data_regression_file_path, mode='w', newline='') as file: fieldnames = sequencing_data_regression_dic[0].keys() writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() writer.writerows(sequencing_data_regression_dic) except Exception as e: import traceback traceback.print_exc() print(f"Warning: An error occurred while saving sequencing datasets: {e}")
[docs] def read_sequencing_data_regression(seeded_simulation_output_dir): sequencing_data_file_path = os.path.join(seeded_simulation_output_dir, "sequencing_data_regression.csv") df = pd.read_csv(sequencing_data_file_path) return df
[docs] def save_simulation_trajectory(simulation_output, seeded_simulation_output_dir): df = pd.DataFrame(simulation_output.trajectory, columns= ['time','infected','diagnosed','recovered', 'infectious','detectables','susceptibles','long_shedders']) trajectory_file_path = os.path.join(seeded_simulation_output_dir, "simulation_trajectory.csv") df.to_csv(trajectory_file_path, index=False)
[docs] def read_simulation_trajectory(seeded_simulation_output_dir): trajectory_file_path = os.path.join(seeded_simulation_output_dir, "simulation_trajectory.csv") df = pd.read_csv(trajectory_file_path) return df
[docs] def save_lineage_frequency(simulation_output, seeded_simulation_output_dir): df = simulation_output.lineage_frequency_to_df() lineage_frequency_file_path = os.path.join(seeded_simulation_output_dir, "lineage_frequency.csv") df.to_csv(lineage_frequency_file_path, index=False)
[docs] def read_lineage_frequency(seeded_simulation_output_dir): lineage_frequency_file_path = os.path.join(seeded_simulation_output_dir, "lineage_frequency.csv") df = pd.read_csv(lineage_frequency_file_path) return df
[docs] def save_individuals_data(simulation_output, seeded_simulation_output_dir): individuals_data = simulation_output.individuals_data_to_df() individuals_data_file_path = os.path.join(seeded_simulation_output_dir, "individuals_data.csv") individuals_data.to_csv(individuals_data_file_path)
[docs] def read_individuals_data(seeded_simulation_output_dir): trajectory_file_path = os.path.join(seeded_simulation_output_dir, "individuals_data.csv") df = pd.read_csv(trajectory_file_path, index_col=0,low_memory=False) df['type'] = df['type'].astype(str) df['IH_lineages'] = df['IH_lineages'].apply(ast.literal_eval) df['IH_lineages_fitness_score'] = df['IH_lineages_fitness_score'].apply(ast.literal_eval) df['new_infections'] = df['new_infections'].apply(ast.literal_eval) df['IH_lineages_trajectory'] = df['IH_lineages_trajectory'].apply(ast.literal_eval) return df
[docs] def save_phylogenetic_data(simulation_output, seeded_simulation_output_dir): phylogenetic_data = simulation_output.phylogenetic_data_to_df() phylogenetic_data_file_path = os.path.join(seeded_simulation_output_dir, "phylogenetic_data.csv") phylogenetic_data.to_csv(phylogenetic_data_file_path)
[docs] def read_phylogenetic_data(seeded_simulation_output_dir): trajectory_file_path = os.path.join(seeded_simulation_output_dir, "phylogenetic_data.csv") df = pd.read_csv(trajectory_file_path, index_col=0) df['Genome'] = df['Genome'].apply(ast.literal_eval) return df
[docs] def save_fitness_trajectory(simulation_output, seeded_simulation_output_dir): fitness_trajectory = simulation_output.fitness_trajectory_to_df() fitness_trajectory_file_path = os.path.join(seeded_simulation_output_dir, "fitness_trajectory.csv") fitness_trajectory.to_csv(fitness_trajectory_file_path)
[docs] def save_final_time(simulation_output, seeded_simulation_output_dir): final_time = round(simulation_output.time,6) final_time_file_path = os.path.join(seeded_simulation_output_dir, "final_time.csv") with open(final_time_file_path, mode='w', newline='') as file: writer = csv.writer(file) writer.writerow([final_time])
[docs] def read_final_time(seeded_simulation_output_dir): final_time_file_path = os.path.join(seeded_simulation_output_dir, "final_time.csv") with open(final_time_file_path, 'r') as f: final_time = float(f.read().strip()) return final_time
[docs] def save_R_effective_trajectory(simulation_output, seeded_simulation_output_dir): R_effective_trajectory = simulation_output.R_effective_trajectory_to_df() R_effective_trajectory_file_path = os.path.join(seeded_simulation_output_dir, "R_effective_trajectory.csv") R_effective_trajectory.to_csv(R_effective_trajectory_file_path)
[docs] def read_R_effective_trajectory(seeded_simulation_output_dir): trajectory_file_path = os.path.join(seeded_simulation_output_dir, "R_effective_trajectory.csv") df = pd.read_csv(trajectory_file_path,index_col=0) return df
# ----------------------------------------------------------------------------- # Intra host lineages # -----------------------------------------------------------------------------
[docs] def get_all_individuals_data_for_simulation_output_dir(simulation_output_dir): seeded_simulation_output_dirs = dm.get_seeded_simulation_output_dirs(simulation_output_dir) all_individuals_data = pd.DataFrame() for seeded_simulation_output_dir in seeded_simulation_output_dirs: individuals_data = os.path.join(seeded_simulation_output_dir,'individuals_data.csv') df = pd.read_csv(individuals_data, index_col=0) all_individuals_data = pd.concat([all_individuals_data, df], axis=0) return all_individuals_data
[docs] def get_IH_lineages_data_simulation(simulation_output_dir): # get individuals dataframe data = get_all_individuals_data_for_simulation_output_dir(simulation_output_dir) # keep only needed columns data = data[['IH_lineages_number', 'IH_unique_lineages_number']] # DataFrames with all possible IH_lineages_number and IH_unique_lineages_numbervalues (1 to 5) IH_lineages_number_values = pd.DataFrame({'IH_lineages_number': [1, 2, 3, 4, 5]}) IH_unique_lineages_number_values = pd.DataFrame({'IH_unique_lineages_number': [1, 2, 3, 4, 5]}) # Calculate the counts of each value for each type ih_virus_count = data.groupby(['IH_lineages_number']).size().reset_index(name='ih_virus_count') ih_lineage_count = data.groupby(['IH_unique_lineages_number']).size().reset_index(name='ih_lineage_count') # Merge with the IH_lineages_number_values DataFrame, filling NaNs with 0 ih_virus_count = pd.merge(IH_lineages_number_values, ih_virus_count, on='IH_lineages_number', how='left').fillna({'ih_virus_count': 0}) ih_lineage_count = pd.merge(IH_unique_lineages_number_values, ih_lineage_count, on='IH_unique_lineages_number', how='left').fillna({'ih_lineage_count': 0}) # Normalize counts by the total count ih_virus_count['ih_virus_count'] = ih_virus_count['ih_virus_count'] / ih_virus_count['ih_virus_count'].sum() ih_lineage_count['ih_lineage_count'] = ih_lineage_count['ih_lineage_count'] / ih_lineage_count['ih_lineage_count'].sum() df = pd.concat([ih_virus_count,ih_lineage_count],axis=1) # add hue column to df - hue is IH_lineage_emergence_rate df['IH_virus_emergence_rate'] = sm.get_parameter_value_from_simulation_output_dir(simulation_output_dir, 'IH_virus_emergence_rate') return df
[docs] def get_IH_lineages_data_experiment(experiment_name): simulation_output_dirs = dm.get_simulation_output_dirs(experiment_name) df = pd.DataFrame() for simulation_output_dir in simulation_output_dirs: new_df = get_IH_lineages_data_simulation(simulation_output_dir) df = pd.concat([df,new_df],axis=0) return df
# ----------------------------------------------------------------------------- # OSR fitting # ------------------------------------------------------------------------------
[docs] def filter_sequencing_files_by_simulation_lenght(files, min_sim_lenght): """ Filters sequencing files by keeping only the ones from simulation that lasted at least min_sim_lenght. """ filtered_files = [] for file in files: seeded_simulation_output_dir = os.path.dirname(file) try: final_time = read_final_time(seeded_simulation_output_dir) if final_time >= min_sim_lenght: filtered_files.append(file) except Exception as e: print(f"Error reading final_time: {e}") print(f'Keeping files with simulation lenght >= {min_sim_lenght}') # print(filtered_files) return filtered_files
[docs] def create_combined_sequencing_df(source, min_seq_number=0, min_sim_lenght=0, individual_type=None): ''' Join sequencing_data_regression.csv files. Args: source: Either a string (path to simulation directory) or a list of SSOD paths. - If str: Globs all seeds in that directory. - If list: Uses only the provided seed paths. ''' # 1. Determine the list of files to process if isinstance(source, str): # Case A: Source is a directory print(f"processing simulation batch: {os.path.basename(source)}") csv_files = glob.glob(os.path.join(source, '**', 'sequencing_data_regression.csv'), recursive=True) elif isinstance(source, list): # Case B: Source is a list of specific SSOD paths (filtered behavior) # print(f"processing filtered list of {len(source)} seeds") csv_files = [os.path.join(ssod, 'sequencing_data_regression.csv') for ssod in source] else: return None if individual_type: print(f"Filtering for individual_type: {individual_type}") # 2. Filter files by simulation length filtered_csv_files = filter_sequencing_files_by_simulation_lenght(csv_files, min_sim_lenght) print(f'Keeping files with at least {min_seq_number} sequences') print('') # 3. Read and concatenate DataFrames data_frames = [] for csv_file in filtered_csv_files: try: df = pd.read_csv(csv_file) # filter by host type if individual_type: if 'individual_type' in df.columns: df = df[df['individual_type'] == individual_type] # Check sequence count AFTER filtering if len(df) >= min_seq_number: data_frames.append(df) except: pass print('Total kept files: ', len(data_frames)) try: combined_df = pd.concat(data_frames, ignore_index=True) return combined_df except: print('No sequencing data available! Check filter settings!') print('') return None
[docs] def detect_sod_outliers(sod_df, threshold=1.5, hard_floor=1e-9): """ Identifies outliers using a Hard Floor for failures AND IQR for statistical deviants. """ sod_df['is_outlier'] = 0 # ----------------------------------------------------------- # HARD FLOOR # Any rate below 1e-9 is physically impossible/noise -> Outlier # ----------------------------------------------------------- sod_df.loc[sod_df['observed_substitution_rate'] < hard_floor, 'is_outlier'] = 1 # IQR CHECK valid_df = sod_df[sod_df['is_outlier'] == 0] if len(valid_df) < 3: return sod_df # Log-transform only the valid points log_rates = np.log10(valid_df['observed_substitution_rate'] + 1e-15) q1 = log_rates.quantile(0.25) q3 = log_rates.quantile(0.75) iqr = q3 - q1 lower_bound = q1 - (threshold * iqr) upper_bound = q3 + (threshold * iqr) # Find indices of statistical outliers within the valid set stat_outliers = valid_df[ (log_rates < lower_bound) | (log_rates > upper_bound) ].index # Mark them in the main DF sod_df.loc[stat_outliers, 'is_outlier'] = 1 return sod_df
[docs] def get_combined_OSR_vs_parameter_csv_file_path(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type=None): experiment_output_dir = dm.get_experiment_output_dir(experiment_name) type_suffix = f"_{individual_type}" if individual_type else "" csv_file_path = os.path.join(experiment_output_dir, f'{experiment_name}_combined_OSR_vs_{parameter}_sim_lenght_{min_sim_lenght}_seq_n_{min_seq_number}{type_suffix}.csv') return csv_file_path
[docs] def get_OSR_vs_parameter_csv_file_path(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type=None): experiment_output_dir = dm.get_experiment_output_dir(experiment_name) type_suffix = f"_{individual_type}" if individual_type else "" csv_file_path = os.path.join(experiment_output_dir, f'{experiment_name}_OSR_vs_{parameter}_sim_lenght_{min_sim_lenght}_seq_n_{min_seq_number}{type_suffix}.csv') return csv_file_path
[docs] def write_OSR_vs_parameter_csv(experiment_name, parameter, min_seq_number=0, min_sim_lenght=0, individual_type=None): """ Calculates OSR for every individual seed and detects outliers per parameter set. """ csv_file_path = get_OSR_vs_parameter_csv_file_path(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type) if os.path.exists(csv_file_path): return simulation_output_dirs = dm.get_simulation_output_dirs(experiment_name) all_sod_dfs = [] for sod in simulation_output_dirs: parameter_value = sm.get_parameter_value_from_simulation_output_dir(sod, parameter) seeded_dirs = dm.get_seeded_simulation_output_dirs(sod) sod_results = [] for ssod in seeded_dirs: try: final_time = read_final_time(ssod) sequencing_data = read_sequencing_data_regression(ssod) if individual_type and 'Individual_type' in sequencing_data.columns: sequencing_data = sequencing_data[sequencing_data['Individual_type'] == individual_type] seq_number = len(sequencing_data) if final_time >= min_sim_lenght and seq_number >= min_seq_number: observed_rate = er.tempest_regression(sequencing_data).coef_[0] sod_results.append({ parameter: parameter_value, 'observed_substitution_rate': observed_rate, 'ssod_path': ssod, 'simulation_id': os.path.basename(sod), 'is_outlier': 0 }) except Exception: continue # Detect outliers for this SOD if sod_results: sod_df = pd.DataFrame(sod_results) sod_df = detect_sod_outliers(sod_df) all_sod_dfs.append(sod_df) if all_sod_dfs: final_df = pd.concat(all_sod_dfs, ignore_index=True) final_df.sort_values(by=str(parameter)).to_csv(csv_file_path, index=False) else: print("No results to write for individual OSR.")
[docs] def read_OSR_vs_parameter_csv(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type=None, include_outliers=False): csv_file_path = get_OSR_vs_parameter_csv_file_path(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type) if not os.path.exists(csv_file_path): print(f"[Warning] File not found: {csv_file_path}") return pd.DataFrame() df = pd.read_csv(csv_file_path) # Filter out outliers if include_outliers is False if not include_outliers and 'is_outlier' in df.columns: initial_count = len(df) df = df[df['is_outlier'] == 0] # removed_count = initial_count - len(df) # print(f"Excluded {removed_count} outliers.") return df
[docs] def write_combined_OSR_vs_parameter_csv(experiment_name, parameter, min_seq_number=0, min_sim_lenght=0, individual_type=None, include_outliers=False): ''' Create df of observed substitution rate (tempest regression on joint data). Uses individual OSR results to filter out outliers before combining, unless include_outliers=True. ''' csv_file_path = get_combined_OSR_vs_parameter_csv_file_path(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type) if os.path.exists(csv_file_path): return # Load individual OSR results individual_osr_df = read_OSR_vs_parameter_csv( experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type, include_outliers=include_outliers ) simulation_output_dirs = dm.get_simulation_output_dirs(experiment_name) results = [] for sod in simulation_output_dirs: parameter_value = sm.get_parameter_value_from_simulation_output_dir(sod, parameter) sod_id = os.path.basename(sod) combined_df = None if not individual_osr_df.empty: # Get list of valid paths from the individual DF clean_paths = individual_osr_df[ individual_osr_df['simulation_id'] == sod_id ]['ssod_path'].tolist() if clean_paths: # Pass LIST of paths to the function combined_df = create_combined_sequencing_df(clean_paths, min_seq_number, min_sim_lenght, individual_type) # Fallback Strategy if combined_df is None or combined_df.empty: # Pass directory path to the function combined_df = create_combined_sequencing_df(sod, min_seq_number, min_sim_lenght, individual_type) # Perform Regression if combined_df is not None and not combined_df.empty: try: observed_substitution_rate = er.tempest_regression(combined_df).coef_[0] results.append({str(parameter): parameter_value, 'observed_substitution_rate': observed_substitution_rate}) except Exception as e: print(f"Regression failed for {sod}: {e}") results_df = pd.DataFrame(results) if not results_df.empty: df = results_df.sort_values(by=str(parameter)) df.to_csv(csv_file_path, index=False) else: print("No results to write for combined OSR.")
[docs] def read_combined_OSR_vs_parameter_csv(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type=None): # Reads the file generated by write_combined_OSR... csv_file_path = get_combined_OSR_vs_parameter_csv_file_path(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type) return pd.read_csv(csv_file_path)
[docs] def get_mean_std_OSR(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type=None, include_outliers=False): # Ensure the CSV exists write_OSR_vs_parameter_csv(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type) # Read the data df = read_OSR_vs_parameter_csv(experiment_name, parameter, min_seq_number, min_sim_lenght, individual_type, include_outliers=include_outliers) if df.empty: return pd.DataFrame() # Calculate statistics df_mean_std = df.groupby(parameter)['observed_substitution_rate'].agg(['mean','std']).reset_index() return df_mean_std
[docs] def get_fit_results_filepath(experiment_name, model_type, individual_type=None): experiment_fit_result_dir = dm.get_experiment_fit_result_dir(experiment_name) type_suffix = f"_{individual_type}" if individual_type else "" filename = f'{experiment_name}_{model_type}_fit_results{type_suffix}.csv' return os.path.join(experiment_fit_result_dir, filename)
[docs] def write_fit_results_csv(experiment_name, model_type, fit_result, individual_type=None): # Pass individual_type to get unique filepath fit_results_filepath = get_fit_results_filepath(experiment_name, model_type, individual_type) # Save best-fit parameters to CSV param_dict = {name: param.value for name, param in fit_result.params.items()} df = pd.DataFrame.from_dict(param_dict, orient='index', columns=['Best Fit']) df.to_csv(fit_results_filepath, index=True, header=True) print(f'Saved fit results to: {fit_results_filepath}')
[docs] def read_fit_results_csv(experiment_name, model_type, individual_type=None): fit_results = get_fit_results_filepath(experiment_name, model_type, individual_type) if not os.path.exists(fit_results): print(f"[Warning] Fit results file not found: {fit_results}") return None df = pd.read_csv(fit_results, index_col=0) best_fit_df = pd.to_numeric(df['Best Fit'], errors='coerce') return best_fit_df
# ----------------------------------------------------------------------------- # Tree exporter # -----------------------------------------------------------------------------
[docs] def get_tree_filename(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type): seed_folder = os.path.basename(seeded_simulation_output_dir) ext = {'json' :'.json', 'newick': '_newick.txt', 'nexus' : '_nexus.txt', 'img' : '.png', } if file_type not in ext.keys(): raise ValueError('Invalid file_type selection for tree export.') foldername = dm.get_simulation_output_foldername_from_SSOD(seeded_simulation_output_dir) filename = f'{experiment_name}_{foldername}_{seed_folder}_{tree_type}_tree_{tree_subtype}{ext[file_type]}' return filename
[docs] def get_tree_file_filepath(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type): experiment_tree_simulation_files_dir = dm.get_experiment_tree_simulation_files_dir( experiment_name, seeded_simulation_output_dir) tree_filename = get_tree_filename(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type) return os.path.join(experiment_tree_simulation_files_dir,tree_filename)
[docs] def get_tree_plot_filepath(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type='img'): experiment_tree_simulation_plots_dir = dm.get_experiment_tree_simulation_plots_dir( experiment_name, seeded_simulation_output_dir) tree_filename = get_tree_filename(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type) return os.path.join(experiment_tree_simulation_plots_dir,tree_filename)
[docs] def export_tree(tree, experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type): """Export an anytree tree to file""" import simplicity.tree.newick as nwk import simplicity.tree.nexus as nx import simplicity.tree.json_tree as jt root = tree[0] if file_type =='json': json_tree_filepath = get_tree_file_filepath(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type) print(f'saving json file: {json_tree_filepath}') jt.write_json_tree_file(root, json_tree_filepath) if file_type =='nexus': nexus_filepath = get_tree_file_filepath(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type) print(f'saving nexus file: {nexus_filepath}') nx.write_nexus_file(tree, nexus_filepath) if file_type == 'newick': newick_filepath = get_tree_file_filepath(experiment_name, seeded_simulation_output_dir, tree_type, tree_subtype, file_type) print(f'saving newick file: {newick_filepath}') nwk.write_newick_file(root, newick_filepath) print('') print('Tree file exported successfully.')
# ----------------------------------------------------------------------------- # R effective # -----------------------------------------------------------------------------
[docs] def filter_lineage_frequency_df(lineage_frequency_df, threshold): """ Filters the lineage frequency DataFrame by threshold. Args: lineage_frequency_df: raw DataFrame from read_lineage_frequency() threshold: float, minimum frequency a lineage must reach at any time Returns: filtered_df: pivoted and filtered DataFrame (Time_sampling x Lineage) """ pivot_df = lineage_frequency_df.pivot( index='Time_sampling', columns='Lineage_name', values='Frequency_at_t' ) selected_lineages = pivot_df.columns[pivot_df.max() >= threshold].tolist() filtered_df = pivot_df[selected_lineages] return filtered_df, selected_lineages
[docs] def get_r_effective_population_csv_filepath(experiment_name, seeded_simulation_output_dir, window_size, threshold): experiment_output_dir = dm.get_experiment_output_dir(experiment_name) foldername = dm.get_simulation_output_foldername_from_SSOD(seeded_simulation_output_dir) seed = os.path.basename(seeded_simulation_output_dir) filename = f"{experiment_name}_{foldername}_{seed}_r_effective_population_win_{window_size}_ts_{threshold}.csv" return os.path.join(experiment_output_dir, filename)
[docs] def get_r_effective_lineages_csv_filepath(experiment_name, seeded_simulation_output_dir, window_size, threshold): experiment_output_dir = dm.get_experiment_output_dir(experiment_name) foldername = dm.get_simulation_output_foldername_from_SSOD(seeded_simulation_output_dir) seed = os.path.basename(seeded_simulation_output_dir) filename = f"{experiment_name}_{foldername}_{seed}_r_effective_lineages_win_{window_size}_ts_{threshold}.csv" return os.path.join(experiment_output_dir, filename)
[docs] def get_r_effective_population_traj(ssod, time_window): """ Compute and return the population-level R_effective trajectory as a pandas Series. """ df = read_individuals_data(ssod) # df = df[df['t_infection'] > 0] final_time = read_final_time(ssod) timeline = np.arange(0, final_time, 1) r_effective = [] for t in tqdm(timeline, desc="Computing R_effective (population)"): window_start = t - time_window births = df[ (df['t_infectious'] > window_start) & (df['t_infectious'] <= t) ].shape[0] deaths = df[ (df['t_not_infectious'] > window_start) & (df['t_not_infectious'] <= t) ].shape[0] r = births / deaths if deaths > 0 else float('nan') r_effective.append((t, r)) return pd.Series([r for _, r in r_effective], index=[t for t, _ in r_effective], name='R_effective')
[docs] def get_r_effective_lineages_traj(ssod, time_window, threshold): """ Compute and return the lineage-level R_effective trajectories as a dictionary of Series. """ def flatten_lineage_events(individuals_df): """ Convert nested IH_lineages_trajectory dicts into a flat DataFrame. Returns: DataFrame with columns: individual_id, lineage, ih_birth, ih_death """ rows = [] for ind_id, row in individuals_df.iterrows(): traj = row.get('IH_lineages_trajectory', {}) for lineage, event in traj.items(): rows.append({ 'individual_id': ind_id, 'lineage': lineage, 'ih_birth': event.get('ih_birth'), 'ih_death': event.get('ih_death') }) return pd.DataFrame(rows) individuals_df = read_individuals_data(ssod) # individuals_df = individuals_df[individuals_df['t_infection'] > 0] final_time = read_final_time(ssod) # Read and filter lineage frequency lineage_freq_df = read_lineage_frequency(ssod) _, filtered_lineages = filter_lineage_frequency_df(lineage_freq_df, threshold) flat_df = flatten_lineage_events(individuals_df) timeline = np.arange(0, final_time, 1) lineage_r_dict = {} for lineage in tqdm(filtered_lineages, desc="Computing R_effective (lineages)"): r_series = [] sub = flat_df[flat_df['lineage'] == lineage] for t in timeline: window_start = t - time_window births = sub[ (sub['ih_birth'] > window_start) & (sub['ih_birth'] <= t) ].shape[0] deaths = sub[ (sub['ih_death'] > window_start) & (sub['ih_death'] <= t) ].shape[0] r = births / deaths if deaths > 0 else float('nan') r_series.append((t, r)) lineage_r_dict[lineage] = pd.Series( [r for _, r in r_series], index=[t for t, _ in r_series], name=f'R_eff_{lineage}' ) return lineage_r_dict
[docs] def write_r_effective_trajs_csv(experiment_name, seeded_simulation_output_dir, time_window, threshold): """ Compute and write R_effective (population + lineage) trajectories to CSV. """ # Compute R_effective r_effective_population_traj = get_r_effective_population_traj(seeded_simulation_output_dir, time_window) r_effective_lineages_traj = get_r_effective_lineages_traj(seeded_simulation_output_dir, time_window, threshold) # Get filepaths pop_fp = get_r_effective_population_csv_filepath(experiment_name, seeded_simulation_output_dir, time_window, threshold) line_fp = get_r_effective_lineages_csv_filepath(experiment_name, seeded_simulation_output_dir, time_window, threshold) # Save r_effective_population_traj.to_csv(pop_fp, header=True) pd.DataFrame(r_effective_lineages_traj).to_csv(line_fp) print(f"R_effective CSVs saved to:\n- {pop_fp}\n- {line_fp}")
[docs] def read_r_effective_trajs_csv(experiment_name, seeded_simulation_output_dir, time_window, threshold): """ Read R_effective (population + lineage) trajectories from CSV. If files are missing, issues a warning and generates them. Returns: - r_effective_population_traj: pd.Series - r_effective_lineages_traj: dict of {lineage_name: pd.Series} """ pop_fp = get_r_effective_population_csv_filepath(experiment_name, seeded_simulation_output_dir, time_window, threshold) line_fp = get_r_effective_lineages_csv_filepath(experiment_name, seeded_simulation_output_dir, time_window, threshold) # Check if files exist if not os.path.exists(pop_fp) or not os.path.exists(line_fp): warnings.warn("R_effective CSVs missing. Recomputing and writing them.") write_r_effective_trajs_csv(experiment_name, seeded_simulation_output_dir, time_window, threshold) # Read population R_eff r_eff_pop = pd.read_csv(pop_fp, index_col=0) if isinstance(r_eff_pop, pd.DataFrame): r_eff_pop = r_eff_pop.iloc[:, 0] # Read lineage R_eff df_line = pd.read_csv(line_fp, index_col=0) r_eff_lin_dict = {lineage: df_line[lineage] for lineage in df_line.columns} return r_eff_pop, r_eff_lin_dict
# -----------------------------------------------------------------------------
[docs] def get_procomputed_matrix_table_filepath(tau_1,tau_2,tau_3,tau_4): file_name = f"precomputed_A_exponentials_tau1={tau_1}_tau2={tau_2}_tau3={tau_3}_tau4={tau_4}.pkl" data_dir = dm.get_data_dir() return os.path.join(data_dir,file_name)
# -----------------------------------------------------------------------------
[docs] def get_clustering_table_filepath(experiment_name: str, ssod: str) -> str: """ gets the filepath: 07_Clustering/{SOD}_seed_{SEED}_clustering.csv """ cluster_dir = dm.get_experiment_cluster_dir(experiment_name) sod = dm.get_simulation_output_foldername_from_SSOD(ssod) seed = dm.get_seed_from_SSOD(ssod) fname = f"{sod}_seed_{seed}_clustering.csv" return os.path.join(cluster_dir, fname)
[docs] def write_clustering_table(experiment_name: str, ssod: str, df: pd.DataFrame): """ Writes the clustering CSV. """ path = get_clustering_table_filepath(experiment_name, ssod) df.to_csv(path, index=False)
[docs] def read_clustering_table(experiment_name: str, ssod: str) -> pd.DataFrame: """ Reads the single clustering CSV and literal-evals nested columns. """ path = get_clustering_table_filepath(experiment_name, ssod) if not os.path.exists(path): raise FileNotFoundError(f"Clustering table not found: {path}") # peek header to decide which converters to apply import csv with open(path, "r", newline="") as f: reader = csv.reader(f) header = next(reader) converters = {} for col in ("lineages", "defining_mutations"): if col in header: converters[col] = ast.literal_eval return pd.read_csv(path, converters=converters)
[docs] def get_nextstrain_dataset_paths(experiment_name: str, ssod: str): """ Returns (dataset_json_path, metadata_tsv_path, base_name) for a given SSOD. """ # Base dir ns_root = Path(dm.get_nextstrain_dir(experiment_name)) # Get the original name parts sod = dm.get_simulation_output_foldername_from_SSOD(ssod) seed = dm.get_seed_from_SSOD(ssod) # Clean the long simulation name from underscores clean_sod_name = sod.replace("_", "") # Create the new base filename base = f"seed{seed}_{clean_sod_name}" dataset_json_path = ns_root / f"{base}-nextstrain.json" metadata_tsv_path = ns_root / f"{base}-metadata.tsv" return dataset_json_path, metadata_tsv_path, base