"""UCLCHEM Analysis Module.
Tools for analyzing chemical model outputs and reaction pathways.
This module provides functions to:
- Read and parse UCLCHEM output files
- Analyze chemical reaction pathways for specific species
- Check element conservation in model results
- Create abundance plots and visualizations
- Compare model results across different runs
**Key Functions:**
- :func:`read_output_file` - Read UCLCHEM output files into DataFrames
- :func:`analysis` - Analyze production/destruction pathways for a species
- :func:`check_element_conservation` - Verify element conservation
**Example Usage:**
.. code-block:: python
import uclchem.analysis as analysis
# Read model output
df = analysis.read_output_file("output.dat")
# Analyze CO chemistry
analysis.analysis(
"CO",
"output.dat",
"co_reactions.dat"
)
# Check conservation
conservation = analysis.check_element_conservation(
df, ["C", "O", "N"]
)
**See Also:**
- :mod:`uclchem.plot` - Dedicated plotting utilities
- :mod:`uclchem.model` - Run chemical models
"""
try:
from uclchemwrap import uclchemwrap as wrap
except ImportError as E:
E.add_note(
"Failed to import wrap.f90 from uclchemwrap, did the installation with f2py succeed?"
)
raise
try:
from uclchemwrap import surfacereactions
except ImportError as E:
E.add_note(
"Failed to import surfacereactions.f90 from uclchemwrap, did the installation with f2py succeed?"
)
raise
from pathlib import Path
from typing import Any, TextIO
import numpy as np
import pandas as pd
from pandas import Series, read_csv
from uclchem.constants import default_elements_to_check, n_reactions, n_species
from uclchem.makerates.network import Network
from uclchem.makerates.reaction import LH_REACTION_TYPES, Reaction
from uclchem.makerates.species import Species, elementList
from uclchem.utils import UCLCHEM_ROOT_DIR
[docs]
def read_output_file(output_file: str | Path) -> pd.DataFrame:
"""Read the output of a UCLCHEM run created with the outputFile parameter.
into a pandas DataFrame.
Parameters
----------
output_file : str | Path
path to file containing a full UCLCHEM output
Returns
-------
data : pd.DataFrame
A dataframe containing the abundances and
physical parameters of the model at every time step.
"""
with Path(output_file).open() as f:
data = read_csv(f)
data.columns = data.columns.str.strip()
return data
[docs]
def read_rate_file(rate_file: str | Path) -> pd.DataFrame:
"""Read the output of a UCLCHEM run created with the rateConstantFile.
parameter into a pandas DataFrame.
Parameters
----------
rate_file : str | Path
path to file containing the UCLCHEM reaction rates.
Returns
-------
data : pd.DataFrame
A dataframe containing the physical parameters,
and reaction rates (s-1) at each timestep.
"""
with Path(rate_file).open() as f:
data = read_csv(f)
data.columns = data.columns.str.strip()
return data
def _reactant_count(species: str, reaction_string: str) -> int:
"""Count how many times a species is consumed in a reaction.
Parameters
----------
species : str
species which is maybe consumed in reaction
reaction_string : str
reaction which maybe consumes species
Returns
-------
int
number of times species is consumed in reaction
"""
# TODO: Put in Reaction
split_str = reaction_string.split("->", maxsplit=1)[0].strip()
if " " not in split_str:
return species == split_str
return split_str.split().count(species)
def _product_count(species: str, reaction_string: str) -> int:
"""Count how many times a species is produced in a reaction.
Parameters
----------
species : str
species which is maybe produced by reaction
reaction_string : str
reaction which maybe produces species
Returns
-------
int
amount of times species is produced by reaction
"""
# TODO: Put in Reaction
split_str = reaction_string.split("->")[1].strip()
if " " not in split_str:
return species == split_str
return split_str.split().count(species)
def _get_rates_change(rate_df: pd.DataFrame, species: str) -> pd.DataFrame:
phys_param_columns = []
for col_idx, column in enumerate(rate_df.columns):
if "->" not in column: # It is not a reaction, but a physical parameter
phys_param_columns.append(col_idx)
if "->" in column: # Assume reactions come after all the physical parameters
break
change_df = rate_df.iloc[:, phys_param_columns]
for _col_idx, column in enumerate(rate_df.columns):
if "->" not in column: # It is not a reaction, but a physical parameter
continue
rcount = _reactant_count(species, column)
pcount = _product_count(species, column)
if rcount == 0 and pcount == 0:
# Species is unaffected by reaction
continue
change_df = pd.concat([change_df, rate_df[column] * (pcount - rcount)], axis=1)
return change_df
[docs]
def get_change_df(
rate_df: pd.DataFrame, species: str, on_grain: bool = False
) -> pd.DataFrame:
"""From a dataframe containing all the reaction rates, get the change of a species over time,.
due to each reaction.
Parameters
----------
rate_df : pd.DataFrame
dataframe containing physical parameters and
reaction rate constants over time
species : str
species to get the change over time
on_grain : bool
whether to analyze the ice phase of this species (Default value = False)
Returns
-------
change_df : pd.DataFrame
change of species over time due to each reaction
the species is involved in
Raises
------
DeprecationWarning
Deprecated in UCLCHEM 4.0
ValueError
If "#" or "@" is in `species`.
"""
msg = "This function will be deprecated in UCLCHEM 4.0 and is no longer actively maintained"
raise DeprecationWarning(msg)
if "#" in species or "@" in species:
msg = "WARNING: get_change_df IS ONLY FOR ANALYZING ALL OF THE GAS PHASE AND ALL OF THE ICE. "
msg += "USE on_grain PARAMETER TO INDICATE THIS. IF YOU WANT TO ANALYZE ONLY SURFACE OR ONLY BULK, "
msg += "USE FUNCTION _get_rates_change WITH SPECIES CONTAINING # OR @ TO INDICATE SURFACE OF BULK."
raise ValueError(msg)
if not on_grain:
return _get_rates_change(rate_df, species)
df_surf = _get_rates_change(rate_df, "#" + species)
df_bulk = _get_rates_change(rate_df, "@" + species)
surf_columns = df_surf.columns
bulk_columns = df_bulk.columns
for column in surf_columns:
if "->" not in column:
df_bulk.drop(
columns=column, inplace=True
) # Drop the physical parameters from bulk column so we do not have them twice in the final df
continue
if column in bulk_columns:
# If the same reaction is in both dfs, that means that both surf and bulk version
# of the species is involved in the reaction, which means that either surface is lost
# and bulk forms, or the other way, and so we drop this column
# from both dfs since it has no effect on the ice overall.
df_surf.drop(columns=column, inplace=True)
df_bulk.drop(columns=column, inplace=True)
# Maybe TODO:
# Make it such that the columns of the same reactions (but surf and bulk versions)
# are added such that we have a single reaction rate in the ice,
# and not separate surf and bulk reaction rates.
return pd.concat([df_surf, df_bulk], axis=1)
[docs]
def read_analysis(filepath: str | Path, species: str) -> tuple[pd.DataFrame, list[str]]:
"""Read the analysis output.
Parameters
----------
filepath : str | Path
path to analysis output.
species : str
Species of interest.
Returns
-------
df : pd.DataFrame
dataframe with rates and time.
all_reactions : list[str]
list of all reactions that the species is involved in.
Raises
------
DeprecationWarning
Deprecated in UCLCHEM 4.0
"""
msg = "This function will be deprecated in UCLCHEM 4.0 and is no longer actively maintained"
raise DeprecationWarning(msg)
with Path(filepath).open() as file:
lines = file.readlines()
for i, line in enumerate(lines):
if "All Reactions" in line:
first_newline = lines.index("\n", i)
all_reactions = lines[i + 2 : first_newline]
all_reactions = [reaction.strip("\n") for reaction in all_reactions]
break
counts = [all_reactions.count(reac) for reac in all_reactions]
for i, count in enumerate(counts):
if count > 1:
print(
f"{all_reactions[i]} was found to be in the reaction list {count} times."
)
print(f"Renaming this time to '{all_reactions[i]} ({i})'")
all_reactions[i] = f"{all_reactions[i]} ({i})"
n_cols = len(all_reactions) + 1
columns = ["Time"]
columns.extend(all_reactions)
df = pd.DataFrame(columns=columns)
segments_min = [
i for i, line in enumerate(lines) if "New Important Reactions At:" in line
]
segments_max = [i - 1 for i in segments_min[1:]]
segments_max.append(len(lines) - 1)
segments = [
lines[segments_min[i] : segments_max[i]] for i in range(len(segments_min))
]
for segment_lines in segments:
new_row = [0] * n_cols
time = float(segment_lines[0].split()[-2])
new_row[0] = time
for i, reaction in enumerate(all_reactions):
if not any(reaction in line for line in segment_lines):
rate = 0
else:
reac_indx = [
j for j, line in enumerate(segment_lines) if reaction in line
][0]
rate = float(segment_lines[reac_indx].split()[-3])
# Here we use the fact that the sign of the formation/destruction rate is already
# correct, so we only need to scale it by 1 (in e.g. H+OH->H2O) or 2 (H+H->H2)
new_row[i + 1] = rate * (
_product_count(species, reaction) + _reactant_count(species, reaction)
)
new_row_dict = dict(zip(columns, new_row, strict=False))
new_row_df = pd.DataFrame(new_row_dict, index=[0])
df = pd.concat(
[df, new_row_df],
axis=0,
ignore_index=True,
)
return df, all_reactions
[docs]
def analysis(
species_name: str,
output_file: str | Path,
analysis_file: str | Path,
rate_threshold: float = 0.99,
) -> None:
"""Loop over every time step in an output file and finds the rate of change.
of a species at that time due to each of the reactions it is involved in.
From this, the most important reactions are identified and printed to file.
This can be used to understand the chemical reason behind a species' behavior.
DEPRECATED
Parameters
----------
species_name : str
Name of species to be analyzed
output_file : str | Path
The path to the file where the analysis output will be written
analysis_file : str | Path
The path to the file containing the UCLCHEM output
rate_threshold : float
Analysis output will contain the only the most efficient
reactions that are responsible for rate_threshold of the total
production and destruction rate. Default = 0.99.
"""
result_df = read_output_file(output_file)
_species_arr = np.loadtxt(
UCLCHEM_ROOT_DIR / "species.csv",
usecols=[0],
dtype=str,
skiprows=1,
unpack=True,
delimiter=",",
comments="%",
)
species: list[str] = list(_species_arr)
reactions = np.loadtxt(
UCLCHEM_ROOT_DIR / "reactions.csv",
dtype=str,
skiprows=1,
delimiter=",",
usecols=[0, 1, 2, 3, 4, 5, 6],
comments="%",
)
fortran_reac_indxs = [
i + 1 for i, reaction in enumerate(reactions) if species_name in reaction
]
reac_indxs = [i for i, reaction in enumerate(reactions) if species_name in reaction]
species_index = species.index(species_name) + 1 # fortran index of species
old_key_reactions: list[str] = []
old_total_destruct = 0.0
old_total_form = 0.0
formatted_reacs = _format_reactions(reactions[reac_indxs].tolist())
if species_name[0] == "#":
surftransfer_reacs = [
f"@{species_name[1:]} + SURFACETRANSFER -> {species_name}",
f"{species_name} + SURFACETRANSFER -> @{species_name[1:]}",
]
formatted_reacs.extend(surftransfer_reacs)
if species_name[0] == "@":
surftransfer_reacs = [
f"{species_name} + SURFACETRANSFER -> #{species_name[1:]}",
f"#{species_name[1:]} + SURFACETRANSFER -> {species_name}",
]
formatted_reacs.extend(surftransfer_reacs)
with Path(analysis_file).open("w") as f:
f.write("All Reactions\n************************\n")
for reaction in formatted_reacs:
f.write(reaction + "\n")
for _i, row in result_df.iterrows():
# recreate the parameter dictionary needed to get accurate rates
param_dict = _param_dict_from_output(row)
# get the rate of all reactions from UCLCHEM along with a few other necessary values
rates, transfer, swap, bulk_layers = _get_species_rates(
param_dict, list(row[species]), species_index, fortran_reac_indxs
)
# convert reaction rates to total rates of change.
# this needs manually updating when you add new reaction types!
change_reacs, changes = _get_rates_of_change(
rates,
reactions[reac_indxs].tolist(),
species,
species_name,
row,
swap,
bulk_layers,
)
change_reacs = _format_reactions(change_reacs)
# This whole block adds transfer of material from surface to bulk as surface grows
# (or vice versa). It's not a reaction in the network so won't get picked up otherwise.
# We manually add it.
if transfer <= 0:
if species_name[0] == "#":
change_reacs.append(surftransfer_reacs[0])
changes = np.append(changes, transfer)
elif species_name[0] == "@":
change_reacs.append(surftransfer_reacs[1])
changes = np.append(changes, transfer)
elif species_name[0] == "#":
change_reacs.append(surftransfer_reacs[1])
changes = np.append(changes, -transfer)
elif species_name[0] == "@":
change_reacs.append(surftransfer_reacs[0])
changes = np.append(changes, transfer)
# Then we remove the reactions that are not important enough to be printed by finding
# which of the top reactions we need to reach rate_threshold*total_rate
(
total_formation,
total_destruct,
key_reactions,
key_changes,
) = _remove_slow_reactions(
changes, change_reacs, rate_threshold=rate_threshold
)
# only update if list of reactions change or rates change by factor of 10
if (
(old_key_reactions != key_reactions)
or (
np.abs(
np.log10(np.abs(old_total_destruct))
- np.log10(np.abs(total_destruct))
)
> 0
)
or (np.abs(np.log10(old_total_form) - np.log10(total_formation)) > 0)
):
old_key_reactions = key_reactions[:]
old_total_form = total_formation
old_total_destruct = total_destruct
_write_analysis(
f,
row["Time"],
total_formation,
total_destruct,
change_reacs,
changes,
)
def _param_dict_from_output(
output_line: dict[str, float] | pd.Series,
) -> dict[str, float]:
"""Generate a parameter dictionary from a UCLCHEM timestep that contains enough of.
the physical variables to recreate the parameter dictionary used to run UCLCHEM.
Parameters
----------
output_line : dict[str, float] | pd.Series
any row from the relevant UCLCHEM output
Returns
-------
dict[str, float]
dictionary with physical conditions from output.
"""
param_dict = {
"initialdens": output_line["Density"],
"initialtemp": output_line["gasTemp"],
"zeta": output_line["zeta"],
"radfield": output_line["radfield"],
"baseav": output_line["baseAv"],
}
return param_dict
def _get_species_rates(
param_dict: dict[str, Any],
input_abundances: list[float],
species_index: int,
reac_indxs: list[int],
) -> tuple[np.ndarray, float, float, float]:
"""Get the rate of up to 500 reactions from UCLCHEM for a given.
set of parameters and abundances.
Intended for use within the analysis script.
Parameters
----------
param_dict : dict[str, Any]
A dictionary of parameters where keys are
any of the variables in defaultparameters.f90 and values are value for current run.
input_abundances : list[float]
Abundance of every species in network
species_index : int
Index of the species of interest.
reac_indxs : list[int]
Indices of reactions of interest in the network's reaction list.
Returns
-------
np.ndarray
Array containing the rate of every reaction specified by reac_indxs
transfer
Total transfer rate between surface and bulk ice.
swap
Total swap rate for individual swapping between surface and bulk ice.
bulk_layers
number of monolayers of bulk ice
Raises
------
DeprecationWarning
Deprecated in UCLCHEM 4.0
RuntimeError
If UCLCHEM failed to return the rates for these parameters
"""
msg = "This function will be deprecated in UCLCHEM 4.0 and is no longer actively maintained"
raise DeprecationWarning(msg)
input_abund = np.zeros(n_species)
input_abund[: len(input_abundances)] = input_abundances
rate_indxs = np.ones(n_reactions)
rate_indxs[: len(reac_indxs)] = reac_indxs
rates, success_flag, transfer, swap, bulk_layers = wrap.get_rates(
param_dict, input_abund, species_index, rate_indxs
)
if success_flag < 0:
msg = "UCLCHEM failed to return rates for these parameters"
raise RuntimeError(msg)
return rates[: len(reac_indxs)], transfer, swap, bulk_layers
def _get_rates_of_change(
rates: np.ndarray,
reactions: list[str],
species_list: list[str],
species: str,
row: pd.Series,
swap: float,
bulk_layers: float,
):
"""Calculate the terms in the rate of equation of a particular species using rates.
calculated using `get_species_rates()` and a row from the full output of UCLCHEM.
See `analysis.py` for intended use.
Parameters
----------
rates : np.ndarray
Rates of all reactions the species is involved in
reactions : list[str]
List of all reactions the species is involved in as a list of strings
species_list : list[str]
List of species names from network
species : str
name of species to be analyzed
row : pd.Series
row from output dataframe
swap : float
Total swap rate for individual swapping between bulk and surface
bulk_layers : float
Number of layers in the bulk for individual swapping calc.
Returns
-------
list[float]
Contribution to the rate of change of `species` from each reaction in `reactions`.
Raises
------
DeprecationWarning
Deprecated in UCLCHEM 4.0
"""
msg = "This function will be deprecated in UCLCHEM 4.0 and is no longer actively maintained"
raise DeprecationWarning(msg)
changes = []
reactionList = []
three_phase = "@" in "".join(species_list)
safeMantle = np.max([1.0e-30, row["SURFACE"]])
for i, reaction in enumerate(reactions):
reaction_instance = Reaction([*reaction, 0, 0, 0, 0, 0])
change = rates[i]
reactants = reaction[0:3]
products = reaction[3:]
# Counting the same as Reaction.body_count
reactant_count = reaction_instance.body_count
change *= row["Density"] ** (reactant_count)
for reactant in reactants:
if reactant in species_list:
change *= row[reactant]
elif reactant == "BULKSWAP":
change *= bulk_layers
elif reactant == "SURFSWAP":
change *= swap / safeMantle
elif reactant in {"DEUVCR", "DESCR", "DESOH2", "ER", "ERDES"}:
change /= safeMantle
if reactant == "DESOH2":
change *= row["H"]
elif (not three_phase) and (reactant in {"THERM"}):
change *= row["Density"] / safeMantle
if "H2FORM" in reactants:
# only 1 factor of H abundance in Cazaux & Tielens 2004 H2 formation
# so stop looping after first iteration
break
if "LH" in reactants[2]:
if "@" in reactants[0]:
change *= bulk_layers
if species in reactants:
changes.append(-change)
reactionList.append(reaction)
if species in products:
changes.append(change)
reactionList.append(reaction)
A = zip(changes, reactionList, strict=False)
A = sorted(A, key=lambda x: np.abs(x[0]), reverse=True)
changes, reactionList = zip(*A, strict=False)
changes = np.asarray(changes)
return reactionList, changes
def _remove_slow_reactions(
changes: np.ndarray, change_reacs: list[str], rate_threshold: float = 0.99
) -> tuple[float, float, list[str], list[float]]:
"""Iterate through a list of reactions adding the fastest reactions to a list until some.
threshold fraction of the total rate of change is reached. This list is returned so that
you have the list of reactions that cause rate_threshold of the total destruction and
formation of a species.
Parameters
----------
changes : np.ndarray
List of rates of change due to each reaction
a species is involved in.
change_reacs : list[str]
List of corresponding rates of change
rate_threshold : float
Percentage of overall rate of change to consider before ignoring
less important reactions. Default = 0.99.
Returns
-------
totalProd : float
Total production rate
totalDestruct : float
Total destruction rate
key_reactions : list[str]
List of key reactions
key_changes : list[float]
List of reaction rates of key reactions
"""
totalDestruct = sum(changes[np.where(changes < 0)])
totalProd = sum(changes[np.where(changes > 0)])
key_reactions = []
key_changes = []
form = 0.0
destruct = 0.0
for i, reaction in enumerate(change_reacs):
if (changes[i] > 0) and (form < rate_threshold * totalProd):
form += changes[i]
key_reactions.append(reaction)
key_changes.append(changes[i])
elif (changes[i] < 0) and (abs(destruct) < rate_threshold * abs(totalDestruct)):
destruct += changes[i]
key_reactions.append(reaction)
key_changes.append(changes[i])
return totalProd, totalDestruct, key_reactions, key_changes
def _write_analysis(
output_file: TextIO,
time: float,
total_production: float,
total_destruction: float,
key_reactions: list[str],
key_changes: list[float],
) -> None:
"""Print key reactions to a file.
Parameters
----------
output_file : TextIO
Open file object to write to
time : float
Simulation time at which analysis is performed
total_production : float
Total positive rate of change
total_destruction : float
Total negative rate of change
key_reactions : list[str]
A list of all reactions that contribute
to the total rate of change
key_changes : list[float]
A list of rates of change contributing to total
"""
output_file.write(
f"\n\n***************************\nNew Important Reactions At: {time:.2e} years\n"
)
# Formation and destruction writing is disabled since the absolute numbers
# do not appear to be correct.
output_file.write(f"Formation = {total_production:.8e} from:")
for k, reaction in enumerate(key_reactions):
if key_changes[k] > 0:
outString = f"\n{reaction} : {float(key_changes[k])} = {float(key_changes[k] / total_production):.2%}"
output_file.write(outString)
output_file.write(f"\n\nDestruction = {total_destruction:.8e} from:")
for k, reaction in enumerate(key_reactions):
if key_changes[k] < 0:
outString = f"\n{reaction} : {float(key_changes[k])} = {float(key_changes[k] / total_destruction):.2%}"
output_file.write(outString)
def _format_reactions(reactions: list[list[str]]) -> list[str]:
"""Turn a row of the reaction file into a string.
Parameters
----------
reactions : list[list[str]]
list of lists, each reaction read from reaction.csv is a list.
Returns
-------
list[str]
list of string, each reaction in readable string form
"""
# TODO: Replace with str(Reaction()).
formatted_reactions = []
for reaction in reactions:
outString = f"{reaction[0]} + {reaction[1]} + {reaction[2]} -> {reaction[3]} + {reaction[4]} + {reaction[5]}"
outString = outString.replace(" + NAN", "")
formatted_reactions.append(outString)
return formatted_reactions
def _count_element(species_list: list[str], element: str) -> pd.Series:
"""Count the number of atoms of an element that appear in each species of a list of species.
Parameters
----------
species_list : list[str]
list of species names
element : str
element
Returns
-------
sums : pd.Series
array where each element represents the number of atoms
of the chemical element in the corresponding element of species_list
"""
species_series = Series(species_list)
# confuse list contains elements whose symbols contain the target eg CL for C
# We count both sets of species and remove the confuse list counts.
confuse_list = [x for x in elementList if element in x]
confuse_list = sorted(confuse_list, key=lambda x: len(x), reverse=True)
confuse_list.remove(element)
sums = species_series.str.count(element)
for i in range(2, 10):
sums += np.where(species_series.str.contains(element + f"{i:.0f}"), i - 1, 0)
for spec in confuse_list:
sums += np.where(species_series.str.contains(spec), -1, 0)
return sums
[docs]
def total_element_abundance(element: str, df: pd.DataFrame) -> pd.Series:
"""Calculate the total elemental abundance of a species as a function of time.
Allows you to check conservation.
Parameters
----------
element : str
Name of element
df : pd.DataFrame
DataFrame from `read_output_file()`
Returns
-------
pd.Series
Total abundance of element for all time steps in df.
"""
sums: np.ndarray = _count_element(list(df.columns), element).to_numpy()
for variable in ["Time", "Density", "gasTemp", "av", "point", "SURFACE", "BULK"]:
sums = np.where(df.columns == variable, 0, sums)
return df.mul(sums, axis=1).sum(axis=1)
[docs]
def check_element_conservation(
df: pd.DataFrame, element_list: list[str] | None = None, percent: bool = True
) -> dict[str, str]:
"""Check the conservation of elements by comparing their total.
abundance at start and end of model.
Parameters
----------
df : pd.DataFrame
UCLCHEM output in format from `read_output_file`
element_list : list[str] | None
List of elements to check. If None,
defaults to `uclchem.constants.default_elements_to_check`.
percent : bool
Whether to return the change formatted as a percentage. Default = False.
Returns
-------
dict[str, str]
Dictionary containing the change in the total abundance of each element
as a fraction of initial value
"""
if element_list is None:
element_list = default_elements_to_check
result = {}
for element in element_list:
discrep = total_element_abundance(element, df).values
if percent:
discrep = np.abs(discrep[0] - discrep[-1]) / discrep[0]
result[element] = f"{discrep:.3%}"
else:
discrep = discrep[0] - discrep[-1]
result[element] = f"{discrep:.2e}"
return result
[docs]
def get_total_swap(
rates: pd.DataFrame, abundances: pd.DataFrame, reactions: list[Reaction]
) -> np.ndarray:
"""Obtain the amount of 'random' swapping per timestep.
Parameters
----------
rates : pd.DataFrame
The rates obtained from running an UCLCHEM model
abundances : pd.DataFrame
The abundances obtained from running an UCLCHEM model
reactions : list[Reaction]
The reactions used in UCLCHEM
Returns
-------
totalSwap : np.ndarray
The total swap per timestep
Raises
------
AssertionError
If rates and abundances have different lengths, or if the number of
rate columns does not match the number of reactions.
"""
if len(rates) != len(abundances):
msg = "Rates and abundances must be the same length"
raise AssertionError(msg)
if rates.shape[1] != len(reactions):
msg = "The number of rates and reactions must be equal"
raise AssertionError(msg)
totalSwap = np.zeros(abundances.shape[0])
for idx, reac in enumerate(reactions):
if reac.get_reaction_type() == "BULKSWAP":
totalSwap += rates.iloc[:, idx] * abundances[reac.get_pure_reactants()[0]]
return totalSwap
[docs]
def construct_incidence(species: list[Species], reactions: list[Reaction]) -> np.ndarray:
"""Construct the incidence matrix, a matrix that describes the in and out degree.
for each of the reactions; useful to matrix multiply by the individual rates per reaction
to obtain a rates (dy) per species.
Parameters
----------
species : list[Species]
A list of S species
reactions : list[Reaction]
The list of R reactions
Returns
-------
incidence : np.ndarray
An RxS incidence matrix
"""
incidence = np.zeros(
dtype=np.int8,
shape=(len(reactions), len(species)),
)
species_names = [str(s) for s in species]
for idx, reaction in enumerate(reactions):
products = reaction.get_pure_products()
for prod in products:
incidence[idx, species_names.index(prod)] += 1 # type: ignore[index]
reactants = reaction.get_pure_reactants()
for reac in reactants:
incidence[idx, species_names.index(reac)] -= 1 # type: ignore[index]
return incidence
[docs]
def rate_constants_to_dy_and_rates(
physics: pd.DataFrame,
abundances: pd.DataFrame,
rate_constants: pd.DataFrame,
network: Network | None = None,
species: list[Species] | None = None,
reactions: list[Reaction] | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Apply postprocessing to obtain the equivalent of GETYDOT from the fortran side.
and the reaction rates at each timestep.
Parameters
----------
physics : pd.DataFrame
The physics output from running a model
abundances : pd.DataFrame
The abundances output from running a model
rate_constants : pd.DataFrame
The rate constants output from running a model
network : Network | None
The reaction network used to postprocess the rate constants.
Defaults to None.
species : list[Species] | None
The species used to postprocess the rate constants.
Defaults to None.
reactions : list[Reaction] | None
The reactions used to postprocess the rate constants.
Defaults to None.
Returns
-------
ydot : pd.DataFrame
the RHS that is solved in UCLCHEM at every output timestep
rate_by_reaction : pd.DataFrame
the individual terms that result in ydot when multiplied
by the incidence matrix.
Raises
------
ValueError
If `species` is specified, but `reactions` is not, or vice versa
ValueError
If `species`, `reactions` and `network` are all specified,
or all not specified.
ValueError
If there are any reaction types not processed.
AssertionError
If `species` or `reactions` is ``None`` after the validation guard
(should not happen; indicates a logic error).
"""
if bool(species) != bool(reactions):
msg = "If species is specified, reactions also must be and vice versa"
raise ValueError(msg)
if (network is None) == (species is None and reactions is None):
msg = (
"Choose between providing a network OR (species AND reactions). "
"A network can be obtained using `uclchem.makerates.network.Network.from_csv()`"
)
raise ValueError(msg)
if network:
species = network.get_species_list()
reactions = network.get_reaction_list()
# At this point species and reactions are guaranteed non-None by the guard above
if species is None:
msg = "species must be non-None after validation guard"
raise AssertionError(msg)
if reactions is None:
msg = "reactions must be non-None after validation guard"
raise AssertionError(msg)
if "Point" in rate_constants.columns:
rate_constants = rate_constants.drop(columns=["Point"])
# Import all of the constants directly from UCLCHEMWRAP to avoid discrepancies
GAS_DUST_DENSITY_RATIO = surfacereactions.gas_dust_density_ratio
NUM_SITES_PER_GRAIN = surfacereactions.num_sites_per_grain
# Compute dynamic quantities that can be precomputed
bulkLayersReciprocal = (
NUM_SITES_PER_GRAIN / (GAS_DUST_DENSITY_RATIO * abundances["BULK"])
).apply(lambda x: min(1.0, x))
totalSwap = bulkLayersReciprocal * get_total_swap(
rate_constants, abundances, reactions
)
# Calculate safe values to avoid division by zero
safeBulk = abundances["BULK"].apply(lambda x: max(1.0e-30, x))
safeMantle = abundances["SURFACE"].apply(lambda x: max(1.0e-30, x))
ratioSurfaceToBulk = (safeMantle / safeBulk).apply(lambda x: min(1.0, x))
# Create the incidence matrix, we use this to evaluate the rates and
incidence = construct_incidence(species, reactions)
# Create a copy so we don't overwrite
rate_by_reaction = rate_constants.copy()
# Keep track of missing reaction types:
missing_reactions = set()
# Iterate over each of the rates and compute the contribution to dy for that reaction.
for idx, reaction in enumerate(reactions):
density_multiplier_factor = reaction.body_count
rate = rate_by_reaction.iloc[:, idx]
# Multiply by density the right number of times:
for _iiii in range(int(density_multiplier_factor)):
rate *= physics["Density"]
# Multiply by the abundances:
for reactant in reaction.get_sorted_reactants():
if reactant in list(abundances.columns):
rate *= abundances[reactant]
reaction_type = reaction.get_reaction_type()
reactants = reaction.get_sorted_reactants()
# GAR needs an additional factor of density
if reaction_type == "GAR":
rate *= physics["Density"]
# BULKSWAP reactions use ratioSurfaceToBulk
elif reaction_type == "BULKSWAP":
rate *= ratioSurfaceToBulk
# LH/LHDES bulk reactions
elif reaction_type in LH_REACTION_TYPES:
if len(reactants) >= 3 and reactants[2] in LH_REACTION_TYPES: # noqa: PLR2004
if "@" in reactants[0]:
rate *= bulkLayersReciprocal
# ED reactions multiply by #H2 abundance
elif reaction_type == "ED":
rate *= abundances["#H2"]
elif reaction_type == "SURFSWAP":
rate *= totalSwap / safeMantle
elif reaction_type in {"DESCR", "DEUVCR", "ER", "ERDES"}:
rate /= safeMantle
elif reaction_type == "DESOH2":
rate *= abundances["H"] / safeMantle
elif reaction_type == "H2FORM":
# H2FORM only uses 1 factor of H abundance (Cazaux & Tielens 2004)
rate /= abundances["H"]
elif reaction_type in {
"PHOTON",
"CRP",
"CRPHOT",
"FREEZE",
"DESORB",
"THERM",
"TWOBODY",
"IONOPOL1",
"IONOPOL2",
"CRS",
"EXSOLID",
"EXRELAX",
}:
# Standard reactions that require no additional prefactors
pass
else:
missing_reactions.add(reaction_type)
rate_by_reaction.iloc[:, idx] = rate
if missing_reactions:
msg = f"Missing reaction types in rate processing: {missing_reactions}"
raise ValueError(msg)
# Compute the rate at each timestep, adding the appropriate header
dy = rate_by_reaction @ incidence
dy.columns = [str(s) for s in species]
# Compute the SURFACE and BULK:
dy.loc[:, "SURFACE"] = dy.loc[:, dy.columns.str.startswith("#")].sum(axis=1)
dy.loc[:, "BULK"] = dy.loc[:, dy.columns.str.startswith("@")].sum(axis=1)
# Compute the corrections to account for the transport between SURFACE and BULK
swap_rate_correction = pd.DataFrame()
# Then correct for the addition or subtraction of material to/from the bulk:
surfswap_reactions = [r for r in reactions if r.get_reaction_type() == "SURFSWAP"]
bulkswap_reactions = [r for r in reactions if r.get_reaction_type() == "BULKSWAP"]
# Sort them in the correct order, s.t. it matches the saving to disk format.
_sp_names = [str(s) for s in species]
surfswap_reactions = sorted(
surfswap_reactions,
key=lambda x: _sp_names.index(str(x.get_reactants()[0])),
)
bulkswap_reactions = sorted(
bulkswap_reactions,
key=lambda x: _sp_names.index(str(x.get_reactants()[0])),
)
for row_pos, ((_row_idx_j, _rate_row), (_idx_i, abunds_row)) in enumerate(
zip(rate_constants.iterrows(), abundances.iterrows(), strict=False)
):
# Walk through the bulkswap and reactionswap pathways:
_sswap_rates = {}
_bswap_rates = {}
# TODO: vectorize this, because this is slower than it has to be.
for r_bswap, r_sswap in zip(bulkswap_reactions, surfswap_reactions, strict=False):
surfaceCoverage = min(1.0, abunds_row["BULK"] / abunds_row["SURFACE"])
dy_row: pd.Series = dy.iloc[row_pos]
if dy_row["SURFACE"] < 0.0:
surfaceCoverage = min(1.0, abunds_row["BULK"] / abunds_row["SURFACE"])
# SURFACE is shrinking, so bulk must be growing
bswap = (
dy_row["SURFACE"]
* surfaceCoverage
* abunds_row[r_bswap.get_reactants()[0]]
/ abunds_row["BULK"]
)
# Call it SWAP_GEOMETRIC since it is due to the swap induced by the effect
# of the surface layers growing, this is a geometric bookkeeping thing,
# So the name geometric makes the most sense.
_bswap_rates[str(r_bswap).replace("SWAP", "SWAP_GEOMETRIC")] = bswap
_sswap_rates[str(r_sswap).replace("SWAP", "SWAP_GEOMETRIC")] = 0.0
# Immedidiately correct dy:
bswap_r_col = dy.columns.get_loc(r_bswap.get_reactants()[0])
bswap_p_col = dy.columns.get_loc(r_bswap.get_products()[0])
dy.iloc[row_pos, bswap_r_col] -= bswap # type: ignore[index]
dy.iloc[row_pos, bswap_p_col] += bswap # type: ignore[index]
else:
surfaceCoverage = 0.5 * GAS_DUST_DENSITY_RATIO / NUM_SITES_PER_GRAIN
sswap = (
dy_row["SURFACE"]
* surfaceCoverage
* abunds_row[r_sswap.get_reactants()[0]]
)
_bswap_rates[str(r_bswap).replace("SWAP", "SWAP_GEOMETRIC")] = 0.0
_sswap_rates[str(r_sswap).replace("SWAP", "SWAP_GEOMETRIC")] = sswap
# Immediately correct dy (surface grows → #X buried into @X):
sswap_r_col = dy.columns.get_loc(r_sswap.get_reactants()[0])
sswap_p_col = dy.columns.get_loc(r_sswap.get_products()[0])
dy.iloc[row_pos, sswap_r_col] -= sswap # type: ignore[index]
dy.iloc[row_pos, sswap_p_col] += sswap # type: ignore[index]
swap_rate_correction = pd.concat(
(
swap_rate_correction,
(
pd.DataFrame.from_dict(
_bswap_rates | _sswap_rates,
orient="index",
).T
),
)
)
swap_rate_correction = swap_rate_correction.reset_index(drop=True)
rate_by_reaction = pd.concat((rate_by_reaction, swap_rate_correction), axis=1)
# Correct the change in surface and bulk by summing the constituents:
dy.loc[:, "SURFACE"] = dy.loc[:, dy.columns.str.startswith("#")].sum(axis=1)
dy.loc[:, "BULK"] = dy.loc[:, dy.columns.str.startswith("@")].sum(axis=1)
# Apply the new ratees to the ydots and compute a new ydot for surface and bulk:
return (
dy,
rate_by_reaction,
)
[docs]
def compute_heating_per_reaction(
rates: pd.DataFrame,
network: Network | None = None,
reactions: list[Reaction] | None = None,
) -> pd.DataFrame:
"""Compute heating/cooling per reaction by multiplying rates by exothermicity.
Parameters
----------
rates : pd.DataFrame
(time x n_reactions) of reaction rates
network : Network | None
Network object with exothermicity data. Default = None.
reactions : list[Reaction] | None
List of Reaction objects (alternative to network)
Default = None.
Returns
-------
pd.DataFrame
(time x n_reactions) of heating rates in erg/s
Raises
------
ValueError
If the number of reactions and rates are not the same.
"""
if network is not None:
reactions = network.get_reaction_list()
if reactions is None:
msg = "reactions must be provided either directly or via network"
raise ValueError(msg)
if len(reactions) != rates.shape[1]:
msg = "Number of reactions and rates must be equal"
raise ValueError(msg)
exothermicities = np.array([r.get_exothermicity() for r in reactions])
return rates * exothermicities
[docs]
def get_production_and_destruction(
species: str, dataframe: pd.DataFrame
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Split the rate constants or rates into production and destruction parts for a given species.
Stoichiometry is accounted for: if a species appears N times on one side of a
reaction, the column is scaled by N so that summing production - destruction
recovers the net dy for that species.
Parameters
----------
species : str
Name of species to split rates for
dataframe : pd.DataFrame
DataFrame of rates
Returns
-------
tuple[pd.DataFrame, pd.DataFrame]
production rates, destruction rates
"""
production = {}
destruction = {}
for i, r in enumerate(dataframe.columns):
r = r.strip()
n_prod = r.split(" -> ")[-1].split(" + ").count(species)
n_destr = r.split(" -> ")[0].split(" + ").count(species)
col = dataframe.iloc[:, i]
if n_prod > 0:
key = r if r not in production else f"{r}__{i}"
production[key] = col * n_prod if n_prod > 1 else col
if n_destr > 0:
key = r if r not in destruction else f"{r}__{i}"
destruction[key] = col * n_destr if n_destr > 1 else col
return pd.DataFrame(production, index=dataframe.index), pd.DataFrame(
destruction, index=dataframe.index
)
[docs]
def derive_phase_from_name(name: str) -> str:
"""Derive the phase of the species from its name.
Parameters
----------
name : str
name of the species.
Returns
-------
str
Phase. One of `["gas", "surface", "bulk", "ion"]`.
"""
if name.startswith("@"):
return "bulk"
elif name.startswith("#"):
return "surface"
elif name.endswith("+"):
return "ion"
else:
return "gas"
[docs]
def analyze_element_per_phase(element: str, df: pd.DataFrame) -> pd.DataFrame:
"""Calculate the total elemental abundance of a species as a function of time.
within each phase (gas, surface, bulk and ion). Allows you to check conservation
of elements.
Parameters
----------
element : str
Name of element
df : pd.DataFrame
DataFrame from `read_output_file()`
Returns
-------
content : pd.DataFrame
Total abundance of element per phase for all time steps in df.
"""
content = pd.DataFrame()
# Split the columns into ionized, gas, surface and bulk:
for phase in ["bulk", "surface", "ion", "gas"]:
species_to_select = [
s for s in list(df.columns) if derive_phase_from_name(s) == phase
]
_df = df.loc[:, species_to_select]
sums = _count_element(species_to_select, element)
col_key = element + "_" + phase
content[col_key] = _df.mul(sums.values, axis=1).sum(axis=1)
return content