diff --git a/README.md b/README.md index 88c0f26..6fcf7f0 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,5 @@ # Breast cancer [![Actions Status](https://github.com/pasmopy/breast_cancer/workflows/Tests/badge.svg)](https://github.com/pasmopy/breast_cancer/actions) -This repository contains analysis code for the following paper: - - - ## Manual installation of package requirements General: @@ -18,7 +14,7 @@ Python: Julia: -- [BioMASS.jl==0.5.0](https://github.com/biomass-dev/BioMASS.jl) +- [BioMASS.jl==0.5.0](https://github.com/biomass-dev/BioMASS.jl) R: @@ -56,7 +52,7 @@ R: ```bash $ cd transcriptomic_data $ R - ``` + ``` - Read `integration.R` @@ -64,7 +60,7 @@ R: source("integration.R") ``` -- Run `outputClinical()` or `outputSubtype()` +- Run `outputClinical()` or `outputSubtype()` ```R outputClinical("BRCA") @@ -73,13 +69,12 @@ R: Output: `{TCGA Study Abbreviation}_clinic.csv` or `{TCGA Study Abbreviation}_subtype.csv` - ### Select samples in reference to clinical or subtype data -- You can select the patient's state based on the clinical or subtype data obtained above. +- You can select the patient's state based on the clinical or subtype data obtained above. ```R - patientSelection(type = subtype, + patientSelection(type = subtype, ID = "patient", pathologic_stage %in% c("Stage_I", "Stage_II"), age_at_initial_pathologic_diagnosis < 60) @@ -87,36 +82,36 @@ R: ### Download TCGA gene expression data (HTSeq-Counts) - - Download the gene expression data of the specified sample types ([Sample Type Codes](https://gdc.cancer.gov/resources-tcga-users/tcga-code-tables/sample-type-codes)) in the cancer type specified by `outputClinical()` or `outputSubtype()`. By running this code, you can get data of only the patients selected by `sampleSelection()`. +- Download the gene expression data of the specified sample types ([Sample Type Codes](https://gdc.cancer.gov/resources-tcga-users/tcga-code-tables/sample-type-codes)) in the cancer type specified by `outputClinical()` or `outputSubtype()`. By running this code, you can get data of only the patients selected by `sampleSelection()`. - ```R - downloadTCGA(cancertype = "BRCA", - sampletype = c("01", "06"), - outputresult = FALSE) - ``` - Output: Number of selected samples + ```R + downloadTCGA(cancertype = "BRCA", + sampletype = c("01", "06"), + outputresult = FALSE) + ``` + Output: Number of selected samples ### Download CCLE transcriptomic data - - Download CCLE transcriptomic data. You can select cell lines derived from [one specific cancer type](https://github.com/pasmopy/breast_cancer/blob/master/transcriptomic_data/CCLE_cancertype.txt). ```R downloadCCLE(cancertype = "BREAST", outputresult = FALSE) - ``` + ``` + Output: Number of selected samples - ### Merge TCGA and CCLE data - 1. Merge TCGA data download with `downloadTCGA()` and CCLE data download with `downloadCCLE()`. - 1. Run ComBat-seq program to remove batch effects between TCGA and CCLE datasets. - 1. Output total read counts of all samples in order to decide the cutoff value of total read counts for `normalization()`. + +1. Merge TCGA data download with `downloadTCGA()` and CCLE data download with `downloadCCLE()`. +1. Run ComBat-seq program to remove batch effects between TCGA and CCLE datasets. +1. Output total read counts of all samples in order to decide the cutoff value of total read counts for `normalization()`. ```R mergeTCGAandCCLE(outputesult = FALSE) - ``` + ``` Output : `totalreadcounts.csv` @@ -126,10 +121,11 @@ R: - You can specify min and max value for truncation of total read counts. - If you do not want to specify values for truncation, please set `min = F` or `max = F`. - ```R - normalization(min = 40000000, max = 140000000) - ``` - Output : `TPM_RLE_postComBat_{TCGA}_{CCLE}.csv` + ```R + normalization(min = 40000000, max = 140000000) + ``` + + Output : `TPM_RLE_postComBat_{TCGA}_{CCLE}.csv` ## Construction of a comprehensive model of the ErbB signaling network @@ -146,53 +142,53 @@ R: 1. Add weighting factors for each gene (prefix: `"w_"`) to [`name2idx/parameters.py`](models/breast/TCGA_3C_AALK_01A/name2idx/parameters.py) - ```python - from pasmopy.preprocessing import WeightingFactors - from biomass import Model - - from models import erbb_network - - - model = Model(erbb_network.__package__).create() - - gene_expression = { - "ErbB1": ["EGFR"], - "ErbB2": ["ERBB2"], - "ErbB3": ["ERBB3"], - "ErbB4": ["ERBB4"], - "Grb2": ["GRB2"], - "Shc": ["SHC1", "SHC2", "SHC3", "SHC4"], - "RasGAP": ["RASA1", "RASA2", "RASA3"], - "PI3K": ["PIK3CA", "PIK3CB", "PIK3CD", "PIK3CG"], - "PTEN": ["PTEN"], - "SOS": ["SOS1", "SOS2"], - "Gab1": ["GAB1"], - "RasGDP": ["HRAS", "KRAS", "NRAS"], - "Raf": ["ARAF", "BRAF", "RAF1"], - "MEK": ["MAP2K1", "MAP2K2"], - "ERK": ["MAPK1", "MAPK3"], - "Akt": ["AKT1", "AKT2"], - "PTP1B": ["PTPN1"], - "GSK3b": ["GSK3B"], - "DUSP": ["DUSP5", "DUSP6", "DUSP7"], - "cMyc": ["MYC"], - } - - weighting_factors = WeightingFactors(model, gene_expression) - weighting_factors.add() - weighting_factors.set_search_bounds() - ``` + ```python + from pasmopy.preprocessing import WeightingFactors + from biomass import Model + + from models import erbb_network + + + model = Model(erbb_network.__package__).create() + + gene_expression = { + "ErbB1": ["EGFR"], + "ErbB2": ["ERBB2"], + "ErbB3": ["ERBB3"], + "ErbB4": ["ERBB4"], + "Grb2": ["GRB2"], + "Shc": ["SHC1", "SHC2", "SHC3", "SHC4"], + "RasGAP": ["RASA1", "RASA2", "RASA3"], + "PI3K": ["PIK3CA", "PIK3CB", "PIK3CD", "PIK3CG"], + "PTEN": ["PTEN"], + "SOS": ["SOS1", "SOS2"], + "Gab1": ["GAB1"], + "RasGDP": ["HRAS", "KRAS", "NRAS"], + "Raf": ["ARAF", "BRAF", "RAF1"], + "MEK": ["MAP2K1", "MAP2K2"], + "ERK": ["MAPK1", "MAPK3"], + "Akt": ["AKT1", "AKT2"], + "PTP1B": ["PTPN1"], + "GSK3b": ["GSK3B"], + "DUSP": ["DUSP5", "DUSP6", "DUSP7"], + "cMyc": ["MYC"], + } + + weighting_factors = WeightingFactors(model, gene_expression) + weighting_factors.add() + weighting_factors.set_search_bounds() + ``` 1. Rename `erbb_network/` to CCLE_name or TCGA_ID, e.g., `MCF7_BREAST` or `TCGA_3C_AALK_01A` - ```python - import shutil + ```python + import shutil - shutil.move( - os.path.join("models", "erbb_network"), - os.path.join("models", "breast", "TCGA_3C_AALK_01A") - ) - ``` + shutil.move( + os.path.join("models", "erbb_network"), + os.path.join("models", "breast", "TCGA_3C_AALK_01A") + ) + ``` 1. Edit [`set_search_param.py`](models/breast/TCGA_3C_AALK_01A/set_search_param.py) @@ -206,7 +202,7 @@ R: from . import __path__ from .name2idx import C, V from .set_model import initial_values, param_values - + incorporating_gene_expression_levels = Individualization( parameters=C.NAMES, @@ -361,8 +357,8 @@ R: ```python simulations.subtyping( - None, - { + fname=None, + dynamical_features={ "Phosphorylated_Akt": {"EGF": ["max"], "HRG": ["max"]}, "Phosphorylated_ERK": {"EGF": ["max"], "HRG": ["max"]}, "Phosphorylated_c-Myc": {"EGF": ["max"], "HRG": ["max"]}, @@ -390,7 +386,7 @@ R: from pasmopy import PatientModelAnalyses import models.breast - + with open (os.path.join("models", "breast", "selected_tnbc.txt"), mode="r") as f: TNBC_ID = f.read().splitlines() @@ -426,7 +422,7 @@ R: erbb_expression_ratio = pd.read_csv( os.path.join("data", "ErbB_expression_ratio.csv"), index_col=0 - ) + ) compounds = ["Erlotinib", "Lapatinib", "AZD6244", "PD-0325901"] for compound in compounds: ccle.save_all(erbb_expression_ratio, compound) diff --git a/pasmopy/__init__.py b/pasmopy/__init__.py deleted file mode 100644 index f0bcd82..0000000 --- a/pasmopy/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Patient-Specific Modeling in Python""" - -__author__ = ", ".join(["Hiroaki Imoto", "Sawa Yamashiro"]) -__maintainer__ = "Hiroaki Imoto" -__email__ = "himoto@protein.osaka-u.ac.jp" -__version__ = "0.0.1" - -from .construction import Text2Model -from .individualization import Individualization -from .patient_model import PatientModelAnalyses, PatientModelSimulations diff --git a/pasmopy/construction/__init__.py b/pasmopy/construction/__init__.py deleted file mode 100644 index 0c10b9c..0000000 --- a/pasmopy/construction/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .text2model import Text2Model diff --git a/pasmopy/construction/julia_template.py b/pasmopy/construction/julia_template.py deleted file mode 100644 index 6335bab..0000000 --- a/pasmopy/construction/julia_template.py +++ /dev/null @@ -1,531 +0,0 @@ -""" -Model template for conversion of biomass models into -BioMASS.jl (https://github.com/biomass-dev/BioMASS.jl) format. - -BioMASS.jl >= 0.4.2 -""" - -PARAMETERS: str = """\ -module C - -const NAMES = [] - -for (idx,name) in enumerate(NAMES) - eval(Meta.parse("const $name = $idx")) -end - -const NUM = length(NAMES) - -end # module -""" - - -SPECIES: str = """\ -module V - -const NAMES = [] - -for (idx, name) in enumerate(NAMES) - eval(Meta.parse("const $name = $idx")) -end - -const NUM = length(NAMES) - -end # module -""" - - -SET_MODEL: str = """\ -function diffeq!(du, u, p, t) - v = Dict{Int64,Float64}() - - for i in 1:V.NUM - @inbounds du[i] = 0.0 - end - -end - - -function param_values()::Vector{Float64} - p::Vector{Float64} = ones(C.NUM) - - return p -end - - -function initial_values()::Vector{Float64} - u0::Vector{Float64} = zeros(V.NUM) - - return u0 -end -""" - - -OBSERVABLE: str = """\ -const observables = [] - -function observables_index(observable_name::String)::Int - if !(observable_name in observables) - error("$observable_name is not defined in observables.") - end - return findfirst(isequal(observable_name),observables) -end -""" - - -SIMULATION: str = """\ -module Sim -include("./name2idx/parameters.jl") -include("./name2idx/species.jl") -include("./set_model.jl") -include("./observable.jl") - -using .C -using .V - -using Sundials -using SteadyStateDiffEq - -# Options for ODE solver -const ABSTOL = 1e-8 -const RELTOL = 1e-8 - -normalization = Dict{String,Dict{}}() - -const dt = 1.0 -const t = collect(0.0:dt:100.0) - -const conditions = [] - -simulations = Array{Float64,3}( - undef, length(observables), length(t), length(conditions) -) - - -function solveode( - f::Function, - u0::Vector{Float64}, - t::Vector{Float64}, - p::Vector{Float64})::Union{ODESolution{},Nothing} - local sol::ODESolution{}, is_successful::Bool - try - prob = ODEProblem(f, u0, (t[1], t[end]), p) - sol = solve( - prob,CVODE_BDF(), - abstol=ABSTOL, - reltol=RELTOL, - saveat=dt, - dtmin=eps(), - verbose=false - ) - is_successful = ifelse(sol.retcode === :Success, true, false) - catch - is_successful = false - finally - if !is_successful - GC.gc() - end - end - return is_successful ? sol : nothing -end - - -function get_steady_state( - f::Function, - u0::Vector{Float64}, - p::Vector{Float64})::Vector{Float64} - local sol::SteadyStateSolution{}, is_successful::Bool - try - prob = ODEProblem(f, u0, (0.0, Inf), p) - prob = SteadyStateProblem(prob) - sol = solve( - prob, - DynamicSS( - CVODE_BDF(); - abstol=ABSTOL, - reltol=RELTOL - ), - dt=dt, - dtmin=eps(), - verbose=false - ) - is_successful = ifelse(sol.retcode === :Success, true, false) - catch - is_successful = false - finally - if !is_successful - GC.gc() - end - end - return is_successful ? sol.u : [] -end - - -function simulate!(p::Vector{Float64}, u0::Vector{Float64})::Union{Bool,Nothing} - # unperturbed steady state - - # add ligand - for (i, condition) in enumerate(conditions) - - sol = solveode(diffeq!, u0, t, p) - if sol === nothing - return false - else - @inbounds @simd for j in eachindex(t) - # line_num + 4 - end - end - end -end -end # module -""" - - -EXPERIMENTAL_DATA: str = """\ -module Exp -include("./observable.jl") - -experiments = Array{Dict{String,Array{Float64,1}},1}(undef, length(observables)) -error_bars = Array{Dict{String,Array{Float64,1}},1}(undef, length(observables)) - - -function get_timepoint(obs_name::String)::Vector{Float64} - return [] -end -end # module -""" - - -SET_SEARCH_PARAM: str = """\ -# Specify model parameters and/or initial values to optimize -function get_search_index()::Tuple{Array{Int64,1},Array{Int64,1}} - # parameters - search_idx_params::Vector{Int} = [] - - # initial values - search_idx_initials::Vector{Int} = [] - - return search_idx_params, search_idx_initials -end - - -function get_search_region()::Matrix{Float64} - p::Vector{Float64} = param_values() - u0::Vector{Float64} = initial_values() - - search_idx::Tuple{Array{Int64,1},Array{Int64,1}} = get_search_index() - search_param::Vector{Float64} = initialize_search_param(search_idx, p, u0) - - search_rgn::Matrix{Float64} = zeros(2, length(p) + length(u0)) - - # Default: 0.1 ~ 10x - for (i, j) in enumerate(search_idx[1]) - search_rgn[1,j] = search_param[i] * 0.1 # lower bound - search_rgn[2,j] = search_param[i] * 10.0 # upper bound - end - - # Default: 0.5 ~ 2x - for (i, j) in enumerate(search_idx[2]) - search_rgn[1,j + length(p)] = search_param[i + length(search_idx[1])] * 0.5 # lower bound - search_rgn[2,j + length(p)] = search_param[i + length(search_idx[1])] * 2.0 # upper bound - end - - # search_rgn[:, C.param_name] = [lower_bound, upper_bound] - # search_rgn[:, V.var_name+length(p)] = [lower_bound, upper_bound] - - search_rgn = convert_scale!(search_rgn, search_idx) - - return search_rgn -end - - -function update_param(indiv::Vector{Float64})::Tuple{Array{Float64,1},Array{Float64,1}} - p::Vector{Float64} = param_values() - u0::Vector{Float64} = initial_values() - - search_idx::Tuple{Array{Int64,1},Array{Int64,1}} = get_search_index() - - for (i, j) in enumerate(search_idx[1]) - @inbounds p[j] = indiv[i] - end - for (i, j) in enumerate(search_idx[2]) - @inbounds u0[j] = indiv[i + length(search_idx[1])] - end - - # parameter constraints - - return p, u0 -end - - -function decode_gene2val(indiv_gene)::Vector{Float64} - search_rgn::Matrix{Float64} = get_search_region() - indiv::Vector{Float64} = zeros(length(indiv_gene)) - - for i in eachindex(indiv_gene) - indiv[i] = 10^( - indiv_gene[i] * ( - search_rgn[2,i] - search_rgn[1,i] - ) + search_rgn[1,i] - ) - end - - return round.(indiv, sigdigits=7) -end - - -function encode_val2gene(indiv::Vector{Float64}) - search_rgn::Matrix{Float64} = get_search_region() - indiv_gene::Vector{Float64} = zeros(length(indiv)) - - for i in eachindex(indiv) - indiv_gene[i] = ( - log10(indiv[i]) - search_rgn[1,i] - ) / ( - search_rgn[2,i] - search_rgn[1,i] - ) - end - - return indiv_gene -end - - -function encode_bestIndivVal2randGene( - gene_idx::Int64, - best_indiv::Vector{Float64}, - p0_bounds::Vector{Float64})::Float64 - search_rgn::Matrix{Float64} = get_search_region() - rand_gene::Float64 = ( - log10( - best_indiv[gene_idx] * 10^( - rand() * log10(p0_bounds[2] / p0_bounds[1]) + log10(p0_bounds[1]) - ) - ) - search_rgn[1,gene_idx] - ) / ( - search_rgn[2,gene_idx] - search_rgn[1,gene_idx] - ) - return rand_gene -end - - -function initialize_search_param( - search_idx::Tuple{Array{Int64,1},Array{Int64,1}}, - p::Vector{Float64}, - u0::Vector{Float64})::Vector{Float64} - duplicate::Vector{String} = [] - if length(search_idx[1]) != length(unique(search_idx[1])) - for idx in findall( - [count(x -> x == i, search_idx[1]) for i in unique(search_idx[1])] .!= 1 - ) - push!(duplicate, C.NAMES[search_idx[1][idx]]) - end - error( - "Duplicate parameters (C.): $duplicate" - ) - elseif length(search_idx[2]) != length(unique(search_idx[2])) - for idx in findall( - [count(x -> x == i, search_idx[2]) for i in unique(search_idx[2])] .!= 1 - ) - push!(duplicate, V.NAMES[search_idx[2][idx]]) - end - error( - "Duplicate initial conditions (V.): $duplicate" - ) - end - search_param = zeros( - length(search_idx[1]) + length(search_idx[2]) - ) - for (i, j) in enumerate(search_idx[1]) - @inbounds search_param[i] = p[j] - end - for (i, j) in enumerate(search_idx[2]) - @inbounds search_param[i + length(search_idx[1])] = u0[j] - end - - if any(x -> x == 0.0, search_param) - msg::String = "search_param must not contain zero." - for idx in search_idx[1] - if p[idx] == 0.0 - error( - @sprintf( - "`C.%s` in search_idx_params: ", C.NAMES[idx] - ) * msg - ) - end - end - for idx in search_idx[2] - if u0[idx] == 0.0 - error( - @sprintf( - "`V.%s` in search_idx_initials: ", V.NAMES[idx] - ) * msg - ) - end - end - end - - return search_param -end - - -function convert_scale!( - search_rgn::Matrix{Float64}, - search_idx::Tuple{Array{Int64,1},Array{Int64,1}})::Matrix{Float64} - for i = 1:size(search_rgn, 2) - if minimum(search_rgn[:,i]) < 0.0 - msg = "search_rgn[lower_bound,upper_bound] must be positive.\n" - if i <= C.NUM - error(@sprintf("`C.%s` ", C.NAMES[i]) * msg) - else - error(@sprintf("`V.%s` ", V.NAMES[i - C.NUM]) * msg) - end - elseif minimum(search_rgn[:,i]) == 0.0 && maximum(search_rgn[:,i]) != 0.0 - msg = "lower_bound must be larger than 0.\n" - if i <= C.NUM - error(@sprintf("`C.%s` ", C.NAMES[i]) * msg) - else - error(@sprintf("`V.%s` ", V.NAMES[i - C.NUM]) * msg) - end - elseif search_rgn[2,i] - search_rgn[1,i] < 0.0 - msg = "lower_bound must be smaller than upper_bound.\n" - if i <= C.NUM - error(@sprintf("`C.%s` ", C.NAMES[i]) * msg) - else - error(@sprintf("`V.%s` ", V.NAMES[i - C.NUM]) * msg) - end - end - end - - nonzero_idx::Vector{Int} = [] - for i = 1:size(search_rgn, 2) - if search_rgn[:,i] != [0.0,0.0] - push!(nonzero_idx, i) - end - end - difference::Vector{Int} = collect( - symdiff( - Set(nonzero_idx), - Set(append!(search_idx[1], C.NUM .+ search_idx[2])) - ) - ) - if length(difference) > 0 - for idx in difference - if idx <= C.NUM - println(@sprintf("`C.%s`", C.NAMES[Int(idx)])) - else - println(@sprintf("`V.%s`", V.NAMES[Int(idx) - C.NUM])) - end - end - error( - "Set these search_params in both search_idx and search_rgn." - ) - end - - search_rgn = search_rgn[:,nonzero_idx] - - return log10.(search_rgn) -end -""" - -FITNESS: str = """\ -# Residual Sum of Squares -function compute_objval_rss( - sim_data::Vector{Float64}, - exp_data::Vector{Float64})::Float64 - error::Float64 = 0.0 - for i in eachindex(exp_data) - @inbounds error += (sim_data[i] - exp_data[i])^2 - end - return error -end - - -# Cosine similarity -function compute_objval_cos( - sim_data::Vector{Float64}, - exp_data::Vector{Float64})::Float64 - error::Float64 = 1.0 - dot(sim_data,exp_data)/(norm(sim_data)*norm(exp_data)) - return error -end - - -function conditions_index(condition_name::String)::Int - if !(condition_name in Sim.conditions) - error("$condition_name is not defined in Sim.conditions") - end - return findfirst(isequal(condition_name), Sim.conditions) -end - - -function diff_sim_and_exp( - sim_matrix::Matrix{Float64}, - exp_dict::Dict{String,Array{Float64,1}}, - exp_timepoint::Vector{Float64}, - conditions::Vector{String}; - sim_norm_max::Float64)::Tuple{Vector{Float64}, Vector{Float64}} - sim_result::Vector{Float64} = [] - exp_result::Vector{Float64} = [] - - for (idx,condition) in enumerate(conditions) - if condition in keys(exp_dict) - append!(sim_result,sim_matrix[Int.(exp_timepoint.+1),idx]) - append!(exp_result,exp_dict[condition]) - end - end - - return (sim_result./sim_norm_max, exp_result) -end - - -# Define an objective function to be minimized. -function objective(indiv_gene)::Float64 - indiv::Vector{Float64} = decode_gene2val(indiv_gene) - - (p,u0) = update_param(indiv) - - if Sim.simulate!(p,u0) isa Nothing - error::Vector{Float64} = zeros(length(observables)) - for (i,obs_name) in enumerate(observables) - if isassigned(Exp.experiments,i) - if length(Sim.normalization) > 0 - norm_max::Float64 = ( - Sim.normalization[obs_name]["timepoint"] !== nothing ? maximum( - Sim.simulations[ - i, - Sim.normalization[obs_name]["timepoint"], - [ - conditions_index(c) - for c in Sim.normalization[obs_name]["condition"] - ] - ] - ) : maximum( - Sim.simulations[ - i, - :, - [ - conditions_index(c) - for c in Sim.normalization[obs_name]["condition"] - ] - ] - ) - ) - end - error[i] = compute_objval_rss( - diff_sim_and_exp( - Sim.simulations[i,:,:], - Exp.experiments[i], - Exp.get_timepoint(obs_name), - Sim.conditions, - sim_norm_max = ifelse( - length(Sim.normalization) == 0, 1.0, norm_max - ) - )... - ) - end - end - return sum(error) # < 1e12 - else - return 1e12 - end -end -""" diff --git a/pasmopy/construction/reaction_rules.py b/pasmopy/construction/reaction_rules.py deleted file mode 100644 index 0c56986..0000000 --- a/pasmopy/construction/reaction_rules.py +++ /dev/null @@ -1,1235 +0,0 @@ -import sys -from dataclasses import dataclass, field -from difflib import SequenceMatcher -from typing import Dict, List, NamedTuple, Optional - -import numpy as np - - -class UnregisteredRule(NamedTuple): - expected: Optional[str] - original: Optional[str] - - -PREPOSITIONS: List[str] = [ - " to", - " for", - " from", - " up", - " down", - " in", - " on", - " at", - " off", - " into", - " around", - " among", - " between", - " of", - " over", - " above", - " below", - " under", - " through", - " across", - " along", - " near", - " by", - " beside", -] - - -@dataclass -class ReactionRules(object): - """Create an executable biochemical model from text. - - **reaction** | **parameters** | **initial conditions** - - Attributes - ---------- - input_txt : str - Model description file (*.txt), e.g., 'Kholodenko_JBC_1999.txt' - parameters : list of strings - x : model parameters. - species : list of strings - y : model species. - reactions : list of strings - v : flux vector. - differential_equations : list of strings - dydt : right-hand side of the differential equation. - obs_desc : list of List[str] - Description of observables. - param_info : list of strings - Information about parameter values. - init_info : list of strings - Information about initial values. - param_constraints : list of strings - Information about parameter constraints. - param_excluded : list of strings - Parameters excluded from search params because of parameter constraints. - sim_tspan : list of strings ['t0', 'tf'] - Interval of integration. - sim_conditions : list of List[str] - Simulation conditions with stimulation. - sim_unperturbed : str - Untreated conditions to get steady state. - rule_words : dict - Words to identify reaction rules. - - """ - - input_txt: str - - parameters: List[str] = field( - default_factory=list, - init=False, - ) - species: List[str] = field( - default_factory=list, - init=False, - ) - reactions: List[str] = field( - default_factory=list, - init=False, - ) - differential_equations: List[str] = field( - default_factory=list, - init=False, - ) - obs_desc: List[List[str]] = field( - default_factory=list, - init=False, - ) - param_info: List[str] = field( - default_factory=list, - init=False, - ) - init_info: List[str] = field( - default_factory=list, - init=False, - ) - param_constraints: List[str] = field( - default_factory=list, - init=False, - ) - param_excluded: List[str] = field( - default_factory=list, - init=False, - ) - # Information about simulation - sim_tspan: List[str] = field( - default_factory=list, - init=False, - ) - sim_conditions: List[List[str]] = field( - default_factory=list, - init=False, - ) - sim_unperturbed: str = field( - default_factory=str, - init=False, - ) - # Words to identify reaction rules - rule_words: Dict[str, List[str]] = field( - default_factory=lambda: dict( - dimerize=[ - " dimerizes", - " homodimerizes", - " forms a dimer", - " forms dimers", - ], - bind=[ - " binds", - " forms complexes with", - ], - is_dissociated=[ - " is dissociated into", - " dissociates to", - ], - is_phosphorylated=[ - " is phosphorylated", - ], - is_dephosphorylated=[ - " is dephosphorylated", - ], - phosphorylate=[ - " phosphorylates", - ], - dephosphorylate=[ - " dephosphorylates", - ], - transcribe=[ - " transcribe", - " transcribes", - ], - is_translated=[ - " is translated into", - ], - synthesize=[ - " synthesizes", - " promotes synthesis of", - ], - is_synthesized=[ - " is synthesized", - ], - degrade=[ - " degrades", - " promotes degradation of", - ], - is_degraded=[ - " is degraded", - ], - translocate=[ - " translocates", - " is translocated", - ], - ), - init=False, - ) - - @staticmethod - def _isfloat(string: str) -> bool: - """ - Checking if a string can be converted to float. - """ - try: - float(string) - return True - except ValueError: - return False - - @staticmethod - def _remove_prefix(text: str, prefix: str) -> str: - """ - Remove prefix from a text. - """ - if text.startswith(prefix): - return text[len(prefix) :] - assert False - - def _set_params(self, line_num: int, *args: str) -> None: - """ - Set model parameters. - """ - for p_name in args: - if p_name + f"{line_num:d}" not in self.parameters: - self.parameters.append(p_name + f"{line_num:d}") - - def _set_species(self, *args: str) -> None: - """ - Set model species. - """ - for s_name in args: - if s_name not in self.species: - self.species.append(s_name) - - def _preprocessing( - self, - func_name: str, - line_num: int, - line: str, - *args: str, - ) -> List[str]: - """ - Extract the information about parameter and/or initial values - if '|' in the line and find a keyword to identify reaction rules. - - Parameters - ---------- - func_name : str - Name of the rule function. - - line_num : int - Line number. - - line : str - Each line of the input text. - - Returns - ------- - description : list of strings - - """ - self._set_params(line_num, *args) - if "|" in line: - if line.split("|")[1].strip(): - param_values = line.split("|")[1].strip().split(",") - if all("=" in pval for pval in param_values): - for pval in param_values: - base_param = pval.split("=")[0].strip(" ") - if base_param.startswith("const "): - # Parameter names with 'const' will be added to param_excluded. - base_param = base_param.split("const ")[-1] - fixed = True - else: - fixed = False - if base_param in args: - if self._isfloat(pval.split("=")[1].strip(" ")): - self.param_info.append( - "x[C." - + base_param - + f"{line_num:d}] = " - + pval.split("=")[1].strip(" ") - ) - # If a parameter value is initialized to 0.0 or fixed, - # then add it to param_excluded. - if float(pval.split("=")[1].strip(" ")) == 0.0 or fixed: - self.param_excluded.append(base_param + f"{line_num:d}") - else: - raise ValueError( - f"line{line_num:d}: Parameter value must be int or float." - ) - else: - raise ValueError( - f"line{line_num:d}: '{pval.split('=')[0].strip(' ')}'\n" - f"Available parameters are: {', '.join(args)}." - ) - elif param_values[0].strip(" ").isdecimal(): - # Parameter constraints - for param_name in args: - if f"{param_name}{int(param_values[0]):d}" not in self.parameters: - raise ValueError( - f"Line {line_num:d} and {int(param_values[0]):d} : " - "Different reaction rules in parameter constraints." - ) - else: - self.param_excluded.append(f"{param_name}{line_num:d}") - self.param_info.append( - f"x[C.{param_name}" - f"{line_num:d}] = " - f"x[C.{param_name}" - f"{int(param_values[0]):d}]" - ) - self.param_constraints.append( - f"x[C.{param_name}" - f"{line_num:d}] = " - f"x[C.{param_name}" - f"{int(param_values[0]):d}]" - ) - else: - raise ValueError( - f"line{line_num:d}: {line}\nInvalid expression in the input parameter." - ) - if line.count("|") > 1 and line.split("|")[2].strip(): - initial_values = line.split("|")[2].strip().split(",") - for ival in initial_values: - if ival.split("=")[0].strip(" ") in line.split("|")[0]: - if self._isfloat(ival.split("=")[1].strip(" ")): - self.init_info.append( - "y0[V." - + ival.split("=")[0].strip(" ") - + "] = " - + ival.split("=")[1].strip(" ") - ) - else: - raise ValueError( - f"line{line_num:d}: Initial value must be int or float." - ) - else: - raise NameError( - f"line{line_num:d}: " - "Name'{ival.split('=')[0].strip(' ')}' is not defined." - ) - line = line.split("|")[0] - hit_words: List[str] = [] - for word in self.rule_words[func_name]: - # Choose longer word - if word in line: - hit_words.append(word) - - return line.strip().split(max(hit_words, key=len)) - - @staticmethod - def _word2scores(word: str, sentence: str) -> List[float]: - """ - Calculate similarity scores between word and sentence. - - Parameters - ---------- - word : str - User-defined word. - sentence : str - Textual unit consisting of two or more words. - - returns - ------- - ratio : list - List containing similarity scores. - - """ - ratio = [ - SequenceMatcher(None, word, sentence[i : i + len(word)]).ratio() - for i in range(len(sentence) - len(word) + 1) - ] - return ratio - - def _get_partial_similarity( - self, - line: str, - similarity_threshold: float = 0.7, - ) -> UnregisteredRule: - """ - Suggest similar rule word when user-defined word is not registered - in rule_words. - - Parameters - ---------- - line : str - Each line of the input text. - - similarity_threshold : float (default: 0.7) - if all match_scores are below this value, expected_word will not - be returned. - - Returns - ------- - expected_word : str - Rule word with the highest similarity score. - - """ - match_words = [] - match_scores = [] - str_subset = [] - for rules in self.rule_words.values(): - for word in rules: - ratio = self._word2scores(word, line) - if ratio: - match_words.append(word) - match_scores.append(max(ratio)) - str_subset.append(line[np.argmax(ratio) : np.argmax(ratio) + len(word)]) - expected_word = ( - None - if all([score < similarity_threshold for score in match_scores]) - else match_words[np.argmax(match_scores)] - ) - original_word = ( - None if expected_word is None else str_subset[match_words.index(expected_word)] - ) - - return UnregisteredRule(expected_word, original_word) - - @staticmethod - def _remove_prepositions(sentence: str) -> str: - """ - Remove preposition from text not to use it for identifying reaction rules. - """ - for preposition in PREPOSITIONS: - if sentence.endswith(preposition): - return sentence[: -len(preposition)] - else: - return sentence - - def dimerize(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `monomer` dimerizes --> `dimer` - >>> `monomer` homodimerizes --> `dimer` - >>> `monomer` forms a dimer --> `dimer` - >>> `monomer` forms dimers --> `dimer` - - Notes - ----- - * Event - monomer + monomer <=> dimer - - * Rate equation - v = kf * [monomer] * [monomer] - kr * [dimer] - - * Differential equation - d[monomer]/dt = - 2 * v - - d[dimer]/dt = + v - - """ - description = self._preprocessing( - sys._getframe().f_code.co_name, line_num, line, "kf", "kr" - ) - monomer = description[0].strip(" ") - if " --> " in description[1]: - dimer = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError(f"line{line_num:d}: Use '-->' to specify the name of the dimer.") - if monomer == dimer: - raise ValueError(f"{dimer} <- Use a different name.") - self._set_species(monomer, dimer) - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.kf{line_num:d}] * y[V.{monomer}] * y[V.{monomer}] - " - f"x[C.kr{line_num:d}] * y[V.{dimer}]" - ) - counter_monomer, counter_dimer = (0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{monomer}]" in eq: - counter_monomer += 1 - self.differential_equations[i] = eq + f" - 2 * v[{line_num:d}]" - elif f"dydt[V.{dimer}]" in eq: - counter_dimer += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_monomer == 0: - self.differential_equations.append(f"dydt[V.{monomer}] = - v[{line_num:d}]") - if counter_dimer == 0: - self.differential_equations.append(f"dydt[V.{dimer}] = + v[{line_num:d}]") - - def bind(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `component1` binds `component2` --> `complex` - >>> `component1` forms complexes with `component2` --> `complex` - - Notes - ----- - * Event - component1 + component2 <=> complex - - * Rate equation - v = kf * [component1] * [component2] - kr * [complex] - - * Differential equation - d[component1]/dt = - v - - d[component2]/dt = - v - - d[complex]/dt = + v - - """ - description = self._preprocessing( - sys._getframe().f_code.co_name, line_num, line, "kf", "kr" - ) - component1 = description[0].strip(" ") - if " --> " in description[1]: - # Specify name of the complex - component2 = description[1].split(" --> ")[0].strip(" ") - complex = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError( - f"line{line_num:d}: Use '-->' to specify the name of the protein complex." - ) - if component1 == complex or component2 == complex: - raise ValueError(f"line{line_num:d}: {complex} <- Use a different name.") - elif component1 == component2: - self.dimerize(line_num, line) - else: - self._set_species(component1, component2, complex) - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.kf{line_num:d}] * y[V.{component1}] * y[V.{component2}] - " - f"x[C.kr{line_num:d}] * y[V.{complex}]" - ) - counter_component1, counter_component2, counter_complex = (0, 0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{component1}]" in eq: - counter_component1 += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif f"dydt[V.{component2}]" in eq: - counter_component2 += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif f"dydt[V.{complex}]" in eq: - counter_complex += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_component1 == 0: - self.differential_equations.append(f"dydt[V.{component1}] = - v[{line_num:d}]") - if counter_component2 == 0: - self.differential_equations.append(f"dydt[V.{component2}] = - v[{line_num:d}]") - if counter_complex == 0: - self.differential_equations.append(f"dydt[V.{complex}] = + v[{line_num:d}]") - - def is_dissociated(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `complex` is dissociated into `component1` and `component2` - >>> `complex` dissociates to `component1` and `component2` - - Notes - ----- - * Event - complex <=> component1 + component2 - - * Rate equation - v = kf * [complex] - kr * [component1] * [component2] - - * Differential equation - d[component1]/dt = + v - - d[component2]/dt = + v - - d[complex]/dt = - v - - """ - description = self._preprocessing( - sys._getframe().f_code.co_name, line_num, line, "kf", "kr" - ) - complex = description[0].strip(" ") - if " and " not in description[1]: - raise ValueError( - f"Use 'and' in line{line_num:d}:\ne.g., AB is dissociated into A and B" - ) - else: - component1 = description[1].split(" and ")[0].strip(" ") - component2 = description[1].split(" and ")[1].strip(" ") - self._set_species(complex, component1, component2) - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.kf{line_num:d}] * y[V.{complex}] - " - f"x[C.kr{line_num:d}] * y[V.{component1}] * y[V.{component2}]" - ) - counter_complex, counter_component1, counter_component2 = (0, 0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{complex}]" in eq: - counter_complex += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif f"dydt[V.{component1}]" in eq: - counter_component1 += 1 - self.differential_equations[i] = ( - eq + f" + v[{line_num:d}]" - if component1 != component2 - else eq + f" + 2 * v[{line_num:d}]" - ) - elif f"dydt[V.{component2}]" in eq: - counter_component2 += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_complex == 0: - self.differential_equations.append(f"dydt[V.{complex}] = - v[{line_num:d}]") - if counter_component1 == 0: - self.differential_equations.append(f"dydt[V.{component1}] = + v[{line_num:d}]") - if counter_component2 == 0: - self.differential_equations.append(f"dydt[V.{component2}] = + v[{line_num:d}]") - - def is_phosphorylated(self, line_num: int, line: str) -> None: - """ - Examples - -------- - `unphosphorylated_form` is phosphorylated --> `phosphorylated_form` - - Notes - ----- - * Event - unphosphorylated_form <=> phosphorylated_form - - * Rate equation - v = kf * [unphosphorylated_form] - kr * [phosphorylated_form] - - * Differential equation - d[unphosphorylated_form]/dt = - v - - d[phosphorylated_form]/dt = + v - - """ - description = self._preprocessing( - sys._getframe().f_code.co_name, line_num, line, "kf", "kr" - ) - unphosphorylated_form = description[0].strip(" ") - if " --> " in description[1]: - phosphorylated_form = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError( - f"line{line_num:d}: " - "Use '-->' to specify the name of the phosphorylated protein." - ) - self._set_species(unphosphorylated_form, phosphorylated_form) - - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.kf{line_num:d}] * y[V.{unphosphorylated_form}] - " - f"x[C.kr{line_num:d}] * y[V.{phosphorylated_form}]" - ) - counter_unphosphorylated_form, counter_phosphorylated_form = (0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{unphosphorylated_form}]" in eq: - counter_unphosphorylated_form += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif "dydt[V.{phosphorylated_form}]" in eq: - counter_phosphorylated_form += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_unphosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{unphosphorylated_form}] = - v[{line_num:d}]" - ) - if counter_phosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{phosphorylated_form}] = + v[{line_num:d}]" - ) - - def is_dephosphorylated(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `phosphorylated_form` is dephosphorylated --> `unphosphorylated_form` - - Notes - ----- - * Event - phosphorylated_form --> unphosphorylated_form - - * Rate equation - v = V * [phosphorylated_form] / (K + [phosphorylated_form]) - - * Differential equation - d[unphosphorylated_form]/dt = + v - - d[phosphorylated_form]/dt = - v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "V", "K") - phosphorylated_form = description[0].strip(" ") - if " --> " in description[1]: - unphosphorylated_form = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError( - f"line{line_num:d}: " - "Use '-->' to specify the name of the dephosphorylated protein." - ) - self._set_species(phosphorylated_form, unphosphorylated_form) - - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.V{line_num:d}] * y[V.{phosphorylated_form}] / " - f"(x[C.K{line_num:d}] + y[V.{phosphorylated_form}])" - ) - counter_unphosphorylated_form, counter_phosphorylated_form = (0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{unphosphorylated_form}]" in eq: - counter_unphosphorylated_form += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - elif f"dydt[V.{phosphorylated_form}]" in eq: - counter_phosphorylated_form += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - if counter_unphosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{unphosphorylated_form}] = + v[{line_num:d}]" - ) - if counter_phosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{phosphorylated_form}] = - v[{line_num:d}]" - ) - - def phosphorylate(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `kinase` phosphorylates `unphosphorylated_form` --> `phosphorylated_form` - - Notes - ----- - * Event - kinase - - ↓ - - unphosphorylated_form --> phosphorylated_form - - * Rate equation - v = V * [kinase] * [unphosphorylated_form] / (K + [unphosphorylated_form]) - - * Differential equation - d[unphosphorylated_form]/dt = - v - - d[phosphorylated_form]/dt = + v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "V", "K") - kinase = description[0].strip(" ") - if " --> " in description[1]: - unphosphorylated_form = description[1].split(" --> ")[0].strip(" ") - phosphorylated_form = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError( - f"line{line_num:d}: " - "Use '-->' to specify the name of the phosphorylated " - "(or activated) protein." - ) - if unphosphorylated_form == phosphorylated_form: - raise ValueError(f"line{line_num:d}: {phosphorylated_form} <- Use a different name.") - self._set_species(kinase, unphosphorylated_form, phosphorylated_form) - - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.V{line_num:d}] * y[V.{kinase}] * y[V.{unphosphorylated_form}] / " - f"(x[C.K{line_num:d}] + y[V.{unphosphorylated_form}])" - ) - counter_unphosphorylated_form, counter_phosphorylated_form = (0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{unphosphorylated_form}]" in eq: - counter_unphosphorylated_form += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif f"dydt[V.{phosphorylated_form}]" in eq: - counter_phosphorylated_form += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_unphosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{unphosphorylated_form}] = - v[{line_num:d}]" - ) - if counter_phosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{phosphorylated_form}] = + v[{line_num:d}]" - ) - - def dephosphorylate(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `phosphatase` dephosphorylates `phosphorylated_form` --> `unphosphorylated_form` - - Notes - ----- - * Event - phosphatase - - ↓ - - phosphorylated_form --> unphosphorylated_form - - * Rate equation - v = V * [phosphatase] * [phosphorylated_form] / (K + [phosphorylated_form]) - - * Differential equation - d[unphosphorylated_form]/dt = + v - - d[phosphorylated_form]/dt = - v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "V", "K") - phosphatase = description[0].strip(" ") - if " --> " in description[1]: - phosphorylated_form = description[1].split(" --> ")[0].strip(" ") - unphosphorylated_form = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError( - f"line{line_num:d}: " - "Use '-->' to specify the name of the dephosphorylated " - "(or deactivated) protein." - ) - if phosphorylated_form == unphosphorylated_form: - raise ValueError(f"line{line_num:d}: {unphosphorylated_form} <- Use a different name.") - self._set_species(phosphatase, phosphorylated_form, unphosphorylated_form) - - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.V{line_num:d}] * y[V.{phosphatase}] * y[V.{phosphorylated_form}] / " - f"(x[C.K{line_num:d}] + y[V.{phosphorylated_form}])" - ) - counter_phosphorylated_form, counter_unphosphorylated_form = (0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{phosphorylated_form}]" in eq: - counter_phosphorylated_form += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif f"dydt[V.{unphosphorylated_form}]" in eq: - counter_unphosphorylated_form += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_phosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{phosphorylated_form}] = - v[{line_num:d}]" - ) - if counter_unphosphorylated_form == 0: - self.differential_equations.append( - f"dydt[V.{unphosphorylated_form}] = + v[{line_num:d}]" - ) - - def transcribe(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `TF` transcribes `mRNA` - >>> `TF1` and `TF2` transcribe `mRNA` # (AND-gate) - >>> `TF` transcribes `mRNA`, repressed by `repressor` # (Negative regulation) - - Notes - ----- - * Event - TF --> mRNA - - * Rate equation - - v = V * [TF] ** n / (K ** n + [TF] ** n) - - v = V * ([TF1] * [TF2]) ** n / (K ** n + ([TF1] * [TF2]) ** n) - - v = V * [TF] ** n / (K ** n + [TF] ** n + ([repressor] / KF) ** nF) - - * Differential equation - d[mRNA]/dt = + v - - """ - description = self._preprocessing( - sys._getframe().f_code.co_name, line_num, line, "V", "K", "n", "KF", "nF" - ) - repressor: Optional[str] = None - ratio = self._word2scores(", repressed by", description[1]) - if not ratio or max(ratio) < 1.0: - self.parameters.remove(f"KF{line_num:d}") - self.parameters.remove(f"nF{line_num:d}") - mRNA = description[1].strip() - if " " in mRNA: - # Fix typo in line{line_num:d} - raise ValueError( - f"line{line_num:d}: " - "Add ', repressed by XXX' to describe negative regulation from XXX." - ) - else: - # Add negative regulation from repressor - mRNA = description[1].split(", repressed by")[0].strip() - repressor = description[1].split(", repressed by")[1].strip() - if " and " not in description[0]: - TF = description[0].strip(" ") - self._set_species(mRNA, TF) - if repressor is not None: - self._set_species(repressor) - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.V{line_num:d}] * y[V.{TF}] ** x[C.n{line_num:d}] / " - f"(x[C.K{line_num:d}] ** x[C.n{line_num:d}] + " - f"y[V.{TF}] ** x[C.n{line_num:d}]" - + ( - ")" - if repressor is None - else f" + (y[V.{repressor}] / x[C.KF{line_num:d}]) ** x[C.nF{line_num:d}])" - ) - ) - else: - # AND-gate - TF1 = description[0].split(" and ")[0].strip(" ") - TF2 = description[0].split(" and ")[1].strip(" ") - self._set_species(mRNA, TF1, TF2) - if repressor is not None: - self._set_species(repressor) - self.reactions.append( - f"v[{line_num:d}] = " - f"x[C.V{line_num:d}] * (y[V.{TF1}]*y[V.{TF2}]) ** x[C.n{line_num:d}] / " - f"(x[C.K{line_num:d}] ** x[C.n{line_num:d}] + " - f"(y[V.{TF1}]*y[V.{TF2}]) ** x[C.n{line_num:d}]" - + ( - ")" - if repressor is None - else f" + (y[V.{repressor}] / x[C.KF{line_num:d}]) ** x[C.nF{line_num:d}])" - ) - ) - counter_mRNA = 0 - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{mRNA}]" in eq: - counter_mRNA += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_mRNA == 0: - self.differential_equations.append(f"dydt[V.{mRNA}] = + v[{line_num:d}]") - - def is_translated(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `mRNA` is translated into `protein` - - Notes - ----- - * Event - mRNA --> protein - - * Rate equation - v = kf * [mRNA] - - * Differential equation - d[protein]/dt = + v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "kf") - mRNA = description[0].strip(" ") - protein = description[1].strip(" ") - self._set_species(mRNA, protein) - self.reactions.append(f"v[{line_num:d}] = x[C.kf{line_num:d}] * y[V.{mRNA}]") - counter_protein = 0 - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{protein}]" in eq: - counter_protein += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_protein == 0: - self.differential_equations.append(f"dydt[V.{protein}] = + v[{line_num:d}]") - - def synthesize(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `catalyst` synthesizes `product` - - Notes - ----- - * Event - catalyst - - ↓ - - 0 --> product - - * Rate equation - v = kf * [catalyst] - - * Differential equation - d[product]/dt = + v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "kf") - catalyst = description[0].strip(" ") - product = description[1].strip(" ") - self._set_species(catalyst, product) - self.reactions.append(f"v[{line_num:d}] = x[C.kf{line_num:d}] * y[V.{catalyst}]") - counter_product = 0 - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{product}]" in eq: - counter_product += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_product == 0: - self.differential_equations.append(f"dydt[V.{product}] = + v[{line_num:d}]") - - def is_synthesized(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `chemical_species` is synthesized - - Notes - ----- - * Event - 0 --> chemical_species - - * Rate equation - v = kf - - * Differential equation - d[chemical_species]/dt = + v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "kf") - chemical_species = description[0].strip(" ") - self._set_species(chemical_species) - self.reactions.append(f"v[{line_num:d}] = x[C.kf{line_num:d}]") - counter_chemical_species = 0 - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{chemical_species}]" in eq: - counter_chemical_species += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if counter_chemical_species == 0: - self.differential_equations.append(f"dydt[V.{chemical_species}] = + v[{line_num:d}]") - - def degrade(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `protease` degrades `protein` - - Notes - ----- - * Event - protease - - ↓ - - protein --> 0 - - * Rate equation - v = kf * [protease] - - * Differential equation - d[protein]/dt = - v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "kf") - protease = description[0].strip(" ") - protein = description[1].strip(" ") - self._set_species(protease, protein) - self.reactions.append(f"v[{line_num:d}] = x[C.kf{line_num:d}] * y[V.{protease}]") - counter_protein = 0 - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{protein}]" in eq: - counter_protein += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - if counter_protein == 0: - self.differential_equations.append(f"dydt[V.{protein}] = - v[{line_num:d}]") - - def is_degraded(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `chemical_species` is degraded - - Notes - ----- - * Event - chemical_species --> 0 - - * Rate equation - v = kf * [chemical_species] - - * Differential equation - d[chemical_species]/dt = - v - - """ - description = self._preprocessing(sys._getframe().f_code.co_name, line_num, line, "kf") - chemical_species = description[0].strip(" ") - self._set_species(chemical_species) - self.reactions.append(f"v[{line_num:d}] = x[C.kf{line_num:d}] * y[V.{chemical_species}]") - counter_chemical_species = 0 - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{chemical_species}]" in eq: - counter_chemical_species += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - if counter_chemical_species == 0: - self.differential_equations.append(f"dydt[V.{chemical_species}] = - v[{line_num:d}]") - - def translocate(self, line_num: int, line: str) -> None: - """ - Examples - -------- - >>> `pre_translocation` translocates from one location to another (pre_volume, post_volume) --> `post_translocation` - >>> `pre_translocation` is translocated from one location to another (pre_volume, post_volume) --> `post_translocation` - - Notes - ----- - * Event - pre_translocation <=> post_translocation \ - (Volume_pre_translocation <-> Volume_post_translocation) - - * Example - - - * Rate equation - v = kf * [pre_translocation] - kr * (post_volume / pre_volume) \ - * [post_translocation] - - * Differential equation - d[pre_translocation]/dt = - v - - d[post_translocation]/dt = + v * (pre_volume / post_volume) - - """ - description = self._preprocessing( - sys._getframe().f_code.co_name, line_num, line, "kf", "kr" - ) - pre_translocation = description[0].strip(" ") - if " --> " in description[1]: - post_translocation = description[1].split(" --> ")[1].strip(" ") - else: - raise ValueError( - f"line{line_num:d}: " - "Use '-->' to specify the name of the species after translocation." - ) - if pre_translocation == post_translocation: - raise ValueError(f"line{line_num:d}: {post_translocation} <- Use a different name.") - # Information about compartment volumes - if "(" in description[1] and ")" in description[1]: - [pre_volume, post_volume] = description[1].split("(")[-1].split(")")[0].split(",") - if not self._isfloat(pre_volume.strip(" ")) or not self._isfloat( - post_volume.strip(" ") - ): - raise ValueError("pre_volume and post_volume must be float or int.") - else: - [pre_volume, post_volume] = ["1", "1"] - self._set_species(pre_translocation, post_translocation) - self.reactions.append( - f"v[{line_num:d}] = x[C.kf{line_num:d}] * y[V.{pre_translocation}] - " - f"x[C.kr{line_num:d}] * y[V.{post_translocation}]" - ) - if float(pre_volume.strip(" ")) != float(post_volume.strip(" ")): - self.reactions[-1] = ( - f"v[{line_num:d}] = " - f"x[C.kf{line_num:d}] * y[V.{pre_translocation}] - " - f"x[C.kr{line_num:d}] * " - f"({post_volume.strip()} / {pre_volume.strip()}) * " - f"y[V.{post_translocation}]" - ) - counter_pre_translocation, counter_post_translocation = (0, 0) - for i, eq in enumerate(self.differential_equations): - if f"dydt[V.{pre_translocation}]" in eq: - counter_pre_translocation += 1 - self.differential_equations[i] = eq + f" - v[{line_num:d}]" - elif f"dydt[V.{post_translocation}]" in eq: - counter_post_translocation += 1 - self.differential_equations[i] = eq + f" + v[{line_num:d}]" - if float(pre_volume.strip(" ")) != float(post_volume.strip(" ")): - self.differential_equations[ - i - ] += f" * ({pre_volume.strip()} / {post_volume.strip()})" - if counter_pre_translocation == 0: - self.differential_equations.append(f"dydt[V.{pre_translocation}] = - v[{line_num:d}]") - if counter_post_translocation == 0: - self.differential_equations.append(f"dydt[V.{post_translocation}] = + v[{line_num:d}]") - if float(pre_volume.strip(" ")) != float(post_volume.strip(" ")): - self.differential_equations[ - -1 - ] += f" * ({pre_volume.strip()} / {post_volume.strip()})" - - def create_ode(self) -> None: - """ - Find a keyword in each line to identify the reaction rule and - construct an ODE model. - - """ - with open(self.input_txt, encoding="utf-8") as f: - lines = f.readlines() - for line_num, line in enumerate(lines, start=1): - # Remove double spaces - while True: - if " " not in line: - break - else: - line = line.replace(" ", " ") - # Comment out - line = line.split("#")[0].rstrip(" ") - if not line.strip(): - # Skip blank lines - continue - elif lines.count(line) > 1: - # Find duplicate lines - raise ValueError( - f"Reaction '{line}' is duplicated in lines " - + ", ".join([str(i + 1) for i, rxn in enumerate(lines) if rxn == line]) - ) - # About observables - elif line.startswith("@obs "): - line = self._remove_prefix(line, "@obs ") - if line.count(":") != 1: - raise SyntaxError(f"line{line_num:d}: Missing colon") - else: - self.obs_desc.append(line.split(":")) - # About simulation info. - elif line.startswith("@sim "): - line = self._remove_prefix(line, "@sim ") - if line.count(":") != 1: - raise SyntaxError(f"line{line_num:d}: Missing colon") - else: - if line.startswith("tspan"): - t_info = line.split(":")[-1].strip() - if "[" in t_info and "]" in t_info: - [t0, tf] = t_info.split("[")[-1].split("]")[0].split(",") - if t0.strip(" ").isdecimal() and tf.strip(" ").isdecimal(): - self.sim_tspan.append(t0) - self.sim_tspan.append(tf) - else: - raise TypeError("@sim tspan: [t0, tf] must be a list of integers.") - else: - raise ValueError( - "tspan must be a two element vector [t0, tf] " - "specifying the initial and final times." - ) - elif line.startswith("unperturbed"): - self.sim_unperturbed += line.split(":")[-1].strip() - elif line.startswith("condition "): - self.sim_conditions.append( - self._remove_prefix(line, "condition ").split(":") - ) - else: - raise ValueError( - f"(line{line_num:d}) Available options are: " - "'@sim tspan:', '@sim unperturbed:' or '@sim condition XXX:'." - ) - # Detect reaction rule - else: - for reaction_rule, words in self.rule_words.items(): - if any([self._remove_prepositions(word) in line for word in words]): - exec("self." + reaction_rule + "(line_num, line)") - break - else: - unregistered_rule = self._get_partial_similarity(line) - raise ValueError( - f"Unregistered words in line{line_num:d}: {line}" - + ( - f"\nMaybe: '{unregistered_rule.expected}'." - if unregistered_rule.expected is not None - else "" - ) - ) diff --git a/pasmopy/construction/template/README.md b/pasmopy/construction/template/README.md deleted file mode 100644 index 9b21a9d..0000000 --- a/pasmopy/construction/template/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# Template for BioMASS model construction - -A brief description of each file/folder is below: - -| Name | Content | -| ---------------------------------------------- | -------------------------------------------------------------------------------------------------------- | -| [`name2idx/`](./name2idx/) | Names of model parameters and species | -| [`set_model.py`](./set_model.py) | Differential equation, parameters and initial condition | -| [`observalbe.py`](./observable.py) | Observables, simulations and experimental data | -| [`viz.py`](./viz.py) | Plotting parameters for customizing figure properties | -| [`set_search_param.py`](./set_search_param.py) | Model parameters to optimize and search region | -| [`fitness.py`](./fitness.py) | An objective function to be minimized, i.e., the distance between model simulation and experimental data | -| [`reaction_network.py`](./reaction_network.py) | Reaction indices grouped according to biological processes | - -## Examples - -- [Circadian clock](https://github.com/biomass-dev/biomass/tree/master/biomass/models/circadian_clock) -- [Immediate-early gene response](https://github.com/biomass-dev/biomass/tree/master/biomass/models/Nakakuki_Cell_2010) -- [Insulin signaling](https://github.com/biomass-dev/biomass/tree/master/biomass/models/insulin_signaling) -- [MAPK cascade](https://github.com/biomass-dev/biomass/tree/master/biomass/models/mapk_cascade) -- [NF-κB pathway](https://github.com/biomass-dev/biomass/tree/master/biomass/models/nfkb_pathway) -- [TGF-β/SMAD pathway](https://github.com/biomass-dev/biomass/tree/master/biomass/models/tgfb_smad) diff --git a/pasmopy/construction/template/__init__.py b/pasmopy/construction/template/__init__.py deleted file mode 100644 index 31abc99..0000000 --- a/pasmopy/construction/template/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .fitness import OptimizationProblem -from .name2idx import C, V -from .reaction_network import ReactionNetwork -from .set_model import initial_values, param_values -from .viz import Visualization diff --git a/pasmopy/construction/template/fitness.py b/pasmopy/construction/template/fitness.py deleted file mode 100644 index f9270a2..0000000 --- a/pasmopy/construction/template/fitness.py +++ /dev/null @@ -1,104 +0,0 @@ -import numpy as np -from scipy.spatial.distance import cosine - -from .observable import Observable -from .set_search_param import SearchParam - - -class OptimizationProblem(Observable, SearchParam): - def __init__(self): - super(OptimizationProblem, self).__init__() - - @property - def bounds(self): - """ - Lower and upper bounds on independent variables. - """ - search_region: np.ndarray = self.get_region() - lb = 10 ** search_region[0] - ub = 10 ** search_region[1] - return tuple(zip(lb, ub)) - - @staticmethod - def _compute_objval_rss(sim_data, exp_data): - """Return Residual Sum of Squares""" - return np.dot((sim_data - exp_data), (sim_data - exp_data)) - - @staticmethod - def _compute_objval_cos(sim_data, exp_data): - """Return Cosine distance""" - return cosine(sim_data, exp_data) - - @staticmethod - def _diff_sim_and_exp(sim_matrix, exp_dict, exp_timepoint, conditions, sim_norm_max): - sim_val = [] - exp_val = [] - - for idx, condition in enumerate(conditions): - if condition in exp_dict.keys(): - sim_val.extend(sim_matrix[list(map(int, exp_timepoint)), idx]) - exp_val.extend(exp_dict[condition]) - - return np.array(sim_val) / sim_norm_max, np.array(exp_val) - - def objective(self, indiv, *args): - """Define an objective function to be minimized""" - if len(args) == 0: - (x, y0) = self.update(indiv) - elif len(args) == 1: - raise ValueError("not enough values to unpack (expected 2, got 1)") - elif len(args) == 2: - (x, y0) = args - else: - raise ValueError("too many values to unpack (expected 2)") - - self.set_data() - - if self.simulate(x, y0) is None: - error = np.zeros(len(self.obs_names)) - for i, obs_name in enumerate(self.obs_names): - if self.experiments[i] is not None: - error[i] = self._compute_objval_rss( - *self._diff_sim_and_exp( - self.simulations[i], - self.experiments[i], - self.get_timepoint(obs_name), - self.conditions, - sim_norm_max=1 - if not self.normalization - else ( - np.max( - self.simulations[ - self.obs_names.index(obs_name), - self.normalization[obs_name]["timepoint"], - [ - self.conditions.index(c) - for c in ( - self.normalization[obs_name]["condition"] - if self.normalization[obs_name]["condition"] - else self.conditions - ) - ], - ] - ) - if self.normalization[obs_name]["timepoint"] is not None - else np.max( - self.simulations[ - self.obs_names.index(obs_name), - :, - [ - self.conditions.index(c) - for c in ( - self.normalization[obs_name]["condition"] - if self.normalization[obs_name]["condition"] - else self.conditions - ) - ], - ] - ) - ), - ) - ) - return np.sum(error) # < 1e12 - else: - return 1e12 diff --git a/pasmopy/construction/template/name2idx/__init__.py b/pasmopy/construction/template/name2idx/__init__.py deleted file mode 100644 index 3869ddc..0000000 --- a/pasmopy/construction/template/name2idx/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .parameters import C -from .species import V diff --git a/pasmopy/construction/template/name2idx/parameters.py b/pasmopy/construction/template/name2idx/parameters.py deleted file mode 100644 index cb3917c..0000000 --- a/pasmopy/construction/template/name2idx/parameters.py +++ /dev/null @@ -1,19 +0,0 @@ -from dataclasses import make_dataclass -from typing import Dict, List - -NAMES: List[str] = [] - -NUM: int = len(NAMES) - -Parameters = make_dataclass( - cls_name="Parameters", - fields=[(name, int) for name in NAMES], - namespace={"NAMES": NAMES, "NUM": NUM}, - frozen=True, -) - -name2idx: Dict[str, int] = {k: v for v, k in enumerate(NAMES)} - -C = Parameters(**name2idx) - -del name2idx diff --git a/pasmopy/construction/template/name2idx/species.py b/pasmopy/construction/template/name2idx/species.py deleted file mode 100644 index 8109670..0000000 --- a/pasmopy/construction/template/name2idx/species.py +++ /dev/null @@ -1,19 +0,0 @@ -from dataclasses import make_dataclass -from typing import Dict, List - -NAMES: List[str] = [] - -NUM: int = len(NAMES) - -Species = make_dataclass( - cls_name="Species", - fields=[(name, int) for name in NAMES], - namespace={"NAMES": NAMES, "NUM": NUM}, - frozen=True, -) - -name2idx: Dict[str, int] = {k: v for v, k in enumerate(NAMES)} - -V = Species(**name2idx) - -del name2idx diff --git a/pasmopy/construction/template/observable.py b/pasmopy/construction/template/observable.py deleted file mode 100644 index a665a23..0000000 --- a/pasmopy/construction/template/observable.py +++ /dev/null @@ -1,74 +0,0 @@ -import numpy as np -from biomass.dynamics.solver import * - -from .name2idx import C, V -from .set_model import DifferentialEquation - - -class Observable(DifferentialEquation): - """ - Correlating model simulations and experimental measurements. - - Attributes - ---------- - obs_names : list of strings - Names of model observables. - - t : range - Simulation time span. - - conditions : list of strings - Expetimental conditions. - - simulations : numpy.ndarray - The numpy array to store simulation results. - - normalization : nested dict - * 'timepoint' : Optional[int] - The time point at which simulated values are normalized. - If None, the maximum value will be used for normalization. - - * 'condition' : list of strings - The experimental conditions to use for normalization. - If empty, all conditions defined in sim.conditions will be used. - - experiments : list of dict - Time series data. - - error_bars : list of dict - Error bars to show in figures. - - """ - - def __init__(self): - super(Observable, self).__init__(perturbation={}) - self.obs_names: list = [] - self.t: range = range(101) - self.conditions: list = [] - self.simulations: np.ndarray = np.empty( - (len(self.obs_names), len(self.t), len(self.conditions)) - ) - self.normalization: dict = {} - self.experiments: list = [None] * len(self.obs_names) - self.error_bars: list = [None] * len(self.obs_names) - - def simulate(self, x: list, y0: list, _perturbation: dict = {}): - if _perturbation: - self.perturbation = _perturbation - # unperturbed steady state - - for i, condition in enumerate(self.conditions): - - sol = solve_ode(self.diffeq, y0, self.t, tuple(x)) - - if sol is None: - return False - else: - pass - - def set_data(self): - pass - - def get_timepoint(self, obs_name: str): - if obs_name in self.obs_names: - return [] diff --git a/pasmopy/construction/template/reaction_network.py b/pasmopy/construction/template/reaction_network.py deleted file mode 100644 index cb99b1b..0000000 --- a/pasmopy/construction/template/reaction_network.py +++ /dev/null @@ -1,26 +0,0 @@ -from typing import Dict, List - -from biomass.analysis.reaction import is_duplicate - - -class ReactionNetwork(object): - """ - Reaction indices grouped according to biological processes. - This is used for sensitivity analysis (target='reaction'). - """ - - def __init__(self) -> None: - self.reactions: Dict[str, List[int]] = {} - - def group(self): - """ - Group reactions according to biological processes - """ - biological_processes = [] - for process, indices in self.reactions.items(): - if not isinstance(indices, list): - raise TypeError("Use list for reaction indices in {}".format(process)) - biological_processes.append(indices) - - if not is_duplicate(self.reactions, biological_processes): - return biological_processes diff --git a/pasmopy/construction/template/set_model.py b/pasmopy/construction/template/set_model.py deleted file mode 100644 index 02a2d6e..0000000 --- a/pasmopy/construction/template/set_model.py +++ /dev/null @@ -1,34 +0,0 @@ -from .name2idx import C, V - - -class DifferentialEquation(object): - def __init__(self, perturbation): - super(DifferentialEquation, self).__init__() - self.perturbation = perturbation - - def diffeq(self, t, y, *x): - """Kinetic equations""" - # v : flux vector - v = {} - - if self.perturbation: - for i, dv in self.perturbation.items(): - v[i] = v[i] * dv - - dydt = [0] * V.NUM - - return dydt - - -def param_values(): - """Parameter values""" - x = [1] * C.NUM - - return x - - -def initial_values(): - """Values of the initial condition""" - y0 = [0] * V.NUM - - return y0 diff --git a/pasmopy/construction/template/set_search_param.py b/pasmopy/construction/template/set_search_param.py deleted file mode 100644 index ee0bd99..0000000 --- a/pasmopy/construction/template/set_search_param.py +++ /dev/null @@ -1,77 +0,0 @@ -import numpy as np -from biomass.estimation import convert_scale, initialize_search_param - -from .name2idx import C, V -from .set_model import initial_values, param_values - - -class SearchParam(object): - """Specify model parameters and/or initial values to optimize""" - - def __init__(self): - # parameters - self.idx_params = [] - - # initial values - self.idx_initials = [] - - def get_region(self): - x = param_values() - y0 = initial_values() - - search_param = initialize_search_param( - parameters=C.NAMES, - species=V.NAMES, - param_values=x, - initial_values=y0, - estimated_params=self.idx_params, - estimated_initials=self.idx_initials, - ) - - search_rgn = np.zeros((2, len(x) + len(y0))) - # Default: 0.1 ~ 10 - for i, j in enumerate(self.idx_params): - search_rgn[0, j] = search_param[i] * 0.1 # lower bound - search_rgn[1, j] = search_param[i] * 10.0 # upper bound - # Default: 0.5 ~ 2 - for i, j in enumerate(self.idx_initials): - search_rgn[0, j + len(x)] = search_param[i + len(self.idx_params)] * 0.5 # lower bound - search_rgn[1, j + len(x)] = search_param[i + len(self.idx_params)] * 2.0 # upper bound - - # search_rgn[:,C.parameter] = [lower_bound, upper_bound] - # search_rgn[:,V.specie+len(x)] = [lower_bound, upper_bound] - - search_rgn = convert_scale( - region=search_rgn, - parameters=C.NAMES, - species=V.NAMES, - estimated_params=self.idx_params, - estimated_initials=self.idx_initials, - ) - - return search_rgn - - def update(self, indiv): - x = param_values() - y0 = initial_values() - - for i, j in enumerate(self.idx_params): - x[j] = indiv[i] - for i, j in enumerate(self.idx_initials): - y0[j] = indiv[i + len(self.idx_params)] - - # parameter constraints - - return x, y0 - - def gene2val(self, indiv_gene): - search_rgn = self.get_region() - indiv = 10 ** (indiv_gene * (search_rgn[1, :] - search_rgn[0, :]) + search_rgn[0, :]) - - return indiv - - def val2gene(self, indiv): - search_rgn = self.get_region() - indiv_gene = (np.log10(indiv) - search_rgn[0, :]) / (search_rgn[1, :] - search_rgn[0, :]) - - return indiv_gene diff --git a/pasmopy/construction/template/viz.py b/pasmopy/construction/template/viz.py deleted file mode 100644 index c94b7f2..0000000 --- a/pasmopy/construction/template/viz.py +++ /dev/null @@ -1,76 +0,0 @@ -from biomass.plotting import * -from matplotlib import pyplot as plt - -from .observable import Observable - - -class Visualization(Observable): - """ - Plotting parameters for customizing figure properties. - - Attributes - ---------- - cm : matplotlib.colors.ListedColormap (default: `plt.cm.get_cmap('tab10')`) - Choosing colormaps for `cmap`. - single_observable_options : list of SingleObservable - Visualization options for time-course simulation (single-observable). - multiple_observables_options : MultipleObservables - Visualization options for time-course simulation (multi-observables). - sensitivity_options : SensitivityOptions - Visualization options for sensitivity analysis results. - """ - - def __init__(self): - super().__init__() - - self.cm = plt.cm.get_cmap("tab10") - self.single_observable_options = [ - SingleObservable(self.cm, obs_name) for obs_name in self.obs_names - ] - self.multiple_observables_options = MultipleObservables(self.cm) - self.sensitivity_options = SensitivityOptions(self.cm) - - def get_single_observable_options(self): - - return self.single_observable_options - - def get_multiple_observables_options(self): - - return self.multiple_observables_options - - def get_sensitivity_options(self): - - return self.sensitivity_options - - @staticmethod - def set_timecourse_rcParams(): - """figure/simulation""" - plt.rcParams["font.size"] = 12 - plt.rcParams["axes.linewidth"] = 1.5 - plt.rcParams["xtick.major.width"] = 1.5 - plt.rcParams["ytick.major.width"] = 1.5 - plt.rcParams["lines.linewidth"] = 1.8 - plt.rcParams["lines.markersize"] = 12 - plt.rcParams["savefig.bbox"] = "tight" - # plt.rcParams["savefig.format"] = "pdf" - # plt.rcParams['font.family'] = 'Arial' - # plt.rcParams['mathtext.fontset'] = 'custom' - # plt.rcParams['mathtext.it'] = 'Arial:italic' - - @staticmethod - def set_sensitivity_rcParams(): - """figure/sensitivity""" - plt.rcParams["font.size"] = 12 - plt.rcParams["axes.linewidth"] = 1.2 - plt.rcParams["xtick.major.width"] = 1.2 - plt.rcParams["ytick.major.width"] = 1.2 - plt.rcParams["savefig.bbox"] = "tight" - # plt.rcParams["savefig.format"] = "pdf" - # plt.rcParams['font.family'] = 'Arial' - - @staticmethod - def convert_species_name(name): - """figure/sensitivity/initial_condition - - Sensitivity for species with nonzero initial conditions - """ - return name diff --git a/pasmopy/construction/text2model.py b/pasmopy/construction/text2model.py deleted file mode 100644 index 5a29c93..0000000 --- a/pasmopy/construction/text2model.py +++ /dev/null @@ -1,822 +0,0 @@ -import os -import re -import shutil -from dataclasses import dataclass, field -from typing import List - -from . import julia_template as jl -from .reaction_rules import ReactionRules - - -@dataclass -class Text2Model(ReactionRules): - """Build a BioMASS-formatted model based on template. - - Attributes - ---------- - input_txt : str - Model description file (*.txt), e.g., 'Kholodenko_JBC_1999.txt' - - lang : str (default: 'python') - Either 'python' or 'julia'. - - - 'python': biomass (https://github.com/biomass-dev/biomass) - - 'julia': BioMASS.jl (https://github.com/biomass-dev/BioMASS.jl) - """ - - input_txt: str - lang: str = "python" - indentation: str = field(default=4 * " ", init=False) - - def __post_init__(self) -> None: - if not os.path.isfile(self.input_txt): - raise FileNotFoundError(f"{self.input_txt} does not exist.") - if self.lang not in ["python", "julia"]: - raise ValueError("lang must be either 'python' or 'julia'.") - self.name: str = os.path.splitext(self.input_txt)[0] - - def _update_parameters(self) -> None: - """ - Update name2idx/parameters.py - """ - if self.lang == "python": - with open( - os.path.join( - os.path.dirname(__file__), - "template", - "name2idx", - "parameters.py", - ), - encoding="utf-8", - mode="r", - ) as f: - lines = f.readlines() - for line_num, line in enumerate(lines): - if line.startswith("NAMES: List[str] = []"): - lines[line_num] = "NAMES: List[str] = [\n" - lines[line_num] += ( - f'{self.indentation}"' - + f'",\n{self.indentation}"'.join(self.parameters) - + '",\n' - ) - lines[line_num] += "]\n" - with open( - os.path.join( - f"{self.name}", - "name2idx", - "parameters.py", - ), - encoding="utf-8", - mode="w", - ) as f: - f.writelines(lines) - else: - lines = jl.PARAMETERS.splitlines() - for line_num, line in enumerate(lines): - if line.startswith("const NAMES = []"): - lines[line_num] = "const NAMES = [\n" - lines[line_num] += ( - f'{self.indentation}"' - + f'",\n{self.indentation}"'.join(self.parameters) - + '",\n' - ) - lines[line_num] += "]\n" - with open( - os.path.join( - f"{self.name}_jl", - "name2idx", - "parameters.jl", - ), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - - def _update_species(self) -> None: - """ - Update name2idx/species.py - """ - if self.lang == "python": - with open( - os.path.join( - os.path.dirname(__file__), - "template", - "name2idx", - "species.py", - ), - encoding="utf-8", - mode="r", - ) as f: - lines = f.readlines() - for line_num, line in enumerate(lines): - if line.startswith("NAMES: List[str] = []"): - lines[line_num] = "NAMES: List[str] = [\n" - lines[line_num] += ( - f'{self.indentation}"' - + '",\n{}"'.format(self.indentation).join(self.species) - + '",\n' - ) - lines[line_num] += "]\n" - with open( - os.path.join( - f"{self.name}", - "name2idx", - "species.py", - ), - encoding="utf-8", - mode="w", - ) as f: - f.writelines(lines) - else: - lines = jl.SPECIES.splitlines() - for line_num, line in enumerate(lines): - if line.startswith("const NAMES = []"): - lines[line_num] = "const NAMES = [\n" - lines[line_num] += ( - '{}"'.format(self.indentation) - + '",\n{}"'.format(self.indentation).join(self.species) - + '",\n' - ) - lines[line_num] += "]\n" - with open( - os.path.join( - f"{self.name}_jl", - "name2idx", - "species.jl", - ), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - - def _update_set_model(self) -> None: - """Update set_model.py - - If you don't write info., all parameter values and initial values are - initialized to 1.0 and 0.0, respectively. - """ - if self.lang == "python": - with open( - os.path.join(os.path.dirname(__file__), "template", "set_model.py"), - encoding="utf-8", - mode="r", - ) as f: - lines = f.readlines() - for line_num, line in enumerate(lines): - if line.startswith(2 * self.indentation + "v = {}\n"): - # Write flux vector: v - lines[line_num + 1] = ( - 2 * self.indentation - + f"\n{2 * self.indentation}".join(self.reactions) - + "\n\n" - ) - elif line.startswith(2 * self.indentation + "dydt = [0] * V.NUM\n"): - # Write right-hand side of the differential equation: dydt - lines[line_num + 1] = ( - 2 * self.indentation - + f"\n{2 * self.indentation}".join(self.differential_equations) - + "\n\n" - ) - elif line.startswith(self.indentation + "return x"): - if not self.param_info: - # Set all parameter values = 1.0 if param_info is not given - lines[line_num - 1] = ( - f"{self.indentation + 'x[C.'}" - + f"] = 1.0\n{self.indentation + 'x[C.'}".join(self.parameters) - + "] = 1.0\n\n" - ) - else: - # Set parameter values using param_info - lines[line_num - 1] = ( - "{}".format(self.indentation) - + "\n{}".format(self.indentation).join(self.param_info) - + "\n\n" - ) - elif line.startswith(self.indentation + "return y0"): - if self.init_info: - # Set initial values using init_info - lines[line_num - 1] = ( - "{}".format(self.indentation) - + "\n{}".format(self.indentation).join(self.init_info) - + "\n\n" - ) - with open( - os.path.join( - f"{self.name}", - "set_model.py", - ), - encoding="utf-8", - mode="w", - ) as f: - f.writelines(lines) - else: - lines = jl.SET_MODEL.splitlines() - for line_num, line in enumerate(lines): - if line.startswith(self.indentation + "v = Dict{Int64,Float64}()"): - # Write flux vector: v - lines[line_num + 1] = ( - self.indentation + f"\n{self.indentation}".join(self.reactions) + "\n" - ) - lines[line_num + 1] = ( - lines[line_num + 1] - .replace("x[C.", "p[C.") - .replace("y[V.", "u[V.") - .replace("**", "^") - ) - # Write right-hand side of the differential equation: dydt - lines[line_num + 5] = ( - self.indentation - + f"\n{self.indentation}".join(self.differential_equations) - + "\n" - ).replace("dydt[V.", "du[V.") - elif line.startswith(self.indentation + "return p"): - if not self.param_info: - # Set all parameter values = 1.0 if param_info is not given - lines[line_num - 1] = ( - f"{self.indentation + 'p[C.'}" - + f"] = 1.0\n{self.indentation + 'p[C.'}".join(self.parameters) - + "] = 1.0\n\n" - ) - else: - # Set parameter values using param_info - lines[line_num - 1] = ( - "{}".format(self.indentation) - + "\n{}".format(self.indentation).join(self.param_info) - + "\n\n" - ).replace("x[C.", "p[C.") - elif line.startswith(self.indentation + "return u0"): - if self.init_info: - # Set initial values using init_info - lines[line_num - 1] = ( - "{}".format(self.indentation) - + "\n{}".format(self.indentation).join(self.init_info) - + "\n\n" - ).replace("y0[V.", "u0[V.") - with open( - os.path.join( - f"{self.name}_jl", - "set_model.jl", - ), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - - def _update_set_search_param(self) -> None: - """ - Update set_search_param.py - """ - if self.lang == "python": - with open( - os.path.join(os.path.dirname(__file__), "template", "set_search_param.py"), - encoding="utf-8", - mode="r", - ) as f: - lines = f.readlines() - for line_num, line in enumerate(lines): - if "self.idx_params = []" in line: - # Write all parameters in idx_params (except for param_excluded) - lines[line_num] = f"{2 * self.indentation}self.idx_params = [\n" - lines[line_num] += ( - "{}".format(3 * self.indentation + "C.") - + ",\n{}".format(3 * self.indentation + "C.").join(self.parameters) - + ",\n" - ) - lines[line_num] += f"{2 * self.indentation}]\n" - for param_name in self.param_excluded: - # Comment out parameters in param_excluded - lines[line_num] = lines[line_num].replace( - f"C.{param_name},", f"# C.{param_name}," - ) - elif "# parameter constraints" in line: - # Add parameter constraints - lines[line_num + 1] = ( - "{}".format(2 * self.indentation) - + "\n{}".format(2 * self.indentation).join(self.param_constraints) - + "\n\n" - ) - with open( - os.path.join(f"{self.name}", "set_search_param.py"), - encoding="utf-8", - mode="w", - ) as f: - f.writelines(lines) - else: - lines = jl.SET_SEARCH_PARAM.splitlines() - for line_num, line in enumerate(lines): - if "search_idx_params::Vector{Int} = []" in line: - # Write all parameters in idx_params (except for param_excluded) - lines[line_num] = ( - f"{self.indentation}search_idx_params" + "::Vector{Int} = [\n" - ) - lines[line_num] += ( - "{}".format(2 * self.indentation + "C.") - + ",\n{}".format(2 * self.indentation + "C.").join(self.parameters) - + ",\n" - ) - lines[line_num] += f"{self.indentation}]\n" - for param_name in self.param_excluded: - # Comment out parameters in param_excluded - lines[line_num] = lines[line_num].replace( - f"C.{param_name},", f"# C.{param_name}," - ) - elif "# parameter constraints" in line: - # Add parameter constraints - lines[line_num + 1] = ( - "{}".format(self.indentation) - + "\n{}".format(self.indentation).join(self.param_constraints) - + "\n" - ).replace("x[C.", "p[C.") - with open( - os.path.join(f"{self.name}_jl", "set_search_param.jl"), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - - def _convert_names( - self, - line: str, - p: List[str] = [], - u: List[str] = [], - init: List[str] = [], - ) -> str: - """ - Replace - - p[xxx] with x[C.xxx] - - u[xxx] with Y[:, V.xxx] - - init[xxx] with y0[V.xxx] - - Parameters - ---------- - p : list of strings (default: []) - Parameters. - - u : list of strings (default: []) - Species. - - init : list of strings (default: []) - Initial conditions. - - Returns - ------- - line : string - Each line. - - """ - for p_name in p: - if not p_name.strip() in self.parameters: - raise NameError(f"{p_name.strip()} is not defined in model parameters.") - else: - line = ( - line.replace(f"p[{p_name.strip()}]", f"x[C.{p_name.strip()}]") - if self.lang == "python" - else line.replace(f"p[{p_name.strip()}]", f"p[C.{p_name.strip()}]") - ) - for s_name in u: - if not s_name.strip() in self.species: - raise NameError(f"{s_name.strip()} is not defined in model species.") - else: - line = ( - line.replace(f"u[{s_name.strip()}]", f"sol.y[V.{s_name.strip()}, :]") - if self.lang == "python" - else line.replace(f"u[{s_name.strip()}]", f"sol.u[j][V.{s_name.strip()}]") - ) - for s_name in init: - if not s_name.strip() in self.species: - raise NameError(f"{s_name.strip()} is not defined in model species.") - else: - line = ( - line.replace(f"init[{s_name.strip()}]", f"y0[V.{s_name.strip()}]") - if self.lang == "python" - else line.replace(f"init[{s_name.strip()}]", f"u0[V.{s_name.strip()}]") - ) - return line - - def _update_observable(self) -> None: - """ - Update observable.py - """ - if self.lang == "python": - with open( - os.path.join(os.path.dirname(__file__), "template", "observable.py"), - encoding="utf-8", - mode="r", - ) as f: - lines = f.readlines() - if not self.sim_conditions: - # When sim condition is not given, add 'sim condition control: pass' - self.sim_conditions = [["control", "pass"]] - for line_num, line in enumerate(lines): - if line.startswith(f"{2 * self.indentation}self.obs_names: list = []"): - # Write observables - lines[line_num] = f"{2 * self.indentation}self.obs_names: list = [\n" - lines[line_num] += ( - f"{3 * self.indentation}'" - + f"',\n{3 * self.indentation}'".join( - [desc[0].strip() for desc in self.obs_desc] - ) - + "',\n" - ) - lines[line_num] += f"{2 * self.indentation}]\n" - elif f"self.t: range = range(101)" in line: - # Write interval of integration - if self.sim_tspan: - lines[line_num] = "{}self.t: range = range({}, {}+1)\n".format( - 2 * self.indentation, - self.sim_tspan[0].strip(), - self.sim_tspan[1].strip(), - ) - elif line.startswith(f"{2 * self.indentation}self.conditions: list = []"): - # Write sim.condition - lines[line_num] = f"{2 * self.indentation}self.conditions: list = [\n" - lines[line_num] += ( - f"{3 * self.indentation}'" - + f"',\n{3 * self.indentation}'".join( - [condition[0].strip() for condition in self.sim_conditions] - ) - + "',\n" - ) - lines[line_num] += f"{2 * self.indentation}]\n" - elif "# unperturbed steady state" in line: - if self.sim_unperturbed: - lines[line_num + 1] = ( - 2 * self.indentation - + f"\n{2 * self.indentation}".join( - c.strip() for c in self.sim_unperturbed.split(sep=";") - ) - + "\n" - ) - lines[line_num + 1] += ( - 2 * self.indentation - + "y0 = self._get_steady_state(self.diffeq, y0, tuple(x))\n" - + 2 * self.indentation - + f"if not y0:\n{3 * self.indentation}return False\n" - ) - # pa: parameters - # init: initial conditions - lines[line_num + 1] = self._convert_names( - line=lines[line_num + 1], - p=re.findall(r"p\[(.*?)\]", self.sim_unperturbed), - init=re.findall(r"init\[(.*?)\]", self.sim_unperturbed), - ) - elif "for i, condition in enumerate(self.conditions):" in line: - for i, condition in enumerate(self.sim_conditions): - # Use ';' for adding description of each condition - if i == 0: - lines[line_num + 1] = ( - 3 * self.indentation - + f'if condition == \'{condition[0].strip(" ")}\':\n' - + 4 * self.indentation - + f"\n{4 * self.indentation}".join( - c.strip(" ") for c in condition[1].split(sep=";") - ) - + ("\n\n" if len(self.sim_conditions) == 1 else "") - ) - else: - lines[line_num + 1] += ( - 3 * self.indentation - + f'elif condition == \'{condition[0].strip(" ")}\':\n' - + 4 * self.indentation - + f"\n{4 * self.indentation}".join( - c.strip(" ") for c in condition[1].split(sep=";") - ) - + "\n\n" - ) - # pa: parameters - # init: initial conditions - lines[line_num + 1] = self._convert_names( - line=lines[line_num + 1], - p=re.findall(r"p\[(.*?)\]", condition[1]), - init=re.findall(r"init\[(.*?)\]", condition[1]), - ) - elif "sol is None:" in line: - lines[line_num + 3] = "" # initialization (default: pass) - for desc in self.obs_desc: - lines[line_num + 3] += ( - "{}".format(4 * self.indentation) - + "self.simulations" - + f"[self.obs_names.index('{desc[0].strip()}'), :, i] = (\n" - + f"{5 * self.indentation}" - + desc[1].strip(" ").strip() - ) - # p: parameters - # u: species - lines[line_num + 3] = self._convert_names( - line=lines[line_num + 3], - p=re.findall(r"p\[(.*?)\]", desc[1]), - u=re.findall(r"u\[(.*?)\]", desc[1]), - ) - lines[line_num + 3] += "\n{}".format(4 * self.indentation + ")\n") - with open( - os.path.join(f"{self.name}", "observable.py"), - encoding="utf-8", - mode="w", - ) as f: - f.writelines(lines) - else: - self._split_observables_julia() - - def _split_observables_julia(self) -> None: - """ - Create observable.jl, simulation.jl and experimental_data.jl - """ - # observable.jl - lines = jl.OBSERVABLE.splitlines() - for line_num, line in enumerate(lines): - if "const observables = []" in line: - lines[line_num] = "const observables = [\n" - lines[line_num + 1] = ( - f'{self.indentation}"' - + f'",\n{self.indentation}"'.join([desc[0].strip() for desc in self.obs_desc]) - + '",\n' - ) - lines[line_num + 1] += "]\n" - with open( - os.path.join(f"{self.name}_jl", "observable.jl"), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - # simulation.jl - lines = jl.SIMULATION.splitlines() - for line_num, line in enumerate(lines): - if line.startswith("const t = collect(0.0:dt:100.0)"): - # Write interval of integration - if self.sim_tspan: - lines[line_num] = "const t = collect({}:dt:{})".format( - self.sim_tspan[0].strip(), - self.sim_tspan[1].strip(), - ) - elif line.startswith("const conditions = []"): - # Write sim.condition - lines[line_num] = "const conditions = [" - lines[line_num + 1] = ( - f'{self.indentation}"' - + f'",\n{self.indentation}"'.join( - [condition[0].strip() for condition in self.sim_conditions] - ) - + '",\n' - ) - lines[line_num + 1] += "]\n" - elif "# unperturbed steady state" in line: - if self.sim_unperturbed: - lines[line_num + 1] = ( - self.indentation - + f"\n{self.indentation}".join( - c.strip() for c in self.sim_unperturbed.split(sep=";") - ) - + "\n" - ) - lines[line_num + 1] += ( - f"{self.indentation}u0 = get_steady_state(diffeq!, u0, p)\n" - + f"{self.indentation}if isempty(u0)\n" - + f"{2 * self.indentation}return false\n" - + f"{self.indentation}end\n" - ) - # p: parameters - # init: initial conditions - lines[line_num + 1] = self._convert_names( - line=lines[line_num + 1], - p=re.findall(r"p\[(.*?)\]", self.sim_unperturbed), - init=re.findall(r"init\[(.*?)\]", self.sim_unperturbed), - ) - elif "for (i, condition) in enumerate(conditions)" in line: - for i, condition in enumerate(self.sim_conditions): - # Use ';' for adding description of each condition - if i == 0: - lines[line_num + 1] = ( - 3 * self.indentation - + f'if condition == "{condition[0].strip(" ")}"\n' - + 4 * self.indentation - + f"\n{4 * self.indentation}".join( - c.strip(" ") for c in condition[1].split(sep=";") - ) - + ("\n\n" if len(self.sim_conditions) == 1 else "") - ) - else: - lines[line_num + 1] += ( - 3 * self.indentation - + f'elseif condition == "{condition[0].strip(" ")}"\n' - + 4 * self.indentation - + f"\n{4 * self.indentation}".join( - c.strip(" ") for c in condition[1].split(sep=";") - ) - + f"\n{3 * self.indentation}end\n" - ) - # p: parameters - # init: initial conditions - lines[line_num + 1] = self._convert_names( - line=lines[line_num + 1], - p=re.findall(r"p\[(.*?)\]", condition[1]), - init=re.findall(r"init\[(.*?)\]", condition[1]), - ) - elif "if sol === nothing" in line: - lines[line_num + 4] = "" # initialization (default: # line_num + 4) - for desc in self.obs_desc: - lines[line_num + 4] += ( - "{}".format(4 * self.indentation) - + "simulations" - + f'[observables_index("{desc[0].strip()}"), j, i] = (\n' - + f"{5 * self.indentation}" - + desc[1].strip(" ").strip() - ) - # p: parameters - # u: species - lines[line_num + 4] = self._convert_names( - line=lines[line_num + 4], - p=re.findall(r"p\[(.*?)\]", desc[1]), - u=re.findall(r"u\[(.*?)\]", desc[1]), - ) - lines[line_num + 4] += "\n{}".format(4 * self.indentation + ")\n") - with open( - os.path.join(f"{self.name}_jl", "simulation.jl"), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - # experimental_data.jl - lines = jl.EXPERIMENTAL_DATA.splitlines() - with open( - os.path.join(f"{self.name}_jl", "experimental_data.jl"), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - - def convert(self, overwrite: bool = False) -> None: - """ - Convert text to a biomass-formatted model. - - Parameters - ---------- - overwrite : bool (defauld: False) - If True, the model folder will be overwritten. - - Examples - -------- - >>> from pasmopy import Text2Model - >>> Text2Model("Kholodenko_JBC_1999.txt").convert() - - """ - if overwrite and os.path.isdir( - f"{self.name}" if self.lang == "python" else f"{self.name}_jl" - ): - shutil.rmtree(f"{self.name}" if self.lang == "python" else f"{self.name}_jl") - if self.lang == "python": - shutil.copytree( - os.path.join( - os.path.dirname(__file__), - "template", - ), - f"{self.name}", - ) - else: - os.makedirs(os.path.join(f"{self.name}_jl", "name2idx")) - - self.create_ode() - self._update_parameters() - self._update_species() - self._update_set_model() - self._update_set_search_param() - self._update_observable() - if self.lang == "julia": - # Create fitness.jl - lines = jl.FITNESS.splitlines() - with open( - os.path.join(f"{self.name}_jl", "fitness.jl"), - encoding="utf-8", - mode="w", - ) as f: - f.write("\n".join(lines)) - print( - "Model Information\n" - "-----------------\n" - f"{len(self.reactions):d} reactions\n" - f"{len(self.species):d} species\n" - f"{len(self.parameters):d} parameters" - ) - - def to_markdown(self, n_reaction: int) -> None: - """ - Create markdown table describing differential equations. - - Parameters - ---------- - n_reaction : int - The number of rate equations in the model. - - Examples - -------- - >>> from pasmopy import Text2Model - >>> Text2Model("Kholodenko_JBC_1999.txt").to_markdown(25) - - """ - os.makedirs(os.path.join("markdown", f"{self.name.split(os.sep)[-1]}"), exist_ok=True) - self.create_ode() - with open(self.input_txt, encoding="utf-8") as f: - lines = f.readlines() - for num, line in enumerate(lines): - if n_reaction <= num: - break - else: - rate_equation_formatted = f"{self.reactions[num]}".split("=")[1].strip() - for p_names in self.parameters: - if f" ** x[C.{p_names.strip()}]" in f"{self.reactions[num]}": - # Hill coefficients - rate_equation_formatted = rate_equation_formatted.replace( - f" ** x[C.{p_names.strip()}]", - f"{p_names.strip()}".replace( - f"{num + 1:d}", f"{num + 1:d}" - ), - ) - elif f"x[C.{p_names.strip()}]" in f"{self.reactions[num]}": - # Others - rate_equation_formatted = rate_equation_formatted.replace( - f"x[C.{p_names.strip()}]", - f"{p_names.strip()}".replace( - f"{num + 1:d}", f"{num + 1:d}" - ), - ) - lines[num] = ( - f"|{num + 1:d}|" - + line.split("|")[0].rstrip() - + f"|{rate_equation_formatted.replace('*', '·').replace('y[V.', '[')}|" - + "\n" - ) - with open( - os.path.join("markdown", f"{self.name.split(os.sep)[-1]}", "rate_equation.md"), - encoding="utf-8", - mode="w", - ) as f: - f.writelines( - [ - "|No.|Reactions|Rate equations|\n|---|---------|--------------|\n", - *lines[:n_reaction], - ] - ) - # Differential equation - differential_equations_formatted = [ - eq.replace("*", "·") for eq in self.differential_equations - ] - for i, eq in enumerate(differential_equations_formatted): - for s_name in self.species: - if f"dydt[V.{s_name.strip()}]" in eq: - eq = eq.replace( - f"dydt[V.{s_name.strip()}]", - f"|{i + 1:d}|d[{s_name.strip()}]/dt", - ) - break - for num in range(n_reaction): - if f"v[{num + 1:d}]" in eq: - eq = eq.replace( - f"v[{num + 1:d}]", - f"_v_ {num + 1:d}", - ) - differential_equations_formatted[i] = eq + "|\n" - with open( - os.path.join("markdown", f"{self.name.split(os.sep)[-1]}", "differential_equation.md"), - encoding="utf-8", - mode="w", - ) as f: - f.writelines( - [ - "|No.|Differential equations|\n|---|----------------------|\n", - *differential_equations_formatted, - ] - ) - - def register_word(self, rxn_rule: str, my_word: str) -> None: - """ - Register user-defined rule word. - - Parameters - ---------- - rxn_rule : str - Define a rule to which users register a new rule word. - - my_word : str - User-defined rule word. - - Examples - -------- - >>> from pasmopy import Text2Model - >>> mm_kinetics = Text2Model("michaelis_menten.txt") - >>> mm_kinetics.register_word("is_dissociated", "releases") - >>> mm_kinetics.convert() - - """ - if rxn_rule not in self.rule_words.keys(): - raise ValueError( - f"{rxn_rule} is not defined in reaction_rules.\n" - f"Choose a reaction rule from {', '.join(map(str, self.rule_words.keys()))}" - ) - for rule, words in self.rule_words.items(): - for registered_word in words: - if " " + my_word in registered_word and registered_word in " " + my_word: - raise NameError( - f"Cannot register '{my_word}'. " - f"Currently, it is used in the rule: {rule}" - ) - else: - self.rule_words[rxn_rule].append(" " + my_word) diff --git a/pasmopy/individualization.py b/pasmopy/individualization.py deleted file mode 100644 index 1b4212d..0000000 --- a/pasmopy/individualization.py +++ /dev/null @@ -1,142 +0,0 @@ -from dataclasses import dataclass, field -from typing import Dict, List, Optional - -import pandas as pd - - -@dataclass -class Individualization(object): - """ - Individualize a mechanistic model by incorporating gene expression levels. - - Attributes - ---------- - parameters : List[str] - List of model parameters. - - species : List[str] - List of model species. - - transcriptomic_data : str - Path to normalized gene expression data (CSV-formatted), - e.g., (1) RLE-normalized and (2) post-ComBat TPM values. - - gene_expression : Dict[str, List[str]] - Pairs of proteins and their related genes. - - read_csv_kws : dict, optional - Keyword arguments to pass to pandas.read_csv. - - prefix : str (default: "w_") - Prefix of weighting factors on gene expression levels. - """ - - parameters: List[str] - species: List[str] - transcriptomic_data: str - gene_expression: Dict[str, List[str]] - read_csv_kws: Optional[dict] = field(default=None) - prefix: str = field(default="w_", init=False) - - def __post_init__(self) -> None: - kwargs = self.read_csv_kws - if kwargs is None: - kwargs = {} - self._expression_level: pd.DataFrame = pd.read_csv(self.transcriptomic_data, **kwargs) - - @property - def expression_level(self) -> pd.DataFrame: - return self._expression_level - - def _calculate_weighted_sum( - self, - id: str, - x: List[float], - ) -> Dict[str, float]: - """ - Incorporate gene expression levels in the model. - - Parameters - ---------- - id : str - CCLE_ID or TCGA_ID. - - x : List[float] - Model parameters. - - Returns - ------- - weighted_sum : Dict[str, float] - Estimated protein levels after incorporating transcriptomic data. - """ - weighted_sum = dict.fromkeys(self.gene_expression, 0.0) - for (protein, genes) in self.gene_expression.items(): - for gene in genes: - weighted_sum[protein] += ( - x[self.parameters.index(self.prefix + gene)] - * self.expression_level.at[gene, id] - ) - return weighted_sum - - def as_reaction_rate( - self, - id: str, - x: List[float], - param_name: str, - protein: str, - ) -> float: - """ - Gene expression levels are incorporated as a reaction rate. - - Parameters - ---------- - id : str - CCLE_ID or TCGA_ID. - - x : List[float] - List of parameter values. - - param_name : str - Parameter incorporating gene_expression_data. - - protein: str - Protein involved in the reaction. - - Returns - ------- - param_value : float - """ - weighted_sum = self._calculate_weighted_sum(id, x) - param_value = x[self.parameters.index(param_name)] - param_value *= weighted_sum[protein] - return param_value - - def as_initial_conditions( - self, - id: str, - x: List[float], - y0: List[float], - ) -> List[float]: - """ - Gene expression levels are incorporated as initial conditions. - - Parameters - ---------- - id : str - CCLE_ID or TCGA_ID. - - x : List[float] - List of parameter values. - - y0 : List[float] - List of initial values. - - Returns - ------- - y0 (individualized) : List[float] - Cell-line- or patient-specific initial conditions. - """ - weighted_sum = self._calculate_weighted_sum(id, x) - for protein in self.gene_expression.keys(): - y0[self.species.index(protein)] *= weighted_sum[protein] - return y0 diff --git a/pasmopy/patient_model.py b/pasmopy/patient_model.py deleted file mode 100644 index 66c1123..0000000 --- a/pasmopy/patient_model.py +++ /dev/null @@ -1,296 +0,0 @@ -import csv -import multiprocessing -import os -import platform -from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional, Union - -import numpy as np -import pandas as pd -import seaborn as sns -from biomass import Model, run_analysis, run_simulation -from scipy.integrate import simps -from tqdm import tqdm - - -@dataclass -class InSilico(object): - """ - Patient-specific in silico models. - - Attributes - ---------- - path_to_models : str - Path (dot-separated) to the directory containing patient-specific models. - - patients : list of strings - List of patients' names or identifiers. - """ - - path_to_models: str - patients: List[str] - - def __post_init__(self) -> None: - """ - Check for duplicates in self.patients. - """ - duplicate = [patient for patient in set(self.patients) if self.patients.count(patient) > 1] - if duplicate: - raise NameError(f"Duplicate patient: {', '.join(duplicate)}") - - def parallel_execute( - self, - func: Callable[[str], None], - n_proc: int, - ) -> None: - """ - Execute multiple models in parallel. - - Parameters - ---------- - func : Callable - Function executing a single patient-specific model. - - n_proc : int - The number of worker processes to use. - """ - - if platform.system() == "Darwin": - # fork() has always been unsafe on Mac - # spawn* functions should be instead - ctx = multiprocessing.get_context("spawn") - p = ctx.Pool(processes=n_proc) - else: - p = multiprocessing.Pool(processes=n_proc) - - with tqdm(total=len(self.patients)) as t: - for _ in p.imap_unordered(func, self.patients): - t.update(1) - p.close() - - -@dataclass -class PatientModelSimulations(InSilico): - """ - Run simulations of patient-specific models. - - Attributes - ---------- - biomass_kws : dict, optional - Keyword arguments to pass to biomass.run_simulation. - response_characteristics : dict[str, Callable[[1d-array], int ot float]] - A dictionary containing functions to extract dynamic response characteristics - from time-course simulations. - """ - - biomass_kws: Optional[dict] = field(default=None) - response_characteristics: Dict[str, Callable[[np.ndarray], Union[int, float]]] = field( - default_factory=lambda: dict( - max=np.max, - AUC=simps, - ), - init=False, - ) - - def _run_single_patient(self, patient: str) -> None: - """ - Run a single patient-specifc model simulation. - """ - - kwargs = self.biomass_kws - if kwargs is None: - kwargs = {} - kwargs.setdefault("viz_type", "average") - kwargs.setdefault("stdev", True) - - model = Model(".".join([self.path_to_models, patient.strip()])).create() - run_simulation(model, **kwargs) - - def run(self, n_proc: Optional[int] = None) -> None: - """ - Run simulations of multiple patient-specific models in parallel. - - Parameters - ---------- - n_proc : int, optional - The number of worker processes to use. - """ - if n_proc is None: - n_proc = multiprocessing.cpu_count() - 1 - self.parallel_execute(self._run_single_patient, n_proc) - - def _extract( - self, - dynamic_characteristics: Dict[str, Dict[str, List[str]]], - normalization: bool, - ) -> None: - """ - Extract response characteristics from patient-specific signaling dynamics. - """ - os.makedirs("classification", exist_ok=True) - # cleanup csv - files = os.listdir("classification") - for file in files: - if file.endswith(".csv"): - os.remove(os.path.join("classification", f"{file}")) - for obs_name, conditions_and_metrics in dynamic_characteristics.items(): - with open( - os.path.join("classification", f"{obs_name}.csv"), - "w", - newline="", - ) as f: - writer = csv.writer(f) - header = ["Sample"] - for condition, metrics in conditions_and_metrics.items(): - for metric in metrics: - header.append(f"{condition}_{metric}") - writer.writerow(header) - - for patient in tqdm(self.patients): - patient_specific = Model( - ".".join([self.path_to_models, patient.strip()]) - ).create() - all_data = np.load( - os.path.join( - patient_specific.path, - "simulation_data", - "simulations_all.npy", - ) - ) - data = np.array(all_data[patient_specific.observables.index(obs_name)]) - if normalization: - for i in range(data.shape[0]): - if not np.isnan(data[i]).all() and not np.all(data[i] == 0.0): - data[i] /= np.nanmax(data[i]) - data = np.nanmean(data, axis=0) - if not np.all(data == 0.0): - data /= np.max(data) - patient_specific_characteristics = [patient] - for h in header[1:]: - condition, metric = h.split("_") - patient_specific_characteristics.append( - str( - self.response_characteristics[metric]( - data[:, patient_specific.problem.conditions.index(condition)], - ) - ) - ) - writer = csv.writer(f, lineterminator="\n") - writer.writerow(patient_specific_characteristics) - - def subtyping( - self, - fname: Optional[str], - dynamic_characteristics: Dict[str, Dict[str, List[str]]], - normalization: bool = True, - *, - clustermap_kws: Optional[dict] = None, - ): - """ - Classify patients based on dynamic characteristics extracted from simulation results. - - Parameters - ---------- - fname : str, path-like or None - The clustermap is saved as fname if it is not None. - - dynamic_characteristics : Dict[str, Dict[str, List[str]]] - {"observable": {"condition": ["metric", ...], ...}, ...}. - Characteristics in the signaling dynamics used for classification. - 'metric' must be one of 'max', 'AUC', 'droprate'. - - normalization : bool (default: True) - Whether to perform max-normalization. - - clustermap_kws : dict, optional - Keyword arguments to pass to seaborn.clustermap(). - - Examples - -------- - Subtype classification - - >>> with open ("models/breast/sample_names.txt", mode="r") as f: - ... TCGA_ID = f.read().splitlines() - >>> from pasmopy import PatientModelSimulations - >>> simulations = PatientModelSimulations("models.breast", TCGA_ID) - >>> simulations.subtyping( - ... "subtype_classification.pdf", - ... { - ... "Phosphorylated_Akt": {"EGF": ["max"], "HRG": ["max"]}, - ... "Phosphorylated_ERK": {"EGF": ["max"], "HRG": ["max"]}, - ... "Phosphorylated_c-Myc": {"EGF": ["max"], "HRG": ["max"]}, - ... }, - ... clustermap_kws={"figsize": (9, 12)} - ... ) - - Add new characteristics - - >>> def droprate(time_course: np.ndarray) -> float: - ... return - (time_course[-1] - np.max(time_course)) / (len(time_course) - np.argmax(time_course)) - >>> simulations.response_characteristics["droprate"] = droprate - """ - # seaborn clustermap - if clustermap_kws is None: - clustermap_kws = {} - clustermap_kws.setdefault("z_score", 1) - clustermap_kws.setdefault("cmap", "RdBu_r") - clustermap_kws.setdefault("center", 0) - # extract response characteristics - self._extract(dynamic_characteristics, normalization) - if fname is not None: - characteristics: List[pd.DataFrame] = [] - files = os.listdir("classification") - for file in files: - observable, ext = os.path.splitext(file) - if ext == ".csv": - df = pd.read_csv(os.path.join("classification", file), index_col="Sample") - characteristics.append( - df.rename(columns=lambda s: observable.replace("_", " ") + "_" + s) - ) - all_info = pd.concat(characteristics, axis=1) - all_info.index.name = "" - fig = sns.clustermap(all_info, **clustermap_kws) - fig.savefig(fname) - - -@dataclass -class PatientModelAnalyses(InSilico): - """ - Run analyses of patient-specific models. - - Attributes - ---------- - biomass_kws : dict, optional - Keyword arguments to pass to biomass.run_analysis. - """ - - biomass_kws: Optional[dict] = field(default=None) - - def _run_single_patient(self, patient: str) -> None: - """ - Run a single patient-specifc model analysis. - """ - - kwargs = self.biomass_kws - if kwargs is None: - kwargs = {} - kwargs.setdefault("target", "initial_condition") - kwargs.setdefault("metric", "integral") - kwargs.setdefault("style", "heatmap") - kwargs.setdefault("options", None) - - model = Model(".".join([self.path_to_models, patient.strip()])).create() - run_analysis(model, **kwargs) - - def run(self, n_proc: Optional[int] = None) -> None: - """ - Run analyses of multiple patient-specific models in parallel. - - Parameters - ---------- - n_proc : int, optional - The number of worker processes to use. - """ - if n_proc is None: - n_proc = multiprocessing.cpu_count() - 1 - self.parallel_execute(self._run_single_patient, n_proc) diff --git a/pasmopy/preprocessing/__init__.py b/pasmopy/preprocessing/__init__.py deleted file mode 100644 index 7ba5ca1..0000000 --- a/pasmopy/preprocessing/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .weighting_factors import WeightingFactors \ No newline at end of file diff --git a/pasmopy/preprocessing/weighting_factors.py b/pasmopy/preprocessing/weighting_factors.py deleted file mode 100644 index 6a12f06..0000000 --- a/pasmopy/preprocessing/weighting_factors.py +++ /dev/null @@ -1,111 +0,0 @@ -import os -from dataclasses import dataclass, field -from typing import Dict, List - -from biomass.exec_model import ModelObject - - -@dataclass -class WeightingFactors(object): - """ - Prepare for adding information about gene expression data to model. - - Attributes - ---------- - model : biomass.exec_model.ModelObject - BioMASS model object. - gene_expression : dict - Pairs of proteins and their related genes. - weighting_factors : list of strings - List of weighting factors. - prefix : str (default: "w_") - Prefix of weighting factors on gene expression levels. - indentation : str (default: 4 spaces) - How many spaces as indentation. - """ - model: ModelObject - gene_expression: Dict[str, List[str]] - weighting_factors: List[str] = field(default_factory=list, init=False) - prefix: str = field(default="w_", init=False) - indentation: str = field(default=" " * 4, init=False) - - def add(self) -> None: - """ - Add weighting factors to model parameters. - """ - for genes in self.gene_expression.values(): - for gene in genes: - if self.prefix + gene not in self.model.parameters: - self.weighting_factors.append(self.prefix + gene) - if self.weighting_factors: - with open( - os.path.join(self.model.path, "name2idx", "parameters.py"), - mode="r", - encoding="utf-8", - ) as f: - lines = f.readlines() - for line_num, line in enumerate(lines): - if line.startswith("NUM: int"): - lines[line_num] = "NAMES.extend(\n" - lines[line_num] += ( - f"{self.indentation}[\n" - + f'{2 * self.indentation}"' - + f'",\n{2 * self.indentation}"'.join(self.weighting_factors) - + f'",\n{self.indentation}]\n' - ) - lines[line_num] += ")\n\nNUM: int = len(NAMES)\n" - with open( - os.path.join(self.model.path, "name2idx", "parameters.py"), - mode="w", - encoding="utf-8", - ) as f: - f.writelines(lines) - - - def set_search_bounds(self, lb: float = 0.01, ub: float = 100.0) -> None: - """ - Set search bounds for weighting factors. - - Parameters - ---------- - lb : float - Lower bound. - ub : float - Upper bound. - """ - if self.weighting_factors: - search_bounds = [ - f"search_rgn[:, C.{wf}] = [{lb}, {ub}]" for wf in self.weighting_factors - ] - with open( - os.path.join(self.model.path, "set_search_param.py"), - mode="r", - encoding="utf-8", - ) as f: - lines = f.readlines() - for line_num, line in enumerate(lines): - if ( - line.startswith(f"{2 * self.indentation}]") - and lines[line_num + 3].startswith( - f"{2 * self.indentation}self.idx_initials = " - ) - ): - lines[line_num] = ( - f"{3 * self.indentation}" - + f",\n{3 * self.indentation}".join( - map(lambda _s: "C." + _s, self.weighting_factors) - ) - + f",\n{2 * self.indentation}]\n" - ) - elif line.startswith(f"{2 * self.indentation}search_rgn = convert_scale("): - lines[line_num] = ( - 2 * self.indentation - + f"\n{2 * self.indentation}".join(search_bounds) - ) - lines[line_num] += f"\n\n{2 * self.indentation}search_rgn = convert_scale(\n" - with open( - os.path.join(self.model.path, "set_search_param.py"), - mode="w", - encoding="utf-8", - ) as f: - f.writelines(lines) diff --git a/requirements.txt b/requirements.txt index e179839..9dd8961 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ +pasmopy biomass>=0.5.2 matplotlib -numpy +numpy>=1.17 pandas>=0.23 seaborn>=0.11 -scipy>=1.6 -tqdm +scipy>=1.6 \ No newline at end of file