# This file is part of SIMPLICITY
# Copyright (C) 2025 Pietro Gerletti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
In this file we perform the TempEst linear regression to estimate the observed
evolutionary rate u from the simulated data of a SIMPLICITY run. There are also
the functions to plot E (model nucleotide substitution rate) vs u (observed substitution rate)
or vs any other simulation parameter.
'''
import sklearn.linear_model
import numpy as np
import lmfit
import simplicity.output_manager as om
# from tqdm import tqdm
[docs]
def tempest_regression(sequencing_data_df):
'''
perform TempEst regression on dataframe of sequencing data
Parameters
----------
df : pandas df
output of create_joint_sequencing_df.
Returns
-------
u : TYPE
observed substitution rate (OSR)
model : func
fitted model (sklearn linear regression).
'''
x = sequencing_data_df['Sequencing_time'].values.reshape(-1, 1)
y = sequencing_data_df['Distance_from_root'].values
tempest_regression = sklearn.linear_model.LinearRegression(fit_intercept=False)
fitted_tempest_regression = tempest_regression.fit(x, y)
return fitted_tempest_regression
[docs]
def factory_model_func(model_type: str):
# Define the models
def linear_model(x, A, B):
return A * x + B
def log_model(x, A, B):
return A * np.log(B * x)
def exp_model(x, A, B):
return A * x**B
def tan_model(x, A, B, C):
return A * np.tan(B * x - C)
# Define knot positions for the spline
knots = np.logspace(-3, 0, 6)
spline_model = lmfit.models.SplineModel(prefix='spline_', xknots=knots)
# Model selection dictionary
models = {
"lin": linear_model,
"log": log_model,
"exp": exp_model,
"tan": tan_model,
"spline": spline_model,
}
if model_type not in models:
raise ValueError(f"Unknown model type: {model_type}")
return models[model_type]
[docs]
def factory_model_lmfit(model_type: str):
# select the model, assign parameters and return it
if model_type == 'lin':
model = lmfit.Model(factory_model_func(model_type))
# Set initial parameter guesses
params = model.make_params(A=0.01, B=0)
return model, params
elif model_type == 'log':
model = lmfit.Model(factory_model_func(model_type))
# Set initial parameter guesses
params = model.make_params(A=0.9, B=0.7)
# Set boundaries for parameters
params['B'].set(min=0.000001)
return model, params
elif model_type == 'exp':
model = lmfit.Model(factory_model_func(model_type))
# Set initial parameter guesses
params = model.make_params(A=0.9, B=0.7)
return model, params
elif model_type == 'tan':
model = lmfit.Model(factory_model_func(model_type))
# Set initial parameter guesses
params = model.make_params(A=1, B=1, C=0)
return model, params
if model_type == 'spline':
model = factory_model_func(model_type)
# Create parameters
params = model.make_params()
return model, params
else: raise ValueError('Invalid model selection')
[docs]
def fit_observed_substitution_rate_regressor(experiment_name,
df, model_type, weights=None,
parameter_name='nucleotide_substitution_rate'):
"""
Fits a model to the observed substitution rate against a varying parameter.
Parameters:
experiment_name (str): Name of the experiment
df (pd.DataFrame): Data containing the parameter column and 'observed_substitution_rate'
model_type (str): Type of model to fit ('lin', 'log', etc.)
weights: Optional weights for fitting
parameter_name (str): The column name of the independent variable (x-axis).
Defaults to 'nucleotide_substitution_rate'.
"""
if parameter_name not in df.columns:
# Fallback for flexibility: try using the first column if specific name not found
# This is useful if the DF structure varies slightly between steps
print(f"[Fit] Warning: '{parameter_name}' not found in DF. Using column 0: {df.columns[0]}")
x_data = df.iloc[:, 0]
else:
x_data = df[parameter_name]
y_data = df['observed_substitution_rate']
# Create the Model
model, params = factory_model_lmfit(model_type)
# Fit the model to the data
try:
if weights is None:
fit_result = model.fit(y_data, params, x=x_data)
else:
fit_result = model.fit(y_data, params, x=x_data, weights=weights)
# Print the fit results
print(fit_result.fit_report())
# save fit results
om.write_fit_results_csv(experiment_name, model_type, fit_result)
return fit_result
except Exception as e:
print(f"Fit failed for {model_type}: {e}")
# Return a dummy object or None so the pipeline can handle it gracefully
# If using lmfit, we might want to return None and check in the caller
raise e
[docs]
def fit_weight(df):
x_data = df['nucleotide_substitution_rate']
weights = 1/x_data
return weights
[docs]
def fit_weight_time(df):
x_data = df['nucleotide_substitution_rate']
weights = 1/x_data * (df['simulation_final_time']/df['settings_final_time'])
return weights
[docs]
def evaluate_model(model_type, params, x):
model = factory_model_func(model_type)
if isinstance(model, lmfit.models.SplineModel):
# Create initial parameters for lmfit
param_obj = model.make_params()
for key in param_obj.keys():
if key in params:
param_obj[key].set(value=params[key])
return model.eval(params=param_obj, x=x)
return model(x, **params)
# ==============================================================================
# INVERSE FUNCTIONS & BRIDGE
# ==============================================================================
[docs]
def inverse_linear_regressor(OSR, params):
"""
Computes inverse of: y = A * x + B
Returns: x = (y - B) / A
"""
A = params.get('A', 0)
B = params.get('B', 0)
if A == 0:
raise ValueError("Parameter A is zero; cannot invert linear function.")
return (OSR - B) / A
[docs]
def inverse_log_regressor(OSR, params):
"""
Computes inverse of: y = A * log(B * x)
Returns: x = (exp(y / A)) / B
"""
A = params.get('A', 1)
B = params.get('B', 1)
if A == 0:
raise ValueError("Parameter A is zero; cannot invert log function.")
if B == 0:
raise ValueError("Parameter B is zero; cannot invert log function.")
# x = (exp(y/A)) / B
val = (np.exp(OSR / A)) / B
return val
[docs]
def inverse_exp_regressor(OSR, params):
"""
Computes inverse of: y = A * x**B
Returns: x = ((y) / A) ** (1/B)
"""
A = params.get('A', 1)
B = params.get('B', 1)
if A == 0 or B == 0:
raise ValueError("Parameters A and B must be non-zero.")
# Ensure domain validity if possible
# if np.any(OSR - C <= 0): ... (checked by caller or numpy raises warning)
NSR = ((OSR) / A) ** (1 / B)
return NSR
[docs]
def inverse_tan_regressor(OSR, params):
"""
Computes inverse of: y = A * tan(B * x - C)
Returns: x = (arctan((y) / A) + C) / B
"""
A = params.get('A', 1)
B = params.get('B', 1)
C = params.get('C', 0)
if A == 0 or B == 0:
raise ValueError("Parameters A or B are zero.")
# (y) / A = tan(Bx - C)
# arctan(...) = Bx - C
# x = (arctan(...) + C) / B
val = (np.arctan((OSR) / A) + C) / B
return val
[docs]
def compute_calibrated_parameter(model_type, fit_result, target_osr):
"""
The unified bridge function.
Dispatches the calculation to the correct inverse function.
Parameters:
model_type (str): 'lin', 'log', 'exp', 'tan'.
fit_result (lmfit.model.ModelResult): The result object from fitting.
target_osr (float): The target Observed Substitution Rate.
Returns:
float: The calibrated parameter value (NSR or Factor).
"""
dispatch_map = {
'lin': inverse_linear_regressor,
'log': inverse_log_regressor,
'exp': inverse_exp_regressor,
'tan': inverse_tan_regressor,
# 'spline' is not supported analytically
}
if model_type not in dispatch_map:
raise NotImplementedError(
f"Inverse function for model '{model_type}' is not implemented."
)
# Extract parameters as a dictionary of values
# fit_result.params is an lmfit Parameters object; values accessed via key lookups or .valuesdict()
params_dict = fit_result.params.valuesdict()
return dispatch_map[model_type](target_osr, params_dict)