"""UCLCHEM Model Module.
Core module for running gas-grain chemical models under different physical conditions.
This module provides both object-oriented and functional interfaces for running
UCLCHEM chemical models. The object-oriented API (model classes) is recommended
for most use cases as it provides better state management and built-in analysis tools.
**Model Classes (Object-Oriented API - Recommended):**
- :class:`Cloud` - Static or freefall collapsing cloud
- :class:`Collapse` - Collapsing cloud with various prescriptions (BE, filament, ambipolar)
- :class:`PrestellarCore` - Prestellar core with heating and chemistry
- :class:`CShock` - C-type shock model
- :class:`JShock` - J-type shock model
- :class:`Postprocess` - Custom physics from user-provided arrays
- :class:`SequentialRunner` - Chain multiple physical stages
- :class:`GridRunner` - Run parameter grids in parallel
**Legacy Functions (Functional API):**
Available in :mod:`uclchem.functional` for backward compatibility.
Returns arrays/DataFrames instead of model objects.
**Quick Example:**
>>> import uclchem
>>>
>>> # Create a collapsing cloud model
>>> cloud = uclchem.model.Cloud(
... param_dict={
... "initialDens": 1e2,
... "initialTemp": 10.0,
... "finalTime": 1e6,
... "freefall": True
... }
... )
>>>
>>> # Check for errors and plot
>>> cloud.check_error()
Model ran successfully
>>> cloud.create_abundance_plot(["CO", "$CO"]) #doctest: +SKIP
**Model Workflow:**
1. **Initialize**: Create model object with parameters
2. **Run**: Model runs automatically on initialization (or use `read_file` to load)
3. **Analyze**: Access results via attributes (`.final_abundances`, `.chemistry_dataframe`)
4. **Plot**: Use built-in plotting methods (`.create_abundance_plot()`)
5. **Chain**: Use as input to next stage (`.previous_model` parameter)
**Common Parameters:**
All models accept these key parameters in `param_dict`:
- ``initialDens`` (float): Initial density [cm⁻³]
- ``initialTemp`` (float): Initial temperature [K]
- ``finalTime`` (float): Simulation end time [years]
- ``freefall`` (bool): Enable freefall collapse (Cloud only)
- ``outputFile`` (str): Output file path (optional with OO API)
See the user guide for complete parameter list.
**Species Naming:**
- Gas phase: ``CO``, ``H2O``, ``CH3OH``
- Ice surface: ``$CO``, ``$H2O``, ``$CH3OH``
- Ice bulk: ``@CO``, ``@H2O``, ``@CH3OH``
**See Also:**
- :mod:`uclchem.analysis` - Analyze model outputs and reaction pathways
- :mod:`uclchem.advanced` - Advanced controls for heating and network state
**Note on Thread Safety:**
Model objects are **not thread-safe** when using advanced features that modify
Fortran module state. Use multiprocessing (not threading) for parallel runs.
"""
from __future__ import annotations
import contextlib
import json
import logging
_logger = logging.getLogger(__name__)
# /UCLCHEM related imports
# Multiprocessing imports
import multiprocessing as mp
import os
import signal
import traceback
import warnings
from abc import ABC, abstractmethod
from datetime import datetime
from multiprocessing import pool, shared_memory
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
import h5py
import numpy as np
import pandas as pd
import uclchemwrap
import xarray as xr
# UCLCHEM related imports
from uclchemwrap import uclchemwrap as wrap
from uclchem._coolant_utils import load_coolant_level_names
from uclchem._fortran_capture import capture_fortran_output
from uclchem.analysis import check_element_conservation
from uclchem.constants import (
DVODE_STAT_NAMES,
N_DVODE_STATS,
N_PHYSICAL_PARAMETERS,
N_SE_STATS_PER_COOLANT,
N_TOTAL_LEVELS,
NCOOLANTS,
PHYSICAL_PARAMETERS,
SE_STAT_NAMES,
TIMEPOINTS,
default_elements_to_check,
default_param_dictionary,
n_reactions,
n_species,
)
from uclchem.plot import create_abundance_plot, plot_species
from uclchem.utils import (
CollapseMode,
SuccessFlag,
get_collapse_mode,
get_reaction_table,
get_species,
)
if TYPE_CHECKING:
from collections.abc import Iterator
import matplotlib.pyplot as plt
# /Multiprocessing imports
# Global variables determining formats of write files
# in the below variable, the outputs were chosen according to the spacing needed for
# " Time, Density, gasTemp, dustTemp, Av, radfield, zeta,
# point, parcel_radius, radfield_internal, av_internal"
# Model registration is intended to prevent code injection during loading time.
[docs]
REGISTRY: dict[str, type[AbstractModel]] = {}
[docs]
def register_model(cls: type[AbstractModel]) -> type[AbstractModel]:
"""Register a new model in the model registry.
Parameters
----------
cls : type[AbstractModel]
class to register.
Returns
-------
cls : type[AbstractModel]
class
Raises
------
ValueError
If a model with the same name as cls is already in the registry.
"""
name = getattr(cls, "MODEL_NAME", cls.__name__)
if name in REGISTRY and REGISTRY[name] is not cls:
msg = f"Duplicate model registration for {name}"
raise ValueError(msg)
REGISTRY[name] = cls
return cls
# /Global variables determining formats of write files
# Reaction and Species name retrieval classes to reduce file read repetition.
[docs]
class ReactionNamesStore: # noqa: D101
def __init__(self):
[docs]
self.reaction_names = None
def __call__(self) -> list[str]:
"""Get a list of formatted reactions.
Only load the reactions once, after that use the cached version
Returns
-------
list[str]
List of formatted reactions.
"""
if self.reaction_names is None:
reactions = get_reaction_table()
# format the reactions:
self.reaction_names = [
reaction_line_formatter(line) for idx, line in reactions.iterrows()
]
return self.reaction_names
[docs]
get_reaction_names = ReactionNamesStore()
[docs]
class SpeciesNameStore: # noqa: D101
def __init__(self):
[docs]
self.species_names = None
def __call__(self) -> list[str]:
"""Get the species names.
Only loads the species once, after that use the cached version
Returns
-------
list[str]
List of species names
"""
if self.species_names is None:
self.species_names = get_species()
return self.species_names
[docs]
get_species_names = SpeciesNameStore()
# /Reaction and Species name retrieval classes to reduce file read repetition.
# Universal model loader
[docs]
def load_model(
*,
file_obj: h5py.File | None = None,
file: str | None = None,
name: str = "default",
debug: bool = False,
) -> AbstractModel:
"""Load a pre-existing model from a file. Bypasses `__init__`.
Parameters
----------
file_obj : h5py.File | None
open h5py file object. (Default value = None)
file : str | None
Path to a file that contains previously run and stored models. (Default value = None)
name : str
Name of the stored object, if none was provided `default` will have been used.
Defaults to 'default'
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
Returns
-------
obj : AbstractModel
Loaded object that inherited from AbstractModel and has the class
of to the model found in the loaded file.
Raises
------
ValueError
If file_obj and file are both passed, or neither are passed.
Exception
If the model with name `name` is not found in the file.
"""
opened_file = False
if (file_obj is None) == (file is None):
msg = "file_obj or file must be passed."
raise ValueError(msg)
elif file_obj is None:
file_obj = h5py.File(file, "r")
opened_file = True
if name not in file_obj:
if opened_file:
file_obj.close()
msg = f"model {name} was not found in the save file that was passed."
raise Exception(msg)
model_group = file_obj[name]
coords = {}
if "_coords" in model_group:
for coord_name in model_group["_coords"]:
coords[coord_name] = _read_array(model_group["_coords"], coord_name)
data_vars = {}
for var_name in model_group:
if var_name == "_coords":
continue
data_vars[var_name] = _read_array(model_group, var_name)
loaded_data = xr.Dataset(data_vars, coords=coords)
if opened_file:
file_obj.close()
model_class = json.loads(loaded_data["attributes_dict"].item())["model_type"]
cls = REGISTRY.get(model_class)
if cls is None:
msg = f"Unrecognized model type '{model_class}'. Not in trusted registry."
raise ValueError(msg)
return cls.load_from_dataset(model_ds=loaded_data, debug=debug)
def _read_array(model_group: dict[str, xr.Dataset], name: str) -> xr.Variable:
"""Read an array from a model group.
Parameters
----------
model_group : dict[str, xr.Dataset]
model group.
name : str
key in model_group
Returns
-------
xr.Variable
xr array
"""
ds = model_group[name]
data = ds[()]
if data.dtype.kind == "S":
data = data.astype(str)
dims = list(ds.attrs["_dims"])
attrs = json.loads(ds.attrs.get("_attrs", "{}"))
return xr.Variable(dims, data, attrs=attrs)
# /Universal model loader
# Worker entry for parallel jobs
def _worker_entry(
model_class: str,
init_kwargs: dict,
shm_descs: dict,
result_queue: mp.Queue,
advanced_snapshot: dict | None = None,
):
# Restore advanced settings captured in the coordinator process.
if advanced_snapshot is not None:
from uclchem.advanced.worker_state import ( # noqa: PLC0415 circular
restore_snapshot,
)
restore_snapshot(advanced_snapshot)
cls = REGISTRY.get(model_class)
if cls is None:
msg = f"Unrecognized model type '{model_class}'. Not in trusted registry."
raise ValueError(msg)
model = cls.worker_build(init_kwargs=init_kwargs, shm_desc=shm_descs)
with capture_fortran_output(label="last_model", log_file="./last_model_fortran.log"):
output = model.run_fortran()
result_queue.put(output)
# /Worker entry for parallel jobs
# Short compatibility helper for legacy parameter `endAtFinalDensity`
def _convert_legacy_stopping_param(param_dict: dict[str, Any]) -> dict:
"""Minimal conversion of legacy `endAtFinalDensity` to `parcelStoppingMode`.
Parameters
----------
param_dict : dict[str, Any]
parameter dictionary.
Returns
-------
dict
Converted dictionary
Raises
------
RuntimeError
If `endAtFinaldensity` and `parcelStoppingMode` are both
in `param_dict`.
RuntimeError
If `endAtFinalDensity` is being used with a multi-point model.
"""
if param_dict is None:
return param_dict
has_old = "endatfinaldensity" in param_dict
has_new = "parcelstoppingmode" in param_dict
if has_old and has_new:
msg = "Cannot specify both 'endAtFinalDensity' and 'parcelStoppingMode'. Use 'parcelStoppingMode' only."
raise RuntimeError(msg)
if has_old:
points = param_dict.get("points", 1)
if points > 1:
msg = "endAtFinalDensity is no longer supported for multi-point models (points > 1). Use 'parcelStoppingMode' instead."
raise RuntimeError(msg)
old_val = param_dict.pop("endatfinaldensity")
new_val = 1 if old_val else 0
param_dict["parcelstoppingmode"] = new_val
# Check if parcelStoppingMode requires freefall validation
# Defer full validation to AbstractModel.__init__() where model_type is known
if new_val != 0 and not param_dict.get("freefall", False):
param_dict["_needs_freefall_validation"] = True
return param_dict
def _build_physics_df(
raw_array: np.ndarray, stored_cols: list[str], model_name: str
) -> pd.DataFrame:
"""Build a physics DataFrame, handling PHYSICAL_PARAMETERS version mismatches.
Parameters
----------
raw_array : np.ndarray
2D array of shape (n_timesteps, n_stored_cols) from the model file.
stored_cols : list[str]
Column names as stored in the file's _coords/physics_values.
model_name : str
Model name used in warning/error messages.
Returns
-------
pd.DataFrame
DataFrame with exactly the current PHYSICAL_PARAMETERS columns.
Columns absent from the file (added in a newer UCLCHEM) are zero-filled with a warning.
Raises
------
ValueError
If the file contains a column that no longer exists in PHYSICAL_PARAMETERS
(i.e. was removed), meaning the file was written with a newer UCLCHEM version.
"""
removed = set(stored_cols) - set(PHYSICAL_PARAMETERS)
if removed:
msg = (
f"Model file '{model_name}' contains physical parameters that no longer "
f"exist in the current UCLCHEM installation: {sorted(removed)}. "
"This file was written with a newer version of UCLCHEM and cannot be "
"loaded with this version."
)
raise ValueError(msg)
added = set(PHYSICAL_PARAMETERS) - set(stored_cols)
if added:
_logger.warning(
f"Model file '{model_name}' is missing physical parameters that were "
f"added in a newer UCLCHEM version: {sorted(added)}. "
"These columns will be filled with zeros."
)
raw_df = pd.DataFrame(raw_array, columns=stored_cols)
return raw_df.reindex(columns=PHYSICAL_PARAMETERS, fill_value=0.0)
# TODO Add catch of ctrl+c or other aborts so that it saves model and a
# full output to files of year, month, day, time type.
[docs]
class AbstractModel(ABC):
"""Base model class used for inheritance only.
The AbstractModel class serves as an abstract class from which other model classes can be built.
It is not intended to be used as a standalone class for running UCLCHEM.
Parameters
----------
param_dict : dict | None
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values if not provided.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
timepoints : int
Integer value of how many timesteps should be calculated before
aborting the UCLCHEM model. Defaults to `uclchem.constants.TIMEPOINTS`.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
run_type : Literal["managed", "external"]
Run type. "external" means that the model is not
run directly after instantiation, but can instead be run as `model.run()`.
"""
def __init__(
self,
param_dict: dict | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
timepoints: int = TIMEPOINTS,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
import multiprocessing # noqa: PLC0415
if (
multiprocessing.current_process().name != "MainProcess"
and run_type != "external"
):
msg = (
"A uclchem model was instantiated inside a multiprocessing worker. "
"This usually means your script is missing an "
"'if __name__ == \"__main__\":' guard around the model code.\n"
"Without it, each spawned worker re-imports and re-runs the script, "
"recursively launching more processes.\n"
"Wrap your code like this:\n\n"
" if __name__ == '__main__':\n"
" model = uclchem.model.Cloud(...)\n"
)
raise RuntimeError(msg)
self._data = xr.Dataset()
self._pickle_dict: dict = {}
# Per-instance metadata containers (scalars and small values)
object.__setattr__(self, "_meta", {})
object.__setattr__(self, "_pickle_meta", {})
# Set run_type into metadata
[docs]
self.run_type = run_type
[docs]
self.separate_worker_types = ["managed"]
# Shared memory
self._shm_desc: dict = {}
self._shm_handles: dict = {}
self._proc_handle: Any = None
# /Shared memory
# Signal Interrupt
self._was_interrupted = False
self._orig_sigint = signal.getsignal(signal.SIGINT)
# /Signal Interrupt
[docs]
self.model_type = str(self.__class__.__name__)
self._param_dict: dict = {}
[docs]
self.full_array: Any = None
self._debug = debug
self._on_negative_abundances = on_negative_abundances
self._on_error = on_error
[docs]
self.success_flag: None | SuccessFlag = None
# Note: specname is now accessed via get_species_names() global function
# Note: PHYSICAL_PARAMETERS is now accessed via the global constant
[docs]
self.n_out = 0 if read_file is None else None
[docs]
self.timepoints = timepoints
[docs]
self.was_read = read_file is not None
self._reform_inputs(param_dict or {})
if "columnfile" in self._param_dict:
warnings.warn(
"Dropping columnfile key from parameter dictionary. Use 'AbstractModel.legacy_write_columnfile' instead.",
stacklevel=2,
)
del self._param_dict["columnfile"]
# Validate parcelStoppingMode usage after model_type is known
if self._param_dict.get("_needs_freefall_validation", False):
if self.model_type not in {"Collapse"}: # Collapse models imply freefall
msg = (
"parcelStoppingMode != 0 can only be used with:\n"
" - Cloud models with freefall=True\n"
" - Collapse models (freefall is implied)\n"
f"Current model_type={self.model_type}, freefall={self._param_dict.get('freefall', False)}\n"
"Please either set freefall=True or set parcelStoppingMode=0"
)
raise ValueError(msg)
self._param_dict.pop("_needs_freefall_validation", None) # Clean up flag
# If we were given a previously-written output file, populate the model
# arrays and metadata from that file now so later initialization can rely on them.
if read_file is not None:
self.legacy_read_output_file(read_file)
if "points" not in self._param_dict:
self._param_dict["points"] = 1
# Expose grid points as attribute for legacy code expecting `gridPoints`
object.__setattr__(self, "gridPoints", self._param_dict["points"])
[docs]
self.outputFile = (
self._param_dict.pop("outputfile")
if "outputfile" in self._param_dict
else None
)
[docs]
self.abundSaveFile = (
self._param_dict.pop("abundsavefile")
if "abundsavefile" in self._param_dict
else None
)
[docs]
self.abundLoadFile = (
self._param_dict.pop("abundloadfile")
if "abundloadfile" in self._param_dict
else None
)
[docs]
self.starting_chemistry_array: Any = None
if previous_model is None and self.abundLoadFile is None:
self._create_starting_array(starting_chemistry)
elif self.abundLoadFile is not None:
self.legacy_read_starting_chemistry()
elif previous_model is not None and previous_model.has_attr(
"next_starting_chemistry_array"
):
# All models must use the same hard-coded PHYSICAL_PARAMETERS from constants.py
# If previous_model was loaded from a legacy file with different parameters,
# that's a compatibility issue that should have been caught during load.
self._create_starting_array(previous_model.next_starting_chemistry_array)
[docs]
self.give_start_abund = self.starting_chemistry_array is not None
if np.all(self.starting_chemistry_array == 0.0):
msg = "Detected all zeros starting chemistry array."
raise AssertionError(msg)
# Only initialize next_starting_chemistry_array if we didn't load it from a file
# (legacy_read_output_file sets it from the last timestep)
if read_file is None:
self.next_starting_chemistry_array: Any = None
# Only create new arrays if we didn't load them from a file
if read_file is None:
self.physics_array: Any = None
self.chemical_abun_array: Any = None
self._create_fortran_array()
self.rate_constants_array: Any = None
self._create_rate_constants_array()
self.heat_array: Any = None
self._create_heating_array()
self.stats_array: Any = None
self._create_stats_array()
self.level_populations_array: Any = None
self._create_level_populations_array()
self.se_stats_array: Any = None
self._create_se_stats_array()
else:
# When loading from file, arrays are already populated; just initialize
# the arrays that weren't loaded
self.rate_constants_array = None
self.heat_array = None
self.stats_array = None
self.level_populations_array = None
self.se_stats_array = None
def __del__(self):
"""Release any shared-memory handles held by this model on garbage collection."""
if hasattr(self, "_shm_desc"):
self._coordinator_unlink_memory()
# Separate class building method(s)
@classmethod
[docs]
def load_from_dataset(
cls, model_ds: xr.Dataset, debug: bool = False
) -> AbstractModel:
"""Load an abstract model from an xr Dataset.
Parameters
----------
model_ds : xr.Dataset
Dataset to load
debug : bool
Flag to set (Default value = False)
Returns
-------
obj : AbstractModel
instantiated model
"""
obj = cls.__new__(cls)
obj._param_dict = json.loads(model_ds["_param_dict"].item())
del model_ds["_param_dict"]
obj._data = xr.Dataset()
obj._data = model_ds.copy()
model_ds.close()
temp_attribute_dict = json.loads(obj._data["attributes_dict"].item())
# Restore these values into the metadata dict rather than dataset variables
object.__setattr__(obj, "_meta", temp_attribute_dict) # noqa: PLC2801 bypass
del obj._data["attributes_dict"]
obj.debug = debug
obj._coord_assign()
return obj
@classmethod
[docs]
def worker_build(cls, init_kwargs, shm_desc): # noqa: ANN001, D102
obj = cls.__new__(cls)
for k, v in init_kwargs.items():
object.__setattr__(obj, k, v) # noqa: PLC2801 bypass
obj._reform_array_in_worker(shm_desc)
return obj
@classmethod
[docs]
def from_file(
cls,
file: str,
name: str = "default",
debug: bool = False,
):
"""Load a model from a file.
This is a convenience class method that wraps the module-level load_model function.
Parameters
----------
file : str
Path to a file that contains previously run and stored models.
name : str
Name of the stored object. Defaults to 'default'.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
Returns
-------
AbstractModel
Model object loaded from the file.
"""
return load_model(file=file, name=name, debug=debug)
# /Separate class building methods
# Class utility methods
def __getattr__(self, key: str) -> Any:
"""Resolve attribute access against instance metadata and the backing dataset.
Parameters
----------
key : str
Name of the attribute to look up.
Returns
-------
Any
The stored value from ``_meta`` or the backing dataset.
Raises
------
AttributeError
If no attribute with the given name exists.
"""
# Internal attributes behave normally
if key.startswith("_") and key != "_data":
return super().__getattribute__(key)
# First check instance metadata
try:
meta = super().__getattribute__("_meta")
except Exception:
meta = {}
if key in meta:
return meta[key]
# Next check the xarray Dataset
if key in super().__getattribute__("_data"):
values = super().__getattribute__("_data")[key].values
if np.shape(values) != ():
if isinstance(values, tuple):
return values[1]
else:
return values
else:
return self._data[key].item()
msg = f'{self.__class__.__name__} has no attribute of name: "{key}".'
raise AttributeError(msg)
def __setattr__(self, key: str, value: Any) -> None:
"""Route attribute assignment to real attributes, metadata, or the dataset.
Parameters
----------
key : str
Name of the attribute to set.
value : Any
Value to store.
"""
# Underscored attributes are real attributes
if key.startswith("_"):
super().__setattr__(key, value)
return
# Determine ndim safely
try:
ndim = np.ndim(value)
except Exception:
ndim = None
# If this looks like an array variable (name contains '_array' and ndim >= 1),
# store it in _data
if ndim is not None and "_array" in key and ndim >= 1:
# Ensure value is a numpy array (convert lists, tuples, etc.)
if not isinstance(value, np.ndarray):
value = np.asarray(value)
ndim = value.ndim # Update ndim after conversion
# Remove any existing conflicting metadata entry
with contextlib.suppress(Exception):
meta = super().__getattribute__("_meta")
if key in meta:
del meta[key]
# Remove any existing dataset var of same name before inserting
if key in self._data:
with contextlib.suppress(Exception):
self._data = self._data.drop_vars(key)
# Choose a time dimension name that avoids conflicts with existing dims
time_dim = "time_step"
existing_time = None
try:
# Use .sizes (mapping of dim name -> length) instead of .dims to avoid FutureWarning
existing_time = self._data.sizes.get("time_step", None)
except Exception:
existing_time = None
if existing_time is not None and existing_time != value.shape[0]:
# create a per-variable time dim to avoid conflicts
base_time_dim = f"time_step_{key}"
time_dim = base_time_dim
i = 1
while time_dim in self._data.sizes:
time_dim = f"{base_time_dim}_{i}"
i += 1
if ndim == 3: # noqa: PLR2004
# Determine the 'values' dimension name for the variable
values_dim = key.replace("array", "values")
# If an existing coordinate with this name has a conflicting
# length, create a per-variable values dimension to avoid
# xarray merge conflicts (similar to per-variable time dims).
try:
existing_values_len = self._data.sizes.get(values_dim, None)
except Exception:
existing_values_len = None
if (
existing_values_len is not None
and existing_values_len != value.shape[2]
):
base_values_dim = f"{values_dim}_{key}"
new_values_dim = base_values_dim
j = 1
while new_values_dim in self._data.sizes:
new_values_dim = f"{base_values_dim}_{j}"
j += 1
self._data[key] = ([time_dim, "point", new_values_dim], value)
# add numeric coords for the new dimension
self._data = self._data.assign_coords(
{new_values_dim: np.arange(value.shape[2])}
)
else:
self._data[key] = ([time_dim, "point", values_dim], value)
# Add coordinate for custom time dim if we created one
if time_dim != "time_step":
self._data = self._data.assign_coords(
{time_dim: np.arange(value.shape[0])}
)
elif ndim == 2: # noqa: PLR2004
self._data[key] = (["point", key], value)
elif ndim == 1:
# For 1D arrays, use the array name as the dimension
dim_name = key.replace("_array", "")
self._data[key] = ([dim_name], value)
else:
self._data[key] = value
return
# Otherwise, store in metadata
try:
meta = super().__getattribute__("_meta")
except Exception:
object.__setattr__(self, "_meta", {})
meta = self._meta
# If a dataset variable exists with this name, drop it to avoid ambiguity
if key in self._data:
with contextlib.suppress(Exception):
self._data = self._data.drop_vars(key)
meta[key] = value
[docs]
def has_attr(self, key: str) -> bool:
"""Check if the object has an attribute stored in ``self._meta`` or ``self._data``.
Parameters
----------
key : str
name of attribute
Returns
-------
bool
whether the object has the attribute.
"""
try:
meta = super().__getattribute__("_meta")
except Exception:
meta = {}
return (key in meta) or (key in self._data)
# /Class utility method
# UCLCHEM utility and analysis wrappers
[docs]
def check_conservation(
self, element_list: list[str] | None = None, percent: bool = True
) -> None:
"""Check conservation of the chemical abundances.
Parameters
----------
element_list : list[str] | None
List of elements to check conservation for. (Default value = None)
percent : bool
Flag on if percentage values should be used. Defaults to True.
"""
if element_list is None:
element_list = default_elements_to_check
if self._param_dict["points"] > 1:
for i in range(self._param_dict["points"]):
print(
f"Element conservation report for point {i + 1} of {self._param_dict['points']}"
)
df_i = self.get_dataframes(i)
print(
check_element_conservation(
df_i if isinstance(df_i, pd.DataFrame) else df_i[0],
element_list,
percent,
)
)
else:
print("Element conservation report")
df_0 = self.get_dataframes(0)
print(
check_element_conservation(
df_0 if isinstance(df_0, pd.DataFrame) else df_0[0],
element_list,
percent,
)
)
[docs]
def check_error(self, only_error: bool = False, raise_on_error: bool = True) -> None:
"""Check the model error status and raise RuntimeError on failure.
Parameters
----------
only_error : bool
If True, only act when there was an error (skip success message).
Defaults to False.
raise_on_error : bool
If True (default), raises RuntimeError on failure. If False, prints.
"""
if self.success_flag is None:
print("Model has not been run.")
return
msg = self.success_flag.check_error(
only_error=only_error, raise_on_error=raise_on_error
)
if msg is not None:
print(msg)
[docs]
def create_abundance_plot(
self,
species: list[str],
figsize: tuple[int, int] = (16, 9),
point: int = 0,
plot_file: str | Path | None = None,
) -> tuple[plt.Figure, plt.Axes]:
"""`uclchem.plot.create_abundance_plot` wrapper method.
Parameters
----------
species : list[str]
List of species to plot.
figsize : tuple[int, int]
The figure size to use for matplotlib Defaults to (16, 9).
point : int
Integer referring to which point of the UCLCHEM model to use. Defaults to 0.
plot_file : str | Path | None
if not None, save to a path. Default = None.
Returns
-------
tuple[plt.Figure, plt.Axes]
matplotlib figure and axis objects
Raises
------
ValueError
If `point` is larger than the number of points in the model run.
"""
if point > self._param_dict["points"]:
msg = "'point' must be less than number of modeled points."
raise ValueError(msg)
df = self.get_dataframes(point)
return create_abundance_plot(
df if isinstance(df, pd.DataFrame) else df[0], species, figsize, plot_file
)
[docs]
def get_dataframes(
self,
point: int | None = None,
joined: bool = True,
with_rate_constants: bool = False,
with_heating: bool = False,
with_stats: bool = False,
with_level_populations: bool = False,
with_se_stats: bool = False,
) -> pd.DataFrame | tuple[pd.DataFrame, ...]:
"""Convert the model physics and chemical_abun arrays from numpy to pandas arrays.
Parameters
----------
point : int | None
Integer referring to which point of the UCLCHEM model to return.
If None, returns data for all points with a 'Point' column. Defaults to None.
joined : bool
Flag on whether the returned pandas dataframe should be one, or if
two dataframes should be
returned. One physical, one chemical_abun dataframe. Defaults to True.
with_rate_constants : bool
Flag on whether to include reaction rate constants
in the dataframe, and/or as a separate dataframe depending on the value of `joined`.
Defaults to False.
with_heating : bool
Flag on whether to include heating/cooling rates in the dataframe,
and/or as a separate dataframe depending on the value of `joined`. Defaults to False.
with_stats : bool
Flag on whether to include DVODE solver statistics in the dataframe,
and/or as a separate dataframe depending on the value of `joined`. Defaults to False.
with_level_populations : bool
Flag on whether to include coolant level populations
in the output. Defaults to False.
with_se_stats : bool
Flag on whether to include SE solver statistics in the output.
Defaults to False.
Returns
-------
pd.DataFrame | tuple[pd.DataFrame, ...]
If ``joined=True``: a single ``pd.DataFrame`` with all requested
columns joined horizontally.
If ``joined=False``: a tuple of ``pd.DataFrame`` objects in the
order ``(physics_df, chemistry_df[, rate_constants_df]
[, heating_df][, stats_df][, level_populations_df]
[, se_stats_df])``, where optional frames are included only when
their corresponding flag is ``True``.
"""
# Helper function to add Point column to a dataframe
def add_point_column(df: pd.DataFrame, point_num: int) -> pd.DataFrame:
df.insert(0, "Point", point_num)
return df
# Determine total number of points in model
n_points = self._param_dict.get("points", 1)
# Get dataframes for the requested points
if point is None:
# Collect dataframes for all points
all_dfs = []
for pt in range(n_points):
dfs = self._get_single_point_dataframes(
pt,
with_rate_constants,
with_heating,
with_stats,
with_level_populations,
with_se_stats,
)
# Add Point columns to all dataframes (1-indexed)
dfs = tuple(add_point_column(df, pt + 1) for df in dfs)
all_dfs.append(dfs)
# Transpose to group by dataframe type instead of by point
# e.g., [[phys0, chem0, rates0], [phys1, chem1, rates1]] -> [[phys0, phys1], [chem0, chem1], [rates0, rates1]] # noqa: W505
df_collections = list(zip(*all_dfs, strict=False))
# Concatenate each type vertically
concatenated: list[pd.DataFrame] = [
pd.concat(list(collection), ignore_index=True)
for collection in df_collections
]
else:
# Single point mode
concatenated = list(
self._get_single_point_dataframes(
point,
with_rate_constants,
with_heating,
with_stats,
with_level_populations,
with_se_stats,
)
)
# Add Point columns (1-indexed)
concatenated = [add_point_column(df, point + 1) for df in concatenated]
# Join horizontally if requested
if joined:
result_df = concatenated[0] # Start with physics
for df in concatenated[1:]:
# Drop duplicate Point column from subsequent dataframes
result_df = result_df.join(df.drop(columns=["Point"]))
return result_df
else:
return tuple(concatenated)
[docs]
def get_joined_dataframes(
self,
point: int | None = None,
with_rate_constants: bool = False,
with_heating: bool = False,
with_stats: bool = False,
with_level_populations: bool = False,
with_se_stats: bool = False,
) -> pd.DataFrame:
"""Return all model data as a single horizontally-joined DataFrame.
Convenience wrapper around :meth:`get_dataframes` with ``joined=True``.
Parameters
----------
point : int | None
Integer referring to which point of the UCLCHEM model to return.
If None, returns data for all points with a 'Point' column. Defaults to None.
with_rate_constants : bool
Flag on whether to include reaction rate constants in the joined dataframe.
Defaults to False.
with_heating : bool
Flag on whether to include heating/cooling rates in the joined dataframe.
Defaults to False.
with_stats : bool
Flag on whether to include DVODE solver statistics in the joined dataframe.
Defaults to False.
with_level_populations : bool
Flag on whether to include coolant level populations in the joined dataframe.
Defaults to False.
with_se_stats : bool
Flag on whether to include SE solver statistics in the joined dataframe.
Defaults to False.
Returns
-------
pd.DataFrame
Single DataFrame with physics, abundances, and any optional columns joined horizontally.
"""
result = self.get_dataframes(
point=point,
joined=True,
with_rate_constants=with_rate_constants,
with_heating=with_heating,
with_stats=with_stats,
with_level_populations=with_level_populations,
with_se_stats=with_se_stats,
)
# joined=True always returns a single DataFrame
if isinstance(result, pd.DataFrame):
return result
return result[0]
def _get_single_point_dataframes(
self,
point: int,
with_rate_constants: bool,
with_heating: bool,
with_stats: bool,
with_level_populations: bool,
with_se_stats: bool,
) -> tuple[pd.DataFrame, ...]:
"""Get dataframes for a single point without the Point column.
Parameters
----------
point : int
Spatial point index (for multi-point models).
with_rate_constants : bool
Flag on whether to include a reaction rate constant dataframe
in the tuple.
with_heating : bool
Flag on whether to include heating/cooling rates dataframe in the tuple.
with_stats : bool
Flag on whether to include DVODE solver statistics dataframe in the tuple.
with_level_populations : bool
Flag on whether to include coolant level
populations in the tuple.
with_se_stats : bool
Flag on whether to include SE solver statistics
in the tuple
Returns
-------
tuple[pd.DataFrame, ...]
a tuple of pd.DataFrame with physics_df, chemistry_df, and all
additional information based off whether the flags were True.
"""
# Create a physical parameter dataframe, using stored column names from the file
# to handle backwards-compatibility with models saved before new parameters were added.
# The original column names are preserved in _meta, even if _coord_assign replaced
# them with numeric indices when there was a length mismatch.
stored_cols = self._meta.get("physics_values", list(PHYSICAL_PARAMETERS))
model_identifier = f"{self.__class__.__name__} model"
physics_df = _build_physics_df(
self.physics_array[:, point, :], stored_cols, model_identifier
)
# Create an abundances dataframe using global species names
species_names = get_species_names()
chemistry_df = pd.DataFrame(
self.chemical_abun_array[:, point, :], index=None, columns=species_names
)
if self.rate_constants_array is not None and with_rate_constants:
# Create a rate constants dataframe.
rate_constants_df = pd.DataFrame(
self.rate_constants_array[:, point, :],
index=None,
columns=get_reaction_names(),
)
else:
rate_constants_df = None
if self.heat_array is not None and with_heating:
# Create a heating dataframe dynamically using labels from heating.f90
heating_columns = ["Time"]
# Add cooling mechanism labels
cooling_labels = [
str(np.char.decode(label)).strip() + " Cooling"
for label in uclchemwrap.heating.coolinglabels
]
heating_columns.extend(cooling_labels)
# Add line cooling labels with species names
line_cooling_labels = [
str(np.char.decode(label)).strip()
for label in uclchemwrap.f2py_constants.coolantnames
]
heating_columns.extend(
f"{label} Line Cooling" for label in line_cooling_labels
)
# Add heating mechanism labels
heating_labels = [
str(np.char.decode(label)).strip() + " Heating"
for label in uclchemwrap.heating.heatinglabels
]
heating_columns.extend(heating_labels)
heating_columns.append("Chemical Heating")
heating_df = pd.DataFrame(
self.heat_array[:, point, :], index=None, columns=heating_columns
)
else:
heating_df = None
if self.stats_array is not None and with_stats:
stats_df = pd.DataFrame(
self.stats_array[:, point, :], index=None, columns=DVODE_STAT_NAMES
)
else:
stats_df = None
if self.level_populations_array is not None and with_level_populations:
level_populations_df = self.get_level_populations_dataframe(point=point)
else:
level_populations_df = None
if self.se_stats_array is not None and with_se_stats:
se_stats_df = self.get_se_stats_dataframe(point=point)
else:
se_stats_df = None
# Return tuple of all dataframes (some may be None)
result: list[pd.DataFrame | None] = [physics_df, chemistry_df]
if with_rate_constants:
result.append(rate_constants_df)
if with_heating:
result.append(heating_df)
if with_stats:
result.append(stats_df)
if with_level_populations:
result.append(level_populations_df)
if with_se_stats:
result.append(se_stats_df)
return tuple(df for df in result if df is not None)
[docs]
def get_final_abundances_of_species(self, species: list[str]) -> np.ndarray:
"""Get the final abundances of a list of species.
Parameters
----------
species : list[str]
list of species names.
Returns
-------
abundances : np.ndarray
array of final abundances of ``species``.
"""
species_names = get_species_names()
abundances = np.empty(len(species))
for spec_index, spec in enumerate(species):
index = species_names.index(spec)
abundances[spec_index] = self.next_starting_chemistry_array[-1, index]
return abundances
[docs]
def get_solver_stats_dataframe(self, point: int | None = None) -> pd.DataFrame | None:
"""Get all solver statistics including failed attempts.
This method returns statistics for EVERY DVODE solver call,
including failed attempts that were retried. This is different
from the regular stats in get_dataframes() which only shows
the final successful attempt per trajectory timestep.
Parameters
----------
point : int | None
Spatial point index (for multi-point models). If None, uses point 0.
Defaults to None.
Returns
-------
pd.DataFrame | None
DataFrame with columns from DVODE_STAT_NAMES,
or None if stats not available.
TRAJECTORY_INDEX column links solver attempts to trajectory timesteps.
Rows where TRAJECTORY_INDEX=0 are filtered out (unused preallocated space).
Examples
--------
>>> import uclchem
>>> param_dict = {}
>>> model = uclchem.model.Cloud(param_dict)
>>> solver_stats = model.get_solver_stats_dataframe()
>>> # Count failed attempts
>>> failures = solver_stats[solver_stats['ISTATE'] < 0]
>>> print(f"Failed attempts: {len(failures)}") # doctest: +ELLIPSIS
Failed attempts: ...
"""
if self.stats_array is None:
return None
if point is None:
point = 0
# Extract stats for this point and filter out unused rows
stats_data = self.stats_array[:, point, :]
valid_mask = stats_data[:, 0] > 0 # TRAJECTORY_INDEX > 0
stats_data = stats_data[valid_mask]
if len(stats_data) == 0:
return None
df = pd.DataFrame(stats_data, columns=DVODE_STAT_NAMES)
df["TRAJECTORY_INDEX"] = df["TRAJECTORY_INDEX"].astype(int)
return df
[docs]
def get_failed_solver_attempts(self, point: int | None = None) -> pd.DataFrame | None:
"""Get only the failed solver attempts (ISTATE < 0).
Returns a DataFrame containing only solver attempts that failed
and required retry (ISTATE = -1, -2, -4, -5, etc.).
Parameters
----------
point : int | None
Spatial point index (for multi-point models). If None, uses point 0.
Defaults to None.
Returns
-------
pd.DataFrame | None
DataFrame of failed attempts,
or None if no failures or stats unavailable.
Examples
--------
>>> import uclchem
>>> param_dict = {}
>>> model = uclchem.model.Cloud(param_dict)
>>> failures = model.get_failed_solver_attempts()
>>> if failures is not None:
... print(f"Total retries needed: {len(failures)}")
... print(failures.groupby('ISTATE').size())
... else:
... print("No failures occurred.")
...
No failures occurred.
"""
df = self.get_solver_stats_dataframe(point)
if df is None:
return None
failed = df[df["ISTATE"] < 0]
return failed if len(failed) > 0 else None
[docs]
def get_solver_efficiency_summary(
self, point: int | None = None
) -> dict[str, int | float] | None:
"""Calculate solver efficiency metrics.
Parameters
----------
point : int | None
Spatial point index (for multi-point models). If None, uses point 0.
Defaults to None.
Returns
-------
dict[str, int | float] | None
dict with keys:
- total_attempts: Total DVODE calls
- successful_attempts: Calls that advanced the trajectory
- failed_attempts: Calls that were retried
- efficiency_ratio: successful / total (1.0 = no retries)
- total_cpu_time: Sum of all CPU time
- wasted_cpu_time: CPU time spent on failed attempts
"""
df = self.get_solver_stats_dataframe(point)
if df is None:
return None
total_attempts = len(df)
failed_attempts = len(df[df["ISTATE"] < 0])
successful_attempts = total_attempts - failed_attempts
total_cpu = df["CPU_TIME"].sum()
wasted_cpu = df[df["ISTATE"] < 0]["CPU_TIME"].sum()
return {
"total_attempts": total_attempts,
"successful_attempts": successful_attempts,
"failed_attempts": failed_attempts,
"efficiency_ratio": successful_attempts / total_attempts
if total_attempts > 0
else 0.0,
"total_cpu_time": total_cpu,
"wasted_cpu_time": wasted_cpu,
"wasted_fraction": wasted_cpu / total_cpu if total_cpu > 0 else 0.0,
}
[docs]
def plot_species(
self,
ax: plt.Axes,
species: list[str],
point: int = 0,
legend: bool = True,
**plot_kwargs,
) -> plt.Axes:
"""`uclchem.plot.plot(species)` wrapper method.
Parameters
----------
ax : plt.Axes
An axis object to plot on
df : pd.DataFrame
A dataframe created by `read_output_file`
species : list[str]
A list of species names to be plotted.
If species name starts with "$" instead of "#" or "@",
plots the sum of surface and bulk abundances.
point : int
Grid point index. Default = 0.
legend : bool
Whether to add a legend to the plot. Default = True.
**plot_kwargs : dict[str, Any]
Additional keyword arguments passed to ``ax.plot``.
Returns
-------
plt.Axes
Modified input axis is returned
"""
df = self.get_dataframes(point)
return plot_species(
ax,
df if isinstance(df, pd.DataFrame) else df[0],
species,
legend,
**plot_kwargs,
)
# /UCLCHEM utility and analysis wrappers
# Methods to start run of model
[docs]
def run(self) -> None:
"""Run the model, resetting the Fortran arrays so they can be reused for new runs.
Raises
------
RuntimeError
If the model was read.
TypeError
If the success_flag returned from the model is not an integer.
ValueError
If the model's run_type is invalid.
KeyboardInterrupt
If the run is interrupted via ``SIGINT`` while in progress.
""" # noqa: DOC502 nested-handler
if self.was_read:
msg = "This model was read. It can not be run. "
raise RuntimeError(msg)
def _handler(signum, frame): # noqa: ARG001, ANN001
try:
self.on_interrupt() # your “final steps”
finally:
# Restore default and re-raise KeyboardInterrupt to stop execution
signal.signal(signal.SIGINT, self._orig_sigint)
raise KeyboardInterrupt
signal.signal(signal.SIGINT, _handler)
if self.run_type not in self.separate_worker_types:
output = self.run_fortran()
elif self.run_type in self.separate_worker_types:
from uclchem.advanced.worker_state import ( # noqa: PLC0415 circular
create_snapshot,
)
snapshot = create_snapshot()
init_kwargs = self._create_init_dict()
ctx = mp.get_context("spawn")
result_queue = ctx.Queue()
self._proc_handle = ctx.Process(
target=_worker_entry,
args=(
self.model_type,
init_kwargs,
self._shm_desc,
result_queue,
snapshot,
),
daemon=False,
)
self._proc_handle.start()
output = result_queue.get()
result_queue.close()
self._proc_handle.join()
self._proc_handle.close()
self._proc_handle = None
else:
msg = f"run_type of {self.run_type} is not a valid value."
raise ValueError(msg)
signal.signal(signal.SIGINT, self._orig_sigint)
if hasattr(self, "_shm_handles"):
self._coordinator_unlink_memory()
raw_flag = output["success_flag"]
if not isinstance(raw_flag, int):
msg = f"Expected int success_flag, got {type(raw_flag).__name__}"
raise TypeError(msg)
output["success_flag"] = SuccessFlag(raw_flag) # type: ignore[call-arg]
for k, v in output.items():
setattr(self, k, v)
self._array_clean()
self._check_negative_abundances()
if self.success_flag != SuccessFlag.SUCCESS:
error_msg: str | None
if self.success_flag is None:
error_msg = None
else:
error_msg = self.success_flag.check_error(
only_error=True, raise_on_error=False
)
self._handle_model_error(
f"UCLCHEM error ({self.success_flag.name if self.success_flag else 'None'}, {self.success_flag.value if self.success_flag else 'None'}): {error_msg}"
)
if self.outputFile is not None:
_logger.debug(f"Writing output file: {self.outputFile}")
_logger.debug(
f"Physics array shape: {self.physics_array.shape if self.physics_array is not None else None}"
)
_logger.debug(
f"Chemical array shape: {self.chemical_abun_array.shape if self.chemical_abun_array is not None else None}"
)
try:
self.legacy_write_full()
_logger.debug(f"Successfully wrote {self.outputFile}")
except Exception as e:
_logger.error(f"Failed to write {self.outputFile}: {e}", exc_info=True)
raise
if self.abundSaveFile is not None:
_logger.debug(f"Writing abundance file: {self.abundSaveFile}")
try:
self.legacy_write_starting_chemistry()
_logger.debug(f"Successfully wrote {self.abundSaveFile}")
except Exception as e:
_logger.error(f"Failed to write {self.abundSaveFile}: {e}", exc_info=True)
raise
@abstractmethod
[docs]
def run_fortran(self) -> dict[str, int | list]: # noqa: D102
raise NotImplementedError
# /Methods to start run of model
# Model saving
[docs]
def save_model(
self,
*,
file_obj: h5py.File | None = None,
file: str | None = None,
name: str = "default",
overwrite: bool = False,
) -> None:
"""Save a model to file on disk. Multiple models can be saved into the same file.
if different names are used to store them.
Parameters
----------
file_obj : h5py.File | None
open file object (Default value = None)
file : str | None
Path to a file to store models. (Default value = None)
name : str
Name to use for the group of the object. Defaults to 'default'
overwrite : bool
Boolean on whether to overwrite pre-existing models, or error out.
Defaults to False
Raises
------
ValueError
If file_obj and file are both passed, or neither are passed.
"""
opened_file = False
if file_obj is None and file is None:
msg = "file_obj or file must be passed."
raise ValueError(msg)
elif file_obj is None:
file_obj = h5py.File(file, "a")
opened_file = True
if name in file_obj:
if not overwrite:
warnings.warn(
f"Model with name: `{name}` already exists in save file but overwrite is set to False. Unable to save model.",
stacklevel=2,
)
return
else:
del file_obj[name]
# TODO: Allow for toggling of saving float64 or float32 for the arrays
temp_attribute_dict = {}
with contextlib.suppress(Exception):
temp_attribute_dict.update(super().__getattribute__("_meta"))
# Work on a copy so save_model is non-destructive to self._data
save_data = self._data.copy()
# Collect remaining non-array dataset variables into attributes (same behavior as before)
for v in list(save_data.variables):
if "_array" not in v and v not in {"_orig_sigint"}:
if np.shape(save_data[v].values) != ():
if isinstance(save_data[v].values, tuple):
temp_attribute_dict[v] = save_data[v].values[1].tolist()
save_data = save_data.drop_vars(v)
else:
temp_attribute_dict[v] = save_data[v].values.tolist()
save_data = save_data.drop_vars(v)
else:
temp_attribute_dict[v] = save_data[v].item()
save_data = save_data.drop_vars(v)
save_data["attributes_dict"] = xr.DataArray([json.dumps(temp_attribute_dict)])
save_data["_param_dict"] = xr.DataArray([json.dumps(self._param_dict)])
model_group = file_obj.create_group(name)
coord_grp = model_group.create_group("_coords")
for coord_name, coord in save_data.coords.items():
self._write_array(coord_grp, coord_name, coord)
for var_name, var in save_data.data_vars.items():
self._write_array(model_group, var_name, var)
if opened_file:
file_obj.flush()
file_obj.close()
@staticmethod
def _write_array(model_group: h5py.Group, name: str, xr_var: xr.DataArray) -> None:
data = xr_var.values
if data.dtype.kind == "U":
data = data.astype(bytes)
ds = model_group.create_dataset(name, data=data)
ds.attrs["_dims"] = list(xr_var.dims)
# /Model saving
# Model Passing through Pickling
[docs]
def pickle(self) -> AbstractModel:
"""Pickle the model.
Returns
-------
AbstractModel
AbstractModel
"""
if self._data is not None and not bool(self._pickle_dict):
for v in self._data.variables:
if np.shape(self._data[v].values) != ():
if isinstance(self._data[v].values, tuple):
self._pickle_dict[v] = self._data[v].values[1].tolist()
else:
self._pickle_dict[v] = self._data[v].values.tolist()
else:
self._pickle_dict[v] = self._data[v].item()
# Save metadata separately for pickle roundtrip
try:
object.__setattr__( # noqa: PLC2801 bypass
self, "_pickle_meta", super().__getattribute__("_meta").copy()
)
except Exception:
object.__setattr__(self, "_pickle_meta", {}) # noqa: PLC2801 bypass
self._data = None
# Clear runtime metadata to reflect pickled state
object.__setattr__(self, "_meta", {}) # noqa: PLC2801 bypass
return self
[docs]
def un_pickle(self) -> AbstractModel:
"""Un-pickle the model.
Returns
-------
AbstractModel
AbstractModel
"""
if self._data is None and bool(self._pickle_dict):
self._data = xr.Dataset()
for k, v in self._pickle_dict.items():
if np.ndim(v) == 3 and "_array" in k: # noqa: PLR2004
# Avoid colliding with existing 'time_step' dim sizes by
# making a per-variable time dim if necessary
time_dim = "time_step"
existing_time = None
try:
existing_time = self._data.sizes.get("time_step", None)
except Exception:
existing_time = None
v_arr = np.asarray(v)
if existing_time is not None and existing_time != np.shape(v_arr)[0]:
base_time_dim = f"time_step_{k}"
time_dim = base_time_dim
i = 1
while time_dim in self._data.sizes:
time_dim = f"{base_time_dim}_{i}"
i += 1
self._data[k] = (
[time_dim, "point", k.replace("array", "values")],
v_arr,
)
self._data = self._data.assign_coords(
{time_dim: np.arange(np.shape(v_arr)[0])}
)
else:
self._data[k] = (
["time_step", "point", k.replace("array", "values")],
v_arr,
)
elif np.ndim(v) == 2 and "_array" in k: # noqa: PLR2004
self._data[k] = (["point", k], v)
elif "_values" in k:
pass
else:
self._data[k] = v
self._pickle_dict = {}
# Restore saved metadata if present
try:
if hasattr(self, "_pickle_meta") and self._pickle_meta:
object.__setattr__(self, "_meta", self._pickle_meta.copy()) # noqa: PLC2801 bypass
except Exception: # noqa: S110 defensive
pass
finally:
object.__setattr__(self, "_pickle_meta", {}) # noqa: PLC2801 bypass
self._coord_assign()
return self
# /Model Passing through Pickling
# Legacy in & output support
[docs]
def legacy_read_output_file(
self,
read_file: str | Path,
rate_constants_load_file: str | Path | None = None, # noqa: ARG002
) -> None:
"""Perform classic output file reading.
This reader constructs the internal :class:`xarray.Dataset` directly from the
parsed header and data in the legacy ``.dat`` output format. The classic
files place a `point` column between the physics and chemistry columns; we
therefore use the location of `point` in the header to split the columns
reliably and avoid using global constants as authoritative metadata.
Parameters
----------
read_file : str | Path
path to file
rate_constants_load_file : str | Path | None
Not used (Default value = None)
Raises
------
ValueError
If there is any incompatibility error.
"""
self.was_read = True
# Read header and numeric data
columns = np.char.strip(
np.loadtxt(read_file, delimiter=",", max_rows=1, dtype=str, comments="%")
)
raw_array = np.loadtxt(read_file, delimiter=",", skiprows=1)
# Determine where the `point` column is and how many points exist
point_index = np.where(columns == "point")[0][0]
min_point = int(np.min(raw_array[:, point_index]))
max_point = int(np.max(raw_array[:, point_index]))
# Support both 0-based (UCLCHEM ≥4.0) and 1-based (legacy Fortran) point indices.
# The offset is 0 for new files and 1 for old legacy files.
_point_offset = min_point
self._param_dict["points"] = max_point - min_point + 1
# Some legacy files include an additional metadata row; if more than one
# point exists the legacy format contains that extra row which we must skip
if self._param_dict["points"] > 1:
array = np.loadtxt(read_file, delimiter=",", skiprows=2)
else:
array = raw_array
row_count = int(np.shape(array)[0] / self._param_dict["points"])
# Extract physics and species columns from legacy file header
physics_cols_from_file = list(columns[:point_index].tolist())
species_cols_from_file = list(columns[point_index + 1 :].tolist())
# Validate compatibility with current UCLCHEM constants - these are non-negotiable
# PHYSICAL_PARAMETERS are hard-coded and tied to the Fortran wrapper
if physics_cols_from_file != list(PHYSICAL_PARAMETERS):
# Special case: backward compatibility for missing 'dstep' parameter
# Old UCLCHEM versions didn't have dstep; we can infer it if safe
missing_params = set(PHYSICAL_PARAMETERS) - set(physics_cols_from_file)
extra_params = set(physics_cols_from_file) - set(PHYSICAL_PARAMETERS)
# Parameters that can be safely zero/one-filled from legacy files.
# av_internal and radfield_internal are computed per-timestep and
# default to zero (no internal radiation source in legacy runs).
INFERABLE_PARAMS = {
"dstep",
"parcel_radius",
"av_internal",
"radfield_internal",
}
if missing_params <= INFERABLE_PARAMS and not extra_params:
# dstep and/or other inferable params missing — check if we can safely infer
# If there are no duplicate timesteps, we can assume dstep=1
time_column_index = physics_cols_from_file.index("Time")
time_values = array[:, time_column_index]
if len(time_values) == len(np.unique(time_values)):
# No duplicate timesteps - safe to assume dstep=1
warnings.warn(
"LEGACY FILE COMPATIBILITY: The file is missing the 'dstep' parameter.\n"
"No duplicate timesteps detected, safely assuming dstep=1 for all rows.\n"
"This file was likely created with an older UCLCHEM version.\n"
"Consider regenerating the file with the current version for full compatibility.",
UserWarning,
stacklevel=2,
)
if "dstep" in missing_params:
# Add dstep=1 column before point
dstep_column = np.ones((array.shape[0], 1))
array = np.hstack(
[array[:, :point_index], dstep_column, array[:, point_index:]]
)
physics_cols_from_file.append("dstep")
point_index += 1 # point column shifted by 1
if "parcel_radius" in missing_params:
# Add parcel_radius=0 column before point (collapse files only)
parcel_radius_column = np.zeros((array.shape[0], 1))
array = np.hstack(
[
array[:, :point_index],
parcel_radius_column,
array[:, point_index:],
]
)
physics_cols_from_file.append("parcel_radius")
point_index += 1 # point column shifted by 1
if "av_internal" in missing_params:
# Add av_internal=0 column before point (not present in pre-1D-RT files)
av_internal_column = np.zeros((array.shape[0], 1))
array = np.hstack(
[
array[:, :point_index],
av_internal_column,
array[:, point_index:],
]
)
physics_cols_from_file.append("av_internal")
point_index += 1
if "radfield_internal" in missing_params:
# Add radfield_internal=0 column before point
# (not present in pre-1D-RT files)
radfield_internal_column = np.zeros((array.shape[0], 1))
array = np.hstack(
[
array[:, :point_index],
radfield_internal_column,
array[:, point_index:],
]
)
physics_cols_from_file.append("radfield_internal")
point_index += 1
else:
msg = (
f"INCOMPATIBLE LEGACY FILE: Cannot infer 'dstep' parameter.\n\n"
f"The file is missing 'dstep' and contains duplicate timesteps,\n"
f"making it impossible to safely infer the particle step values.\n\n"
f" File contains: {physics_cols_from_file}\n"
f" Current UCLCHEM has: {list(PHYSICAL_PARAMETERS)}\n\n"
f"To load this file, regenerate it with the current UCLCHEM version."
)
raise ValueError(msg)
else:
# Other parameter mismatch - cannot fix automatically
msg = (
f"INCOMPATIBLE LEGACY FILE: Physical parameters mismatch.\n\n"
f"The file you are loading has different PHYSICAL_PARAMETERS than the currently installed UCLCHEM.\n"
f"This means the file was created with a different version of UCLCHEM and cannot be loaded.\n\n"
f" File contains: {physics_cols_from_file} ({len(physics_cols_from_file)} parameters)\n"
f" Current UCLCHEM has: {list(PHYSICAL_PARAMETERS)} ({len(PHYSICAL_PARAMETERS)} parameters)\n"
f" Missing parameters: {list(missing_params)}\n"
f" Extra parameters: {list(extra_params)}\n\n"
f"PHYSICAL_PARAMETERS are hard-coded in the Fortran wrapper and cannot be changed at runtime.\n"
f"To load this file, you must use the same UCLCHEM version that created it, or regenerate the file.\n"
)
raise ValueError(msg)
if species_cols_from_file != list(get_species_names()):
msg = (
f"INCOMPATIBLE LEGACY FILE: Species list mismatch.\n\n"
f"The file you are loading has a different species network than the currently installed UCLCHEM.\n"
f"This means the file was created with a different chemical network and cannot be loaded.\n\n"
f" File contains: {len(species_cols_from_file)} species\n"
f" Current UCLCHEM has: {len(get_species_names())} species\n\n"
f"The species list is tied to the chemical network compiled into UCLCHEM.\n"
f"To load this file, you must use the same UCLCHEM version/network that created it, or regenerate the file.\n"
)
raise ValueError(msg)
# At this point, we've validated that dimensions match
# Allocate arrays using the validated file dimensions (which equal current constants)
self.physics_array = np.empty(
(row_count, self._param_dict["points"], len(PHYSICAL_PARAMETERS))
)
self.chemical_abun_array = np.empty(
(row_count, self._param_dict["points"], n_species)
)
for p in range(self._param_dict["points"]):
sel = np.where(array[:, point_index] == p + _point_offset)[0]
self.physics_array[:, p, :] = array[sel, :point_index]
self.chemical_abun_array[:, p, :] = array[sel, point_index + 1 :]
# Construct Dataset using current UCLCHEM constants (validated to match file)
self._data = xr.Dataset(
{
"physics_array": (
["time_step", "point", "physics_values"],
self.physics_array,
),
"chemical_abun_array": (
["time_step", "point", "chemical_abun_values"],
self.chemical_abun_array,
),
},
coords={
"physics_values": PHYSICAL_PARAMETERS,
"chemical_abun_values": get_species_names(),
"time_step": np.arange(self.physics_array.shape[0]),
"point": np.arange(self._param_dict["points"]),
},
)
# Clean trailing zero-padded timesteps and finish
self._array_clean()
# Ensure timepoints is consistent with the dataset we just constructed
# Fortran expects arrays of shape (timepoints+1, points, ...), so we
# store `timepoints` as the number of simulated timesteps minus one.
# Be defensive; if something goes wrong leave timepoints as-is.
with contextlib.suppress(Exception):
tp = int(self._data.sizes.get("time_step", 0))
object.__setattr__(self, "timepoints", tp - 1 if tp > 0 else 0) # noqa: PLC2801 bypass
nonzero_indices = self.physics_array[:, 0, 0].nonzero()[0]
if len(nonzero_indices) > 0:
last_timestep_index = nonzero_indices[-1]
self.next_starting_chemistry_array = self.chemical_abun_array[
last_timestep_index, :, :
]
else:
# Model failed immediately, no valid timesteps
self.next_starting_chemistry_array = None
[docs]
def legacy_read_starting_chemistry(self) -> None:
"""Read the starting chemistry from ``self.abundLoadFile`` in ``_param_dict``.
Raises
------
ValueError
If ``self.abundLoadFile`` is ``None``.
"""
abund_load_file = self.abundLoadFile
if abund_load_file is None:
msg = "abundLoadFile is not set"
raise ValueError(msg)
self._create_starting_array(np.loadtxt(str(abund_load_file), delimiter=","))
[docs]
def legacy_write_full(self) -> None:
"""Perform classic output file writing to ``self.outputFile`` from ``_param_dict``.
Raises
------
ValueError
If ``self.outputFile`` is ``None``.
"""
output_file = self.outputFile
if output_file is None:
msg = "outputFile is not set"
raise ValueError(msg)
phys = self.physics_array.reshape(-1, self.physics_array.shape[-1])
chem = self.chemical_abun_array.reshape(-1, self.chemical_abun_array.shape[-1])
full_array = np.append(phys, chem, axis=1)
# TODO Move away from the magic numbers seen here.
species_names = get_species_names()
string_fmt_string = f"{', '.join([PHYSICAL_PARAMETERS_HEADER_FORMAT] * (len(PHYSICAL_PARAMETERS)))}, {', '.join([SPECNAME_HEADER_FORMAT] * len(species_names))}"
# Magic numbers here to match/improve the formatting of the classic version
# TODO Move away from the magic numbers seen here.
number_fmt_string = f"{PHYSICAL_PARAMETERS_VALUE_FORMAT}, {', '.join([SPECNAME_VALUE_FORMAT] * len(species_names))}"
columns = np.array([PHYSICAL_PARAMETERS[:-1] + ["point"] + species_names])
np.savetxt(str(output_file), columns, fmt=string_fmt_string)
with Path(str(output_file)).open("ab") as f:
np.savetxt(f, full_array, fmt=number_fmt_string)
[docs]
def legacy_write_starting_chemistry(self) -> None:
"""Perform classic starting abundance file writing to the file `self.abundSaveFile`.
provided in `_param_dict`.
Raises
------
ValueError
If ``self.abundSaveFile`` is ``None``.
"""
# TODO Move away from the magic numbers seen here.
abund_save_file = self.abundSaveFile
if abund_save_file is None:
msg = "abundSaveFile is not set"
raise ValueError(msg)
species_names = get_species_names()
number_fmt_string = f" {', '.join(['%9.5E'] * len(species_names))}"
with Path(str(abund_save_file)).open("wb") as f:
np.savetxt(
f,
self.next_starting_chemistry_array,
fmt=number_fmt_string,
)
[docs]
def legacy_write_columnfile(
self, column_file: str | Path, species: list[str]
) -> None:
"""Write a classic ``columnFile`` file, similar to full output but with a subset of species.
Since ``out_species`` was removed from the object-oriented API, it has to be passed here.
Parameters
----------
column_file : str | Path
path to write to
species : list[str]
List of species names to write
"""
phys = self.physics_array.reshape(-1, self.physics_array.shape[-1])
chem = self.chemical_abun_array.reshape(-1, self.chemical_abun_array.shape[-1])
species_names = get_species_names()
species_indices = [species_names.index(spec) for spec in species]
chem_species = chem[:, species_indices]
full_array = np.append(phys, chem_species, axis=1)
string_fmt_string = f"{', '.join([PHYSICAL_PARAMETERS_HEADER_FORMAT] * (len(PHYSICAL_PARAMETERS)))}, {', '.join([SPECNAME_HEADER_FORMAT] * len(species))}"
# Magic numbers here to match/improve the formatting of the classic version
# TODO Move away from the magic numbers seen here.
number_fmt_string = f"{PHYSICAL_PARAMETERS_VALUE_FORMAT}, {', '.join([SPECNAME_VALUE_FORMAT] * len(species))}"
columns = np.array([PHYSICAL_PARAMETERS[:-1] + ["point"] + species])
np.savetxt(column_file, columns, fmt=string_fmt_string)
with Path(column_file).open("ab") as f:
np.savetxt(f, full_array, fmt=number_fmt_string)
# /Legacy in & output support
# Cleaning of array & inputs
def _handle_model_error(self, msg: str) -> None:
"""Dispatch a Fortran model error according to the ``on_error`` constructor setting.
Parameters
----------
msg : str
Error message to raise or warn with.
Raises
------
RuntimeError
If ``on_error`` is ``"raise"`` (the default).
"""
if self._on_error == "raise":
raise RuntimeError(msg)
elif self._on_error == "warn":
warnings.warn(msg, stacklevel=3)
# "ignore": do nothing
def _check_negative_abundances(self) -> None:
"""Check chemical_abun_array for negative values and act per on_negative_abundances.
Called automatically after _array_clean() in run(). Behavior is controlled by the
on_negative_abundances constructor argument:
- None : do nothing.
- "warning" : emit a Python warnings.warn (default).
- "error" : set success_flag to NEGATIVE_ABUNDANCE_ERROR so that the subsequent
check_error() call raises, mirroring the old Fortran behavior.
- "raise" : raise RuntimeError immediately.
Raises
------
RuntimeError
If ``on_negative_abundances='raise'`` and negative abundances are detected.
"""
if self._on_negative_abundances is None:
return
if self.chemical_abun_array is None or not np.any(self.chemical_abun_array < 0):
return
msg = "Negative abundances detected in chemical output array."
if self._on_negative_abundances == "warning":
warnings.warn(msg, stacklevel=3)
elif self._on_negative_abundances == "error":
self.success_flag = SuccessFlag.NEGATIVE_ABUNDANCE_ERROR
elif self._on_negative_abundances == "raise":
raise RuntimeError(msg)
def _array_clean(self):
"""Clean the arrays changed by UCLCHEM Fortran code."""
# Find the first element with all the zeros
if self._debug:
print(f"in _array_clean: physics_array = {self.physics_array}")
print(f"in _array_clean: physics_array type = {type(self.physics_array)}")
nonzero_indices = self.physics_array[:, 0, 0].nonzero()[0]
if len(nonzero_indices) == 0:
# Model failed immediately, keep only the first row to indicate failure
self._data = self._data.isel(time_step=slice(0, 1))
self._coord_assign()
self.next_starting_chemistry_array = self.chemical_abun_array[0, :, :]
return
last_timestep_index = nonzero_indices[-1]
# Get the arrays for only the simulated timesteps (not the zero padded ones)
self._data = self._data.isel(time_step=slice(0, last_timestep_index + 1))
self._coord_assign()
self.next_starting_chemistry_array = self.chemical_abun_array[
last_timestep_index, :, :
]
return
def _coord_assign(self):
"""Assign coordinate values derived from arrays and metadata.
Be conservative: prefer existing dataset coordinates if they are consistent
with array shapes. Only override coords from metadata when lengths match
the corresponding array dimensions to avoid xarray dimension conflicts.
"""
# physics coords
try:
phys_len = self.physics_array.shape[-1]
except Exception:
phys_len = None
if phys_len is not None:
if "physics_values" in self._data.coords:
if len(self._data.coords["physics_values"]) != phys_len:
if len(PHYSICAL_PARAMETERS) == phys_len:
self._data = self._data.assign_coords(
{"physics_values": PHYSICAL_PARAMETERS}
)
else:
# fallback to numeric indices if dimensions don't match
self._data = self._data.assign_coords(
{"physics_values": np.arange(phys_len)}
)
elif len(PHYSICAL_PARAMETERS) == phys_len:
self._data = self._data.assign_coords(
{"physics_values": PHYSICAL_PARAMETERS}
)
else:
self._data = self._data.assign_coords(
{"physics_values": np.arange(phys_len)}
)
# chemical abundances coords
try:
chem_len = self.chemical_abun_array.shape[-1]
except Exception:
chem_len = None
if chem_len is not None:
species_names = get_species_names()
if "chemical_abun_values" in self._data.coords:
if len(self._data.coords["chemical_abun_values"]) != chem_len:
if len(species_names) == chem_len:
self._data = self._data.assign_coords(
{"chemical_abun_values": species_names}
)
else:
self._data = self._data.assign_coords(
{"chemical_abun_values": np.arange(chem_len)}
)
elif len(species_names) == chem_len:
self._data = self._data.assign_coords(
{"chemical_abun_values": species_names}
)
else:
self._data = self._data.assign_coords(
{"chemical_abun_values": np.arange(chem_len)}
)
def _reform_inputs(self, param_dict: dict) -> None:
"""Reform input parameters, copying ``param_dict`` to avoid mutating the user dict.
Parameters
----------
param_dict : dict
Parameter dictionary passed by the user to the model.
Raises
------
ValueError
If an duplicate key is encountered in `param_dict`.
"""
if param_dict is None:
# avoid mutating the shared default dictionary
self._param_dict = default_param_dictionary.copy()
else:
# lower case (and conveniently copy so we don't edit) the user's dictionary
# this is key to UCLCHEM's "case insensitivity"
new_param_dict = {}
for k, v in param_dict.items():
if k.lower() in new_param_dict:
msg = f"Duplicate lower case key {k} is already in the dict, stopping"
raise ValueError(msg)
if isinstance(v, Path):
v = str(v)
new_param_dict[k.lower()] = v
# Handle deprecated endAtFinalDensity parameter (after lowercasing)
new_param_dict = _convert_legacy_stopping_param(new_param_dict)
self._param_dict = {**default_param_dictionary, **new_param_dict.copy()}
del new_param_dict
# Remove keys with None values from the merged _param_dict
# Check the merged dict, not the defaults, to preserve user-provided values
keys_to_delete = [k for k, v in self._param_dict.items() if v is None]
for k in keys_to_delete:
del self._param_dict[k]
# Still set these, because the fortran at this point still requires them.
self.out_species = ""
self.n_out = 0
self.out_species_list = ""
# /Cleaning of array & inptus
# Creation of arrays
def _create_fortran_array(self):
"""Create Fortran-compliant np.arrays that can be passed to the Fortran part of UCLCHEM."""
# For shared memory:
(
self._shm_handles["physics_array"],
self._shm_desc["physics_array"],
self.physics_array,
) = self._create_shared_memory_allocation(
(self.timepoints + 1, self._param_dict["points"], N_PHYSICAL_PARAMETERS)
)
(
self._shm_handles["chemical_abun_array"],
self._shm_desc["chemical_abun_array"],
self.chemical_abun_array,
) = self._create_shared_memory_allocation(
(self.timepoints + 1, self._param_dict["points"], n_species)
)
def _create_rate_constants_array(self):
"""Create the Fortran-compliant np.array for rate constants passed to UCLCHEM."""
# For shared memory:
(
self._shm_handles["rate_constants_array"],
self._shm_desc["rate_constants_array"],
self.rate_constants_array,
) = self._create_shared_memory_allocation(
(self.timepoints + 1, self._param_dict["points"], n_reactions)
)
def _create_heating_array(self):
"""Create the Fortran-compliant np.array for heating/cooling rates passed to UCLCHEM."""
try:
heating_array_size = (
2
+ uclchemwrap.heating.ncooling
+ uclchemwrap.heating.nheating
+ uclchemwrap.f2py_constants.ncoolants
)
# For shared memory:
(
self._shm_handles["heat_array"],
self._shm_desc["heat_array"],
self.heat_array,
) = self._create_shared_memory_allocation(
(
self.timepoints + 1,
self._param_dict["points"],
heating_array_size,
)
)
except AttributeError:
# Heating module not available, likely compiled without heating support
_logger.debug("Heating module not available in uclchemwrap")
self.heat_array = None
def _create_stats_array(self):
"""Create the Fortran-compliant np.array for DVODE solver statistics."""
(
self._shm_handles["stats_array"],
self._shm_desc["stats_array"],
self.stats_array,
) = self._create_shared_memory_allocation(
(self.timepoints + 1, self._param_dict["points"], N_DVODE_STATS)
)
def _create_level_populations_array(self):
"""Create the Fortran-compliant np.array for coolant level populations.
Shape: (timepoints+1, gridpoints, total_levels).
"""
(
self._shm_handles["level_populations_array"],
self._shm_desc["level_populations_array"],
self.level_populations_array,
) = self._create_shared_memory_allocation(
(self.timepoints + 1, self._param_dict["points"], N_TOTAL_LEVELS)
)
def _create_se_stats_array(self):
"""Create the Fortran-compliant np.array for SE solver statistics.
Shape: (timepoints+1, gridpoints, NCOOLANTS*3).
"""
n_stats = NCOOLANTS * N_SE_STATS_PER_COOLANT # 35 * 3 = 105
(
self._shm_handles["se_stats_array"],
self._shm_desc["se_stats_array"],
self.se_stats_array,
) = self._create_shared_memory_allocation(
(self.timepoints + 1, self._param_dict["points"], n_stats)
)
[docs]
def get_level_populations_dataframe(self, point: int = 0) -> pd.DataFrame | None:
"""Get level populations as a DataFrame for a specific grid point.
Parameters
----------
point : int
Grid point index (default 0)
Returns
-------
pd.DataFrame | None
DataFrame with columns for each level with meaningful
coolant/level names
"""
if (
self.level_populations_array is None
or self.level_populations_array.shape[0] < N_SE_STATS_PER_COOLANT
):
return None
# Try to load meaningful level names from coolant files
try:
level_names_dict = load_coolant_level_names()
# Build column names from parsed coolant files
columns = []
for i in range(NCOOLANTS):
if i in level_names_dict:
columns.extend(level_names_dict[i])
except (FileNotFoundError, ValueError, RuntimeError) as e:
# Fallback to generic names if coolant files can't be loaded
_logger.warning(
f"Could not load coolant level names: {e}. Using generic names."
)
columns = [f"LEVEL_{i}" for i in range(N_TOTAL_LEVELS)]
return pd.DataFrame(self.level_populations_array[:, point, :], columns=columns)
[docs]
def get_se_stats_dataframe(self, point: int = 0) -> pd.DataFrame | None:
"""Get SE solver statistics as a DataFrame for a specific grid point.
Parameters
----------
point : int
Grid point index. Default = 0.
Returns
-------
pd.DataFrame | None
DataFrame with per-coolant SE solver statistics using
actual coolant names
"""
if (
self.se_stats_array is None
or self.se_stats_array.shape[0] < N_SE_STATS_PER_COOLANT
):
return None
# Build meaningful column names using actual coolant names
try:
from uclchemwrap import f2py_constants # noqa: PLC0415 optional
coolant_names = [
str(name.decode()).strip() for name in f2py_constants.coolantnames
]
columns = []
for coolant_name in coolant_names:
columns.extend(
[
f"{coolant_name}_CONVERGED",
f"{coolant_name}_ITERATIONS",
f"{coolant_name}_MAX_REL_CHANGE",
]
)
except Exception:
# Fallback to generic names
columns = SE_STAT_NAMES
return pd.DataFrame(self.se_stats_array[:, point, :], columns=columns)
def _create_starting_array(self, starting_chemistry: np.ndarray | None) -> None:
if starting_chemistry is None:
self.starting_chemistry_array = None
else:
if len(np.shape(starting_chemistry)) == 1:
starting_chemistry = starting_chemistry[np.newaxis, :]
# For shared memory:
(
self._shm_handles["starting_chemistry_array"],
self._shm_desc["starting_chemistry_array"],
self.starting_chemistry_array,
) = self._create_shared_memory_allocation(np.shape(starting_chemistry))
np.copyto(self.starting_chemistry_array, starting_chemistry, casting="no")
# /Creation of arrays
# Signal Interrupt Catch
[docs]
def on_interrupt(self, grid: bool = False, model_name: str | None = None) -> None:
"""Catch interruption. Save model to file.
Parameters
----------
grid : bool
whether the model was part of a grid (Default value = False)
model_name : str | None
the name of the model to save it under.
If None, name is set to "interrupted". (Default value = None)
"""
if self._proc_handle is not None:
self._proc_handle.terminate()
self._proc_handle.join()
self._proc_handle = None
if bool(self._shm_desc):
self._coordinator_unlink_memory()
self._array_clean()
error_time = datetime.now().strftime("%y_%m_%d_%H_%M")
if not grid:
if self.outputFile is None:
self.outputFile = "./" + error_time + ".dat"
elif "/" in self.outputFile:
self.outputFile = (
self.outputFile[: self.outputFile.rfind("/") + 1]
+ error_time
+ self.outputFile[self.outputFile.rfind(".") :]
)
else:
self.outputFile = "./" + error_time + ".dat"
self.legacy_write_full()
self._was_interrupted = True
self.save_model(
file="./" + error_time + ".h5"
if not grid
else "./grid_interrupted_models.h5",
name=model_name if model_name is not None else "interrupted",
overwrite=True,
)
# /Signal Interrupt Catch
# Shared memory handlers
@staticmethod
def _create_shared_memory_allocation(shape: tuple):
nbytes = int(np.prod(shape) * np.dtype(np.float64).itemsize)
shm = shared_memory.SharedMemory(create=True, size=nbytes)
array = np.ndarray(shape, dtype=np.float64, buffer=shm.buf, order="F")
array.fill(0.0)
spec = {"name": shm.name, "shape": shape}
return shm, spec, array
def _reform_array_in_worker(self, shm_desc: dict[str, dict]) -> None:
object.__setattr__(self, "_shm_handles", {}) # noqa: PLC2801 bypass
for k, v in shm_desc.items():
shm = shared_memory.SharedMemory(name=v["name"], create=False)
object.__setattr__( # noqa: PLC2801 bypass
self,
k,
np.ndarray(shape=v["shape"], dtype=np.float64, buffer=shm.buf, order="F"),
)
self._shm_handles[k] = shm
del shm
def _worker_close_memory(self):
for k in self._shm_desc:
try:
self._shm_handles[k].close()
except Exception: # noqa: S110 defensive
pass
finally:
del self._shm_handles[k]
def _coordinator_unlink_memory(self):
if bool(self._shm_desc):
for k in self._shm_desc:
try:
setattr(self, k, self.__getattr__(k).copy()) # noqa: PLC2801 bypass
self._shm_handles[k].close()
self._shm_handles[k].unlink()
except Exception:
print(f"Warning, unable to close and unlike {k}")
pass
finally:
del self._shm_handles[k]
del self._shm_desc
self._shm_desc = {}
del self._shm_handles
self._shm_handles = {}
# /Shared memory handlers
@abstractmethod
def _create_init_dict(self):
raise NotImplementedError
@register_model
[docs]
class Cloud(AbstractModel):
"""Cloud model class inheriting from AbstractModel.
Parameters
----------
param_dict : dict
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
timepoints : int
Integer value of how many timesteps should be calculated before
aborting the UCLCHEM model. Defaults to `uclchem.constants.TIMEPOINTS`.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
"""
def __init__(
self,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
timepoints: int = TIMEPOINTS,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
timepoints : int
Number of output timesteps to store. Defaults to TIMEPOINTS.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
TypeError
If ``out_species`` is passed (not supported on OO model classes).
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface (uclchem.functional.cloud) to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=timepoints,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
if self.run_type != "external" and not self.was_read:
self.run()
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
# f2py returns all non-intent(in) values in Fortran signature order:
# [0-6] physicsarray..sestatsarray (in,out, modified in-place),
# [7] abundance_out, [8] specname_out, [9] successFlag
result = wrap.cloud(
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
success_flag = result[-1]
return {
"success_flag": success_flag,
}
def _create_init_dict(self):
return {
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
}
@register_model
[docs]
class Collapse(AbstractModel):
"""Collapse model class inheriting from AbstractModel.
Parameters
----------
collapse : str | int | CollapseMode
Collapse type.
Options are 'BE1.1', 'BE4', 'filament', or 'ambipolar'. Defaults to 'BE1.1'.
param_dict : dict
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
timepoints : int
Integer value of how many timesteps should be calculated before
aborting the UCLCHEM model. Defaults to `uclchem.constants.TIMEPOINTS`.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
"""
# Time (years) at which each collapse mode's density evolution ends and the fitting
# functions become singular.
_COLLAPSE_FINAL_TIMES = {
CollapseMode.BE1_1: 1.173387e6,
CollapseMode.BE4: 1.84265e5,
CollapseMode.FILAMENT: 1.393761e6,
CollapseMode.AMBIPOLAR: 1.6132984e7,
}
def __init__(
self,
collapse: str | int | CollapseMode = CollapseMode.BE1_1,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
timepoints: int = TIMEPOINTS,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
collapse : str | int | CollapseMode
Collapse mode to use (e.g. ``CollapseMode.BE1_1``). Defaults to CollapseMode.BE1_1.
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
timepoints : int
Number of output timesteps to store. Defaults to TIMEPOINTS.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
ValueError
If ``rin`` and ``points`` were both set in the parameter dictionary.
ValueError
If ``parcelStoppingMode`` is set in the parameter dictionary.
ValueError
If ``endAtFinaldensity`` is False, but ``finalTime`` was not set.
ValueError
If ``endAtFinaldensity`` is False, but ``finalTime`` is less than
the duration of the collapse for the collapse mode.
TypeError
If ``out_species`` is passed (not supported on OO model classes).
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface (uclchem.functional.collapse) to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
collapse = get_collapse_mode(collapse)
collapse_final_time = self._COLLAPSE_FINAL_TIMES[collapse]
if param_dict is not None and "initialDens" in param_dict:
warnings.warn(
"initialDens is ignored for collapse models: the initial density is determined "
"with fit functions, not the initialDens parameter.",
UserWarning,
stacklevel=2,
)
if (
param_dict is not None
and param_dict.get("points", 1) == 1
and param_dict.get("rin", 0.0) != 0.0
):
msg = (
"rin has no effect when points=1: the single parcel is placed at rout. "
"Either set points > 1 or remove rin from param_dict."
)
raise ValueError(msg)
# For collapse models, endAtFinalDensity controls finalTime behavior.
# Reject parcelStoppingMode since collapse models don't use density-based stopping.
_param = param_dict or {}
if "parcelStoppingMode" in _param:
msg = (
"parcelStoppingMode is not supported for collapse models. "
"Use endAtFinalDensity instead: True (default) to stop at collapse endpoint, "
"False to extend chemistry until a custom finalTime with frozen density."
)
raise ValueError(msg)
# endAtFinalDensity controls finalTime for collapse models:
# - True (default): finalTime = collapseFinalTime (self-consistent density
# evolution until collapse)
# - False: user must set finalTime > collapseFinalTime (density frozen,
# chemistry continues)
user_final_time = _param.get("finalTime", None)
end_at_final_density = _param.get(
"endAtFinalDensity", True
) # Default True for collapse
if end_at_final_density:
# Scenario 1 (default): stop at collapse endpoint.
if user_final_time is not None and user_final_time != collapse_final_time:
msg = (
f"For {collapse!r} collapse with endAtFinalDensity=True, finalTime is fixed at "
f"collapseFinalTime={collapse_final_time:.3e} yr. "
f"To use a custom finalTime, set endAtFinalDensity=False."
)
raise ValueError(msg)
param_dict = {**_param, "finalTime": collapse_final_time}
# Remove endAtFinalDensity so _convert_legacy_stopping_param doesn't affect it.
param_dict.pop("endAtFinalDensity", None)
else:
# Scenario 2: user extends chemistry past collapse endpoint with frozen density.
if user_final_time is None:
msg = (
f"endAtFinalDensity=False requires setting finalTime > collapseFinalTime "
f"({collapse_final_time:.3e} yr) to extend chemistry beyond the collapse endpoint."
)
raise ValueError(msg)
if user_final_time < collapse_final_time:
msg = (
f"endAtFinalDensity=False with finalTime={user_final_time:.3e} yr < "
f"collapseFinalTime={collapse_final_time:.3e} yr is invalid. "
f"Either set endAtFinalDensity=True (use default finalTime) or "
f"set finalTime > collapseFinalTime for extended chemistry."
)
raise ValueError(msg)
# finalTime > collapseFinalTime is valid; density will freeze at collapseFinalTime.
# Remove endAtFinalDensity so _convert_legacy_stopping_param doesn't affect it.
param_dict = {**_param, "parcelStoppingMode": 0}
param_dict.pop("endAtFinalDensity", None)
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=timepoints,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
[docs]
self.collapse_final_time = collapse_final_time
if read_file is None:
self.collapse = collapse
if self.run_type != "external":
self.run()
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
result = wrap.collapse(
collapsein=self.collapse.value,
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
success_flag = result[-1]
return {
"success_flag": success_flag,
}
def _create_init_dict(self):
return {
"collapse": self.collapse,
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
}
@register_model
[docs]
class PrestellarCore(AbstractModel):
"""PrestellarCore model class inheriting from AbstractModel.
This model type was previously known as hot core.
Parameters
----------
temp_indx : int
Used to select the mass of the prestellar core from the following selection
[1=1Msun, 2=5, 3=10, 4=15, 5=25,6=60]. Defaults to 1, which is 1 Msun
max_temperature : float
Value at which gas temperature will stop increasing. Defaults to 300.0.
param_dict : dict
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
timepoints : int
Integer value of how many timesteps should be calculated before
aborting the UCLCHEM model. Defaults to `uclchem.constants.TIMEPOINTS`.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
"""
def __init__(
self,
temp_indx: int = 1,
max_temperature: float = 300.0,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
timepoints: int = TIMEPOINTS,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
temp_indx : int
Index of the temperature column in the physics array to use for hot-core heating.
Defaults to 1.
max_temperature : float
Maximum temperature (K) the hot core reaches. Defaults to 300.0.
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
timepoints : int
Number of output timesteps to store. Defaults to TIMEPOINTS.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
ValueError
If `read_file` is None, but `temp_idx` or `max_temperature` is also None.
TypeError
If ``out_species`` is passed (not supported on OO model classes).
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface (uclchem.functional.prestellar_core) to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=timepoints,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
if read_file is None:
if temp_indx is None or max_temperature is None:
msg = "temp_indx and max_temperature must be specified if not reading from file."
raise ValueError(msg)
self.temp_indx = temp_indx
self.max_temperature = max_temperature
if self.run_type != "external":
self.run()
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
_, _, _, _, _, _, _, out_species_abundances_array, _, success_flag = (
wrap.hot_core(
temp_index=self.temp_indx,
max_temp=self.max_temperature,
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
)
return {
"success_flag": success_flag,
}
def _create_init_dict(self):
return {
"temp_indx": self.temp_indx,
"max_temperature": self.max_temperature,
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
}
@register_model
[docs]
class CShock(AbstractModel):
"""C-Shock model class inheriting from AbstractModel.
Parameters
----------
shock_vel : float
Velocity of the shock in km/s. Defaults to 10.0.
timestep_factor : float
Whilst the time is less than 2 times the dissipation time of shock,
timestep is timestep_factor*dissipation time. Essentially controls how well resolved the
shock is in your model. Defaults to 0.01.
minimum_temperature : float
Minimum post-shock temperature. Defaults to 0.0 (no minimum). The
shocked gas typically cools to `initialTemp` if this is not set.
param_dict : dict
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
timepoints : int
Integer value of how many timesteps should be calculated before
aborting the UCLCHEM model. Defaults to `uclchem.constants.TIMEPOINTS`.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
"""
def __init__(
self,
shock_vel: float = 10.0,
timestep_factor: float = 0.01,
minimum_temperature: float = 0.0,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
timepoints: int = TIMEPOINTS,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
shock_vel : float
Shock velocity in km/s. Defaults to 10.0.
timestep_factor : float
Factor controlling the size of individual timesteps relative to the shock
dissipation time. Defaults to 0.01.
minimum_temperature : float
Minimum post-shock temperature (K) allowed during cooling. Defaults to 0.0.
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
timepoints : int
Number of output timesteps to store. Defaults to TIMEPOINTS.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
ValueError
If `read_file` is None, but `shock_vel` is also not set.
TypeError
If ``out_species`` is passed (not supported on OO model classes).
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface (uclchem.functional.cshock) to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=timepoints,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
if read_file is None:
if shock_vel is None:
msg = "shock_vel must be specified if not reading from file."
raise ValueError(msg)
self.shock_vel = shock_vel
self.timestep_factor = timestep_factor
self.minimum_temperature = minimum_temperature
self.dissipation_time = -1
if self.run_type != "external":
self.run()
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
result = wrap.cshock(
shock_vel=self.shock_vel,
timestep_factor=self.timestep_factor,
minimum_temperature=self.minimum_temperature,
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
dissipation_time = result[-3]
success_flag = result[-1]
if success_flag < 0:
dissipation_time = None
return {
"success_flag": success_flag,
"dissipation_time": dissipation_time,
}
def _create_init_dict(self):
return {
"shock_vel": self.shock_vel,
"timestep_factor": self.timestep_factor,
"minimum_temperature": self.minimum_temperature,
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
}
@register_model
[docs]
class JShock(AbstractModel):
"""J-Shock model class inheriting from AbstractModel.
Parameters
----------
shock_vel : float
Velocity of the shock. Defaults to 10.0.
param_dict : dict | None
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
timepoints : int
Integer value of how many timesteps should be calculated before
aborting the UCLCHEM model. Defaults to `uclchem.constants.TIMEPOINTS`.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
"""
def __init__(
self,
shock_vel: float = 10.0,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
timepoints: int = TIMEPOINTS,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
shock_vel : float
Shock velocity in km/s. Defaults to 10.0.
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
timepoints : int
Number of output timesteps to store. Defaults to TIMEPOINTS.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
ValueError
If `read_file` is None, but `shock_vel` is also not set.
TypeError
If ``out_species`` is passed (not supported on OO model classes).
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface (uclchem.functional.jshock) to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=timepoints,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
if read_file is None:
if shock_vel is None:
msg = "shock_vel must be specified if not reading from file."
raise ValueError(msg)
self.shock_vel = shock_vel
if self.run_type != "external":
self.run()
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
result = wrap.jshock(
shock_vel=self.shock_vel,
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
success_flag = result[-1]
return {
"success_flag": success_flag,
}
def _create_init_dict(self):
return {
"shock_vel": self.shock_vel,
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
}
@register_model
[docs]
class Postprocess(AbstractModel):
"""Model class with additional controls, inheriting from :class:`AbstractModel`.
Postprocess allows for additional controls of the time, density, gas temperature,
radiation field, cosmic ray ionization rate, atomic and molecular Hydrogen, CO and C
column densities through the use of arrays. Using these arrays allows for experimental
model crafting beyond the standard models in other model classes.
Parameters
----------
param_dict : dict | None
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances
to use for the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from AbstractModel,
to use for the starting abundances of the new UCLCHEM model that will be run.
Defaults to None.
time_array : np.ndarray | None
Represents the time grid to be used for the model.
This sets the target timesteps for which outputs will be stored.
density_array : np.ndarray | None
Represents the value of the density at different
timepoints found in time_array.
gas_temperature_array : np.ndarray | None
Represents the value of the gas temperature
at different timepoints found in time_array.
dust_temperature_array : np.ndarray | None
Represents the value of the dust temperature
at different timepoints found in time_array.
zeta_array : np.ndarray | None
Represents the value of the cosmic ray ionization rate
at different timepoints found in time_array.
radfield_array : np.ndarray | None
Represents the value of the UV radiation field
at different timepoints found in time_array.
coldens_H_array : np.ndarray | None
Represents the value of the column density of H
at different timepoints found in time_array.
coldens_H2_array : np.ndarray | None
Represents the value of the column density of H2
at different timepoints found in time_array.
coldens_CO_array : np.ndarray | None
Represents the value of the column density of CO
at different timepoints found in time_array.
coldens_C_array : np.ndarray | None
Represents the value of the column density of C
at different timepoints found in time_array.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object, prevents it from
being run. Defaults to None.
"""
def __init__(
self,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
time_array: np.ndarray | None = None,
density_array: np.ndarray | None = None,
gas_temperature_array: np.ndarray | None = None,
dust_temperature_array: np.ndarray | None = None,
zeta_array: np.ndarray | None = None,
radfield_array: np.ndarray | None = None,
visual_extinction_array: np.ndarray | None = None,
coldens_H_array: np.ndarray | None = None,
coldens_H2_array: np.ndarray | None = None,
coldens_CO_array: np.ndarray | None = None,
coldens_C_array: np.ndarray | None = None,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
time_array : np.ndarray | None
Time grid (years) at which model outputs are stored. Defaults to None.
density_array : np.ndarray | None
Number density (cm⁻³) at each timestep. Defaults to None.
gas_temperature_array : np.ndarray | None
Gas temperature (K) at each timestep. Defaults to None.
dust_temperature_array : np.ndarray | None
Dust temperature (K) at each timestep. Defaults to None.
zeta_array : np.ndarray | None
Cosmic-ray ionization rate (s⁻¹) at each timestep. Defaults to None.
radfield_array : np.ndarray | None
UV radiation field strength (Habing units) at each timestep. Defaults to None.
visual_extinction_array : np.ndarray | None
Visual extinction (mag) at each timestep. Mutually exclusive with
``coldens_H_array``. Defaults to None.
coldens_H_array : np.ndarray | None
Atomic hydrogen column density (cm⁻²) at each timestep. Mutually exclusive with
``visual_extinction_array``. Defaults to None.
coldens_H2_array : np.ndarray | None
Molecular hydrogen column density (cm⁻²) at each timestep. Defaults to None.
coldens_CO_array : np.ndarray | None
CO column density (cm⁻²) at each timestep. Defaults to None.
coldens_C_array : np.ndarray | None
Atomic carbon column density (cm⁻²) at each timestep. Defaults to None.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
ValueError
If not all arrays have the same length.
ValueError
If `read_file` is None, but `time_array` is not an array.
TypeError
If ``out_species`` is passed (not supported on OO model classes).
AssertionError
If both column density and visual extinction arrays are provided.
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
# Allocate 1.5x the input timesteps to give the DVODE solver
# headroom for additional internal substeps.
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=int(1.5 * len(time_array))
if time_array is not None
else TIMEPOINTS,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
if read_file is None and time_array is not None:
n_input = len(time_array)
object.__setattr__(
self,
"postprocess_arrays",
{
"timegrid": time_array,
"densgrid": density_array,
"gastempgrid": gas_temperature_array,
"dusttempgrid": dust_temperature_array,
"radfieldgrid": radfield_array,
"zetagrid": zeta_array,
"avgrid": visual_extinction_array,
"nhgrid": coldens_H_array,
"nh2grid": coldens_H2_array,
"ncogrid": coldens_CO_array,
"ncgrid": coldens_C_array,
},
)
for key, array in self.postprocess_arrays.items():
if array is not None:
if isinstance(array, float):
array = np.ones(n_input) * array
if len(array) != n_input:
msg = "All arrays must be the same length"
raise ValueError(msg)
# Pad to self.timepoints so Fortran gets
# correctly sized arrays.
padded = np.zeros(
self.timepoints,
dtype=np.float64,
)
padded[:n_input] = array
self.postprocess_arrays[key] = np.asfortranarray(padded)
self.time_array = time_array
# Column-density (coldens) and visual extinction (Av) arrays
self.coldens_H_array = coldens_H_array
self.visual_extinction_array = visual_extinction_array
# Flags exposed for Fortran wrapper (mutually exclusive)
self.usecoldens = self.coldens_H_array is not None
self.useav = self.visual_extinction_array is not None
if self.usecoldens and self.useav:
msg = "Cannot use both column density and visual extinction arrays simultaneously."
raise AssertionError(msg)
if not self.give_start_abund:
self.starting_chemistry_array = np.zeros(
shape=(self.gridPoints, n_species),
dtype=np.float64,
order="F",
)
if self.run_type != "external":
self.run()
elif time_array is None and read_file is None:
msg = f"time_array must be an array if read_file is None. A value of {time_array} with type {type(time_array)} was given."
raise ValueError(msg)
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
# Determine whether an Av grid was provided and set the flag expected by the Fortran wrapper
# Only pass arrays that are present (not None) to the Fortran wrapper
post_kwargs = {k: v for k, v in self.postprocess_arrays.items() if v is not None}
result = wrap.postprocess(
usecoldens=self.usecoldens,
useav=self.useav,
**post_kwargs,
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
success_flag = result[-1]
return {
"success_flag": success_flag,
}
def _create_init_dict(self):
return {
"usecoldens": self.coldens_H_array is not None,
"useav": self.useav,
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
"postprocess_arrays": self.postprocess_arrays,
**self.postprocess_arrays,
}
@register_model
[docs]
class Model(AbstractModel):
"""Model, like Postprocess, represents a model class with additional controls.
It inherits from AbstractModel.
Model follows the same logic as Postprocess but without the coldens Arguments.
It allows for additional controls of the time, density, gas temperature, radiation field,
and cosmic ray ionization rate through the use of arrays. Using these arrays allows for
experimental model crafting beyond the standard models in other model classes.
Parameters
----------
param_dict : dict
Dictionary containing the parameters to use for the UCLCHEM model.
Uses UCLCHEM default values found in `defaultparameters.f90`.
starting_chemistry : np.ndarray | None
Array containing the starting abundances to use for
the UCLCHEM model. Defaults to None.
previous_model : AbstractModel | None
Model object, a class that inherited from
AbstractModel, to use for the starting abundances of the new UCLCHEM model
that will be run. Defaults to None.
time_array : np.ndarray | None
Represents the time grid to be used for the model.
This sets the target timesteps for which outputs will be stored.
density_array : np.ndarray | None
Represents the value of the density at different
timepoints found in time_array.
gas_temperature_array : np.ndarray | None
Represents the value of the gas temperature
at different timepoints found in time_array.
dust_temperature_array : np.ndarray | None
Represents the value of the dust temperature
at different timepoints found in time_array.
zeta_array : np.ndarray | None
Represents the value of the cosmic ray ionization rate
at different timepoints found in time_array.
radfield_array : np.ndarray | None
Represents the value of the UV radiation field at
different timepoints found in time_array.
debug : bool
Flag if extra debug information should be printed to the terminal.
Defaults to False. #TODO Add debug features
read_file : str | None
Path to the file to be read. Reading a file to a model object,
prevents it from being run. Defaults to None.
"""
def __init__(
self,
param_dict: dict | None = None,
out_species: list[str] | None = None,
starting_chemistry: np.ndarray | None = None,
previous_model: AbstractModel | None = None,
time_array: np.ndarray | None = None,
density_array: np.ndarray | None = None,
gas_temperature_array: np.ndarray | None = None,
dust_temperature_array: np.ndarray | None = None,
zeta_array: np.ndarray | None = None,
radfield_array: np.ndarray | None = None,
debug: bool = False,
read_file: str | None = None,
run_type: Literal["managed", "external"] = "managed",
on_negative_abundances: Literal[None, "warning", "error", "raise"] = "warning",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
"""Initialize the model with :meth:`AbstractModel.__init__`.
Then run any additional commands needed for the model.
Parameters
----------
param_dict : dict | None
Dictionary of UCLCHEM parameters. Uses defaults from ``defaultparameters.f90`` for
any key not provided. Defaults to None.
out_species : list[str] | None
Not supported on OO model classes; passing any value raises ``TypeError``.
Use the functional interface to filter output species. Defaults to None.
starting_chemistry : np.ndarray | None
Array of starting abundances for each species. If None, uses network defaults.
Defaults to None.
previous_model : AbstractModel | None
A completed model whose final abundances are used as the starting chemistry
for this model. Defaults to None.
time_array : np.ndarray | None
Time grid (years) at which model outputs are stored. Defaults to None.
density_array : np.ndarray | None
Number density (cm⁻³) at each timestep. Defaults to None.
gas_temperature_array : np.ndarray | None
Gas temperature (K) at each timestep. Defaults to None.
dust_temperature_array : np.ndarray | None
Dust temperature (K) at each timestep. Defaults to None.
zeta_array : np.ndarray | None
Cosmic-ray ionization rate (s⁻¹) at each timestep. Defaults to None.
radfield_array : np.ndarray | None
UV radiation field strength (Habing units) at each timestep. Defaults to None.
debug : bool
If True, print extra debug information. Defaults to False.
read_file : str | None
Path to a previously saved model file to load instead of running. Defaults to None.
run_type : Literal['managed', 'external']
How the Fortran model is executed. ``'managed'`` runs automatically on init;
``'external'`` defers running to the caller. Defaults to 'managed'.
on_negative_abundances : Literal[None, 'warning', 'error', 'raise']
Action when negative abundances are detected after a run. Defaults to 'warning'.
on_error : Literal['raise', 'warn', 'ignore']
Action when the Fortran solver returns an error flag. Defaults to 'raise'.
Raises
------
ValueError
If not all arrays have the same length.
ValueError
If `read_file` is None, but `time_array` is not an array.
TypeError
If ``out_species`` is passed (not supported on OO model classes).
"""
if out_species is not None:
msg = (
"out_species is not supported on OO model classes. "
"Use the functional interface to filter output species, "
"or access all species via model.chemical_abun_array after running."
)
raise TypeError(msg)
# Allocate 1.5x the input timesteps to give the DVODE solver
# headroom for additional internal substeps.
super().__init__(
param_dict=param_dict,
starting_chemistry=starting_chemistry,
previous_model=previous_model,
timepoints=int(1.5 * len(time_array))
if time_array is not None
else TIMEPOINTS,
debug=debug,
read_file=read_file,
run_type=run_type,
on_negative_abundances=on_negative_abundances,
on_error=on_error,
)
if read_file is None and time_array is not None:
n_input = len(time_array)
self.time_array = time_array
self.postprocess_arrays = {
"timegrid": time_array,
"densgrid": density_array,
"gastempgrid": gas_temperature_array,
"dusttempgrid": dust_temperature_array,
"radfieldgrid": radfield_array,
"zetagrid": zeta_array,
}
for key, array in self.postprocess_arrays.items():
if array is not None:
if isinstance(array, float):
array = np.ones(n_input) * array
if len(array) != n_input:
msg = "All arrays must be the same length"
raise ValueError(msg)
# Pad to self.timepoints so Fortran gets
# correctly sized arrays.
padded = np.zeros(
self.timepoints,
dtype=np.float64,
)
padded[:n_input] = array
self.postprocess_arrays[key] = np.asfortranarray(padded)
if not self.give_start_abund:
self.starting_chemistry_array = np.zeros(
shape=(self.gridPoints, n_species),
dtype=np.float64,
order="F",
)
if self.run_type != "external":
self.run()
elif time_array is None and read_file is None:
msg = f"time_array must be an array if read_file is None. A value of {time_array} with type {type(time_array)} was given."
raise ValueError(msg)
[docs]
def run_fortran(self) -> dict[str, int | list]:
"""Run the UCLCHEM model, first resetting the numpy arrays.
Resetting uses `AbstractModel.run()`, then running the model. `check_error` and
`array_clean` are automatically called after the model run.
Returns
-------
dict[str, int | list]
Dictionary with two keys:
"success_flag" with value the success flag
"""
result = wrap.postprocess(
usecoldens=False,
**self.postprocess_arrays,
dictionary=self._param_dict,
outspeciesin=self.out_species,
timepoints=self.timepoints,
gridpoints=self._param_dict["points"],
returnarray=True,
returnrateconstants=True,
givestartabund=self.give_start_abund,
physicsarray=self.physics_array,
chemicalabunarray=self.chemical_abun_array,
rateconstantsarray=self.rate_constants_array,
heatarray=self.heat_array,
statsarray=self.stats_array,
levelpopulationsarray=self.level_populations_array,
sestatsarray=self.se_stats_array,
abundancestart=self.starting_chemistry_array
if "starting_chemistry_array" in self.__dict__
else None,
)
success_flag = result[-1]
return {
"success_flag": success_flag,
}
def _create_init_dict(self):
return {
"_param_dict": self._param_dict,
"out_species_list": self.out_species_list,
"out_species": self.out_species,
"timepoints": self.timepoints,
"give_start_abund": self.give_start_abund,
"_debug": self._debug,
**self.postprocess_arrays,
}
@register_model # type: ignore[arg-type] # SequentialRunner is a composite wrapper, not a direct AbstractModel subclass
[docs]
class SequentialRunner:
"""The SequentialRunner class allows for multiple models to be run back to back.
By defining a specific dictionary to hold the information of each model class to run in sequence, SewuentialModel allows
for the automatic running of multiple models as well as matching some physical parameters from one model to the next.
Parameters
----------
sequenced_model_parameters : list[dict[str, Any]]
The List of dictionaries to pass to SequentialRunner takes the format of
[{"<First Model Class>":{"param_dict":{<parameters>}, <other arguments>}}, {"<Second Model Class>:{"param_dict":{<parameters>}, <other arguments>}}, ...}]
parameters_to_match : list[str]
The list provided to this argument decides which parameters
should be matched from a previous model to the next model in the sequence.
Currently, supports one or more of `["finalDens", "finalTemp"]`.
run_type : Literal['managed', 'external']
Run type.
Must be either "managed", or "external".
Raises
------
NotImplementedError
If a parameter in `parameters_to_match` is not one of
`["finalDens", "finalTemp"]`.
""" # noqa: W505
def __init__(
self,
sequenced_model_parameters: list,
parameters_to_match: list | None = None,
run_type: Literal["managed", "external"] = "managed",
on_error: Literal["raise", "warn", "ignore"] = "raise",
):
for model in sequenced_model_parameters:
if model[next(iter(model.keys()))] == SequentialRunner:
raise AssertionError
[docs]
self.sequenced_model_parameters = sequenced_model_parameters
[docs]
self.parameters_to_match = parameters_to_match
if self.parameters_to_match is not None:
for parameter in self.parameters_to_match:
if parameter not in {"finalTemp", "finalDens"}:
msg = f"Parameter '{parameter}' has not been implemented for parameter matching"
raise NotImplementedError(msg)
[docs]
self.run_type = run_type
self._on_error = on_error
self._pickle_dict: dict = {}
[docs]
self.success_flag: bool | None = None
if self.run_type == "managed":
self.run()
[docs]
def run(self) -> None:
"""Run the sequential model.
Raises
------
NotImplementedError
If a parameter in `parameters_to_match` is not one of
`["finalDens", "finalTemp"]`.
RuntimeError
If ``previous_model`` is ``None`` when ``model_count > 0``.
"""
previous_model: AbstractModel | None = None
stage_failed = False
for base_model_dict in self.sequenced_model_parameters:
if stage_failed:
break
for model_type, model_dict in base_model_dict.items():
model_dict["param_dict"] = {
k.lower(): v for k, v in model_dict["param_dict"].items()
}
if self.model_count > 0:
if previous_model is None:
msg = "previous_model is None but model_count > 0; this should not happen."
raise RuntimeError(msg)
previous_param_dict = previous_model._param_dict.copy()
# Remove the converted stopping-mode key inherited
# from the previous stage before merging
previous_param_dict.pop("parcelstoppingmode", None)
model_dict["param_dict"] = {
**previous_param_dict,
**model_dict["param_dict"],
}
if self.parameters_to_match is not None:
for parameter in self.parameters_to_match:
if parameter == "finalDens":
model_dict["param_dict"]["initialdens"] = (
previous_model.physics_array[-1, 0, 1].item()
)
continue
elif parameter == "finalTemp":
model_dict["param_dict"]["initialtemp"] = (
previous_model.physics_array[-1, 0, 2].item()
)
else:
msg = f"Parameter '{parameter}' has not been implemented for parameter matching"
raise NotImplementedError(msg)
tmp_model = REGISTRY[model_type](
**model_dict,
run_type=self.run_type,
previous_model=previous_model,
on_error=self._on_error,
)
else:
tmp_model = REGISTRY[model_type](
**model_dict,
run_type=self.run_type,
previous_model=previous_model,
on_error=self._on_error,
)
if self.run_type == "external":
tmp_model.run()
successful = tmp_model.success_flag == SuccessFlag.SUCCESS
self.models += [
{
"Model_Type": model_type,
"Model_Order": self.model_count,
"Model": tmp_model,
"Success": tmp_model.success_flag,
"Successful": successful,
}
]
self.model_count += 1
if not successful:
# Abort the sequence — running subsequent stages on bad physics
# output is unsafe regardless of on_error mode.
stage_failed = True
break
previous_model = tmp_model
self.success_flag = all(d["Successful"] for d in self.models)
[docs]
def save_model(
self,
*,
file_obj: h5py.File | None = None,
file: str | None = None,
name: str = "",
overwrite: bool = False,
) -> None:
"""Save a model to an open file object or to a file.
Parameters
----------
file_obj : h5py.File | None
open file h5py file object. Default = None.
file : str | None
file to write to. Default = None.
name : str
name to save model under. (Default value = '')
overwrite : bool
Boolean on whether to overwrite pre-existing models, or error out.
Defaults to False
Raises
------
ValueError
If file_obj and file are both passed, or neither are passed.
"""
opened_file = False
if (file_obj is None) == (file is None):
msg = "file_obj or file must be passed."
raise ValueError(msg)
elif file_obj is None:
file_obj = h5py.File(file, "a")
opened_file = True
for model in self.models:
model["Model"].save_model(
file_obj=file_obj,
name=f"{name}_{model['Model_Order']}_{model['Model_Type']}",
overwrite=overwrite,
)
if opened_file:
file_obj.close()
[docs]
def check_conservation(
self, element_list: list[str] | None = None, percent: bool = True
) -> None:
"""Check conservation of the chemical abundances.
Parameters
----------
element_list : list[str] | None
List of elements to check conservation for.
If None, use `uclchem.constants.default_elements_to_check`. Default = None.
percent : bool
Flag on if percentage values should be used. Defaults to True.
"""
if element_list is None:
element_list = default_elements_to_check
for model in self.models:
conserve_dicts: list[dict[str, str]] = []
if model["Model"]._param_dict["points"] > 1:
for pt in range(model["Model"]._param_dict["points"]):
df = model["Model"].get_dataframes(pt)
if isinstance(df, pd.DataFrame):
conserve_dicts += [
check_element_conservation(df, element_list, percent)
]
else:
df = model["Model"].get_dataframes(0)
if isinstance(df, pd.DataFrame):
conserve_dicts += [
check_element_conservation(df, element_list, percent)
]
conserved = True
for conserve_dict in conserve_dicts:
conserved = all(float(x[:1]) < 1 for x in conserve_dict.values())
model["elements_conserved"] = conserved
[docs]
def pickle(self) -> None:
"""Pickle the models."""
if not bool(self._pickle_dict):
for model in self.models:
model["Model"] = model["Model"].pickle()
self._pickle_dict[f"{model['Model_Order']}_{model['Model_Type']}"] = (
model["Model"]._pickle_dict.copy()
)
[docs]
def un_pickle(self) -> None:
"""Un-pickle the models."""
if bool(self._pickle_dict):
for model in self.models:
model["Model"]._pickle_dict = self._pickle_dict[
f"{model['Model_Order']}_{model['Model_Type']}"
]
model["Model"] = model["Model"].un_pickle()
def _coordinator_unlink_memory(self):
for model in self.models:
model["Model"]._coordinator_unlink_memory()
def _run_grid_model(
model_id: int,
model_type: str,
pending_model: dict[str, Any],
log_dir: str | Path | None = None,
) -> tuple[int, object]:
"""Run a single model. This is used by the GridRunner class.
Parameters
----------
model_id : int
id of model
model_type : str
string representing the type of model
pending_model : dict[str, Any]
dictionary with arguments necessary to initialize
model.
log_dir : str | Path | None
If not None, write logs to "model_{model_id}.log".
If None, do not write logs. Default = None.
Returns
-------
model_id : int
model id of run model
model_obj : object
pickled model object
Raises
------
KeyError
If ``model_type`` is not found in the model registry.
"""
log_file = None
if log_dir is not None:
log_file = str(Path(log_dir) / f"model_{model_id}.log")
cls = REGISTRY.get(model_type)
if cls is None:
msg = f"Model type '{model_type}' is not registered."
raise KeyError(msg)
with capture_fortran_output(label=f"model_{model_id}", log_file=log_file):
model_obj = cls(
**pending_model,
run_type=("managed" if model_type == "SequentialRunner" else "external"),
)
if model_type != "SequentialRunner":
model_obj.run()
model_obj._coordinator_unlink_memory()
model_obj.pickle()
return model_id, model_obj
# The following parameters from various chemical models, cannot be used as grid parameters.
[docs]
NoGridParameters = [
"starting_chemistry",
"time_array",
"density_array",
"gas_temperature_array",
"dust_temperature_array",
"zeta_array",
"radfield_arrayvisual_extinction_array",
"coldens_H_array",
"coldens_H2_array",
"coldens_CO_array",
"coldens_C_array",
"debug",
"read_file",
"run_type",
]
class _NoDaemonProcess(mp.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value): # noqa :ANN001
pass
[docs]
class NoDaemonPool(pool.Pool): # noqa
@staticmethod
[docs]
def Process(ctx, *args, **kwargs): # noqa
return _NoDaemonProcess(*args, **kwargs)
[docs]
def get_number_of_grid_workers(max_workers: int | None) -> int:
"""Determine the number of grid workers.
Parameters
----------
max_workers : int | None
Maximum number of workers to use in parallel for
the grid run. If None, use ``os.cpu_count()-1`` workers. Default = None.
Returns
-------
int
Number of CPU cores to use for grid models.
Raises
------
RuntimeError
If ``max_workers`` is None, but func:`os.cpu_count()` also returns None.
"""
cpu_count = os.cpu_count()
if cpu_count is None:
if max_workers is None:
msg = "Could not determine number of CPU cores, but max_workers was None. Provide max_workers instead."
raise RuntimeError(msg)
warnings.warn(
f"Could not determine number of CPU cores. Using {max_workers} workers.",
stacklevel=2,
)
return max_workers
if max_workers is None:
return cpu_count - 1
if max_workers >= cpu_count:
warnings.warn(
f"max_workers was given as {max_workers}, which is higher than the number of physical CPU cores on the system ({cpu_count}). Using {cpu_count - 1} instead.",
stacklevel=2,
)
return cpu_count - 1
else:
return max_workers
[docs]
class GridRunner:
"""GridRunner, like SequentialRunner is not an actual uclchem model,.
instead it allows running multiple models on a grid of parameter space.
Parameters
----------
model_type : str
Name of the registered model class to run (e.g. ``"Cloud"``, ``"Collapse"``).
full_parameters : Dict
The dictionary passed to GridRunner should nest into it,
the param_dict argument that would be passed to any other model, with the addition
of extra keys for the none param_dict variables of a model. Any variables that are
turned into lists or arrays, will automatically be assumed to be used for the gridding.
max_workers : int | None
Maximum number of workers to use in parallel for the grid run.
If None, use ``os.cpu_count()-1``. Default = None.
grid_file : str
Name and path of the output file to which the models should be saved.
Defaults to "./default_grid_out.h5".
model_name_prefix : str
Name prefix convention to use. The fifth model in the grid
would have the name "<model_name_prefix>5>" assigned to it. Defaults to "",
which would make the fifth model have the name "5", for example.
delay_run : bool
Whether to immediately start the models upon initialization,
or delay until the user calls `self.run()`. Defaults to False (start immediately).
log_dir : str | None
Where to write logs. If None, do not write logs. Default = None.
model_ids : list | None
Optional subset of model indices (0-based column in flat_grids)
to run. None means run all models in the grid. Default = None.
"""
def __init__(
self,
model_type: str,
full_parameters: dict | list,
max_workers: int | None = None,
grid_file: str = "./default_grid_out.h5",
model_name_prefix: str = "",
overwrite_models: bool = False,
delay_run: bool = False,
log_dir: str | None = None,
model_ids: list | None = None,
create_grid: bool = True,
):
if model_type not in REGISTRY:
raise AssertionError
[docs]
self.model_type = model_type
[docs]
self.full_parameters = full_parameters
[docs]
self.max_workers = get_number_of_grid_workers(max_workers)
_logger.debug(f"Number of workers set to {self.max_workers}")
[docs]
self.grid_file = grid_file if ".h5" in grid_file else grid_file + ".h5"
# TODO: Implement model appending to grid file
# TODO: Implement option to append or overwrite grid file.
# Initial placeholder statement to remove pre-existing grid files
if Path(self.grid_file).is_file():
Path(self.grid_file).unlink()
[docs]
self.model_name_prefix = model_name_prefix
[docs]
self.overwrite_models = overwrite_models
self._main_log: str | None
if self.log_dir is not None:
Path(self.log_dir).mkdir(exist_ok=True, parents=True)
self._main_log = str(Path(self.log_dir) / "grid.log")
else:
self._main_log = None
self._orig_sigint = signal.getsignal(signal.SIGINT)
[docs]
self.parameters_to_grid: dict = {}
if self.model_type == "SequentialRunner":
if not isinstance(self.full_parameters, list):
msg = f"For SequentialRunner types, full_parameters must be a list. {type(self.full_parameters)} was passed."
raise TypeError(msg)
for model_count in range(len(self.full_parameters)):
for model_full_params in self.full_parameters[model_count].values():
if not isinstance(model_full_params, dict):
continue
for k, v in model_full_params.items():
if k == "param_dict":
for k_p, v_p in v.items():
self._grid_def(k_p, v_p, model_count)
else:
self._grid_def(k, v, model_count)
else:
if not isinstance(self.full_parameters, dict):
msg = f"For none SequentialRunner types, full_parameters must be a dictionary. {type(self.full_parameters)} was passed."
raise TypeError(msg)
for k, v in self.full_parameters.items():
if k == "param_dict":
for k_p, v_p in v.items():
self._grid_def(k_p, v_p)
else:
self._grid_def(k, v)
if create_grid:
grids = np.meshgrid(*self.parameters_to_grid.values(), indexing="xy")
self.flat_grids = np.reshape(
grids,
(
len(self.parameters_to_grid),
int(np.prod(np.shape(grids)) / len(self.parameters_to_grid)),
),
)
else:
if len({len(v) for v in self.parameters_to_grid.values()}) != 1:
raise AssertionError
self.flat_grids = np.array(
[
[p[i] for p in self.parameters_to_grid.values()]
for i in range(len(next(iter(self.parameters_to_grid.values()))))
]
).T
# Optional subset of model indices (0-based column in flat_grids) to run.
# None means run all. Accepts any iterable; stored as a frozenset for O(1) lookup.
[docs]
self.model_ids = frozenset(model_ids) if model_ids is not None else None
[docs]
self.model_id_dict: dict = {}
[docs]
self.physics_values = None
[docs]
self.chemical_abun_values = None
if not delay_run:
self.run()
def _grid_def(self, key: str, value: Any, model_count: int | None = None) -> None:
prefix: str = "" if model_count is None else f"{model_count}_"
if isinstance(value, list) and key not in NoGridParameters:
self.parameters_to_grid[prefix + key] = value
self.parameters_to_grid[prefix + key] = np.array(value, dtype=object)
elif isinstance(value, (np.ndarray, np.generic)) and key not in NoGridParameters:
self.parameters_to_grid[prefix + key] = value.astype(dtype=object)
def _log_main(self, msg: str) -> None:
"""Append a timestamped line to the main grid log file.
Parameters
----------
msg : str
Message text to append to the log file.
"""
if self._main_log is None:
return
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with Path(self._main_log).open("a") as f:
f.write(f"{ts} {msg}\n")
[docs]
def run(self) -> None:
"""Run the grid."""
signal.signal(signal.SIGINT, self._handler)
n_total = np.shape(self.flat_grids)[1]
self._log_main(f"Grid started: {n_total} models, {self.max_workers} workers")
# Capture advanced settings so spawned workers start with the same
# Fortran module state as the coordinator process.
from uclchem.advanced.worker_state import ( # noqa: PLC0415 circular
_pool_initializer,
create_snapshot,
)
snapshot = create_snapshot()
pending = self.grid_iter(
self.full_parameters,
list(self.parameters_to_grid.keys()),
self.flat_grids,
self.model_type,
)
file_obj = h5py.File(self.grid_file, "a")
PoolClass = NoDaemonPool if self.model_type == "SequentialRunner" else mp.Pool
with PoolClass(
self.max_workers,
initializer=_pool_initializer,
initargs=(snapshot,),
maxtasksperchild=1,
) as pool:
completed = 0
def on_result(result: tuple[int, Any]) -> None:
nonlocal completed
completed += 1
model_id, model_object = result
try:
save_name = f"{self.model_name_prefix}{model_id}"
model_object.un_pickle()
failed = (
model_object.success_flag is not None
and model_object.success_flag < 0
)
if failed:
msg = model_object.success_flag.check_error(raise_on_error=False)
self._log_main(
f"model_{model_id} failed ({completed}/{n_total}): {msg}"
)
else:
self._log_main(
f"model_{model_id} completed ({completed}/{n_total})"
)
model_object.save_model(
file_obj=file_obj,
name=save_name,
overwrite=self.overwrite_models,
)
self.model_id_dict[model_id] = save_name
file_obj.flush()
except Exception as e:
print(f"Error saving model {model_id}: {e}")
traceback.print_exc()
def on_error(_exc: Any, _model_id: int) -> None:
self._log_main(f"model_{_model_id} error: {_exc}")
print(f"error: {_exc}; for model: {_model_id}")
# Submit all tasks upfront; the pool limits actual concurrency to
# max_workers. pool.close()/join() are called only after all tasks
# have been submitted, so apply_async never races with a closed pool.
for pending_model in pending:
model_id = pending_model.pop("id")
if self.model_ids is not None and model_id not in self.model_ids:
continue
self._log_main(f"model_{model_id} started")
pool.apply_async(
_run_grid_model,
args=(model_id, self.model_type, pending_model, self.log_dir),
callback=on_result,
error_callback=(lambda exc, _mid=model_id: on_error(exc, _mid)), # type: ignore[misc] # mypy cannot infer lambda with default arg capture
)
pool.close()
pool.join()
file_obj.close()
signal.signal(signal.SIGINT, self._orig_sigint)
self._log_main(f"Grid finished: {len(self.model_id_dict)} models completed")
self.models = [
{"Model": v}
for _, v in sorted(self.model_id_dict.items(), key=lambda item: item[1])
]
self._load_params()
[docs]
def load_phys(self) -> None:
"""Load the physics.
Raises
------
NotImplementedError
If the model type is `SequentialRunner`.
"""
if self.model_type == "SequentialRunner":
msg = "Sequential Runner physics loading not implemented"
raise NotImplementedError(msg)
for model in range(len(self.models)):
loaded_data = self._load_model_data(model=self.models[model]["Model"])
if self.physics_values is None:
self.physics_values = json.loads(loaded_data["attributes_dict"].item())[
"physics_values"
]
loaded_data = loaded_data.assign_coords(
{"physics_values": self.physics_values}
)
self.models[model]["physics_array"] = loaded_data["physics_array"]
[docs]
def load_chem(self, species: list[str]) -> None:
"""Load the chemistry.
Parameters
----------
species : list[str]
list of species to load abundances for.
Raises
------
NotImplementedError
If the model type is `SequentialRunner`.
"""
if self.model_type == "SequentialRunner":
msg = "Sequential Runner chemistry loading not implemented"
raise NotImplementedError(msg)
for model in range(len(self.models)):
loaded_data = self._load_model_data(model=self.models[model]["Model"])
if self.chemical_abun_values is None:
self.chemical_abun_values = json.loads(
loaded_data["attributes_dict"].item()
)["chemical_abun_values"]
loaded_data = loaded_data.assign_coords(
{"chemical_abun_values": self.chemical_abun_values}
)
self.models[model]["species_abundances_array"] = loaded_data[
"chemical_abun_array"
].sel(chemical_abun_values=species)
def _load_params(self) -> None:
"""Loop through the models present in the grid models self.models.
attribute in order to load the changing physical parameters of the models.
The method splits the loops into two cases. SequentialRunner, and other model cases.
In both instances, the for loop loads the model data using the _load_model_data()
method. Then, it matches the given parameters, with the changing parameters in
order to populate the dictionary attribute self.models for users to view the
models run for the given GridRunner object. The Differentiation of
SequentialRunner stems from SequentialRunner instances being nested models,
resulting in an additional required loop to take into account the
additional nesting.
"""
# The following loops perform the same actions, but for the different model types
if self.model_type == "SequentialRunner":
for model in range(len(self.models)):
for model_count in range(len(self.full_parameters)):
for mt_k, mt_v in self.full_parameters[model_count].items():
if not isinstance(mt_v, dict):
continue
tmp_model = self._load_model_data(
model=f"{self.models[model]['Model']}_{model_count}_{mt_k}"
)
self.models[model][f"{model_count}_{mt_k}"] = {
**{
k.replace(f"{model_count}_", ""): tmp_model._param_dict[
k.replace(f"{model_count}_", "").lower()
]
for k in list(self.parameters_to_grid.keys())
if k[: len(str(model_count))] == str(model_count)
and k.replace(f"{model_count}_", "").lower()
in tmp_model._param_dict
},
**{
k.replace(f"{model_count}_", ""): tmp_model.__getattr__( # noqa: PLC2801 bypass
k.replace(f"{model_count}_", "")
)
for k in list(self.parameters_to_grid.keys())
if mt_k in k
and k.replace(mt_k, "").lower() in tmp_model._data
},
}
self.models[model][f"{model_count}_{mt_k}"]["Successful"] = (
True
if tmp_model.success_flag == 0
else tmp_model.success_flag
)
else:
for model in range(len(self.models)):
loaded_data = self._load_model_data(model=self.models[model]["Model"])
loaded_dict = loaded_data._param_dict
self.models[model] = {
**self.models[model],
**{
k: loaded_dict[k.lower()]
for k in list(self.parameters_to_grid.keys())
},
}
self.models[model]["Successful"] = (
True if loaded_data.success_flag == 0 else loaded_data.success_flag
)
def _load_model_data(self, model: str):
tmp_model = load_model(file=self.grid_file, name=model)
return tmp_model
[docs]
def check_conservation(
self, element_list: list[str] | None = None, percent: bool = True
) -> None:
"""Check conservation of the chemical abundances.
Parameters
----------
element_list : list[str] | None
List of elements to check conservation for.
If None, use `uclchem.constants.default_elements_to_check`. Default = None.
percent : bool
Flag on if percentage values should be used.
Defaults to True.
"""
if element_list is None:
element_list = default_elements_to_check
for model in range(len(self.models)):
tmp_model = load_model(file=self.grid_file, name=self.models[model]["Model"])
conserve_dicts: list[dict[str, str]] = []
if tmp_model._param_dict["points"] > 1:
for pt in range(tmp_model._param_dict["points"]):
df = tmp_model.get_dataframes(pt)
if isinstance(df, pd.DataFrame):
conserve_dicts += [
check_element_conservation(df, element_list, percent)
]
else:
df = tmp_model.get_dataframes(0)
if isinstance(df, pd.DataFrame):
conserve_dicts += [
check_element_conservation(df, element_list, percent)
]
conserved = True
for conserve_dict in conserve_dicts:
conserved = all(float(x[:1]) < 1 for x in conserve_dict.values())
self.models[model]["elements_conserved"] = conserved
def _handler(self, signum: Any, frame: Any) -> None: # noqa: ARG002
try:
self.on_interrupt() # your “final steps”
finally:
# Restore default and re-raise KeyboardInterrupt to stop execution
signal.signal(signal.SIGINT, self._orig_sigint)
raise KeyboardInterrupt
[docs]
def on_interrupt(self) -> None: # noqa: PLR6301
"""Handle a keyboard interrupt during a grid run. Override in subclasses to add cleanup."""
return
@staticmethod
[docs]
def grid_iter(
full_parameters: dict | list,
param_keys: list,
flattened_grids: np.ndarray,
model_type: str,
) -> Iterator[dict[str, Any]]:
"""Provide an iterable dictionary of parameters that can be used with the.
grid-based multiprocessing worker distribution.
Parameters
----------
full_parameters : dict | list
dictionary or list
(if model_type == SequentialRunner)
of the full parameters that will be used for the model
param_keys : list
list of parameters that are changing in
this GridRunner object
flattened_grids : np.ndarray
list of all the values for the changing parameters
for this GridRunner object
model_type : str
Type of model to use. 'SequentialRunner' results in
alternative way of executing this phase as each SequentialRunner
represents multiple models to be run in series.
Yields
------
dict[str, Any]
Next dictionary containing the parameter values for the model to run
in the grid of models. Only offers one model per request.
Raises
------
TypeError
If ``model_type`` is not ``"SequentialRunner"`` and
``full_parameters`` is not a ``dict``.
"""
if model_type == "SequentialRunner":
# As SequentialRunner types contain multiple models,
# sometimes of the same type, we split this type out to follow altered
# logic to arrive at equivalently expected outputs.
for i in range(len(flattened_grids[0])):
combo: tuple = ()
for j in range(np.shape(flattened_grids)[0]):
combo += (flattened_grids[j][i],)
yield_dict = {"id": i}
# run_list contains the dictionaries of each model to be run
# in the sequence of models. This is the SequentialRunner
# sequenced_model_parameters input.
run_list = []
for model_count in range(len(full_parameters)):
# run_dict contains all input parameters for a model,
# not just param_dict, for an individual model that is
# part of the SequentialRunner.
run_dict = {}
for seq_model_type, model_full_parameters in full_parameters[
model_count
].items():
if isinstance(model_full_parameters, dict):
# grid_param_dict is filled with the param_dict values of a model.
grid_param_dict = {
k.replace(f"{model_count}_", ""): v
for k, v in zip(param_keys, combo, strict=False)
if k.replace(f"{model_count}_", "")
in model_full_parameters["param_dict"]
and k[: len(str(model_count))] == str(model_count)
}
# grid_dict is filled with the input parameters of a value,
# not part of param_dict
grid_dict = {
k.replace(f"{model_count}_", ""): v
for k, v in zip(param_keys, combo, strict=False)
if k.replace(f"{model_count}_", "")
not in model_full_parameters["param_dict"]
and (
k[: len(str(model_count))] == str(model_count)
if k[: len(str(model_count))].isdigit()
else False
)
}
run_dict[seq_model_type] = {
**model_full_parameters,
"param_dict": {
**model_full_parameters["param_dict"],
**grid_param_dict,
},
**grid_dict,
}
run_list += [run_dict]
else:
yield_dict[seq_model_type] = model_full_parameters
yield {
"parameters_to_match": ["finalDens"],
**yield_dict,
**{"sequenced_model_parameters": run_list},
}
else:
if not isinstance(full_parameters, dict):
msg = (
"full_parameters must be a dict for non-SequentialRunner model types"
)
raise TypeError(msg)
for i in range(len(flattened_grids[0])):
combo = ()
for j in range(np.shape(flattened_grids)[0]):
combo += (flattened_grids[j][i],)
# grid_param_dict contains the param_dict values of the next model to run.
grid_param_dict = {
k: v
if not isinstance(v, float)
else (v.item() if hasattr(v, "item") else v)
for k, v in zip(param_keys, combo, strict=False)
if k in full_parameters["param_dict"]
}
# grid_dict is filled with the input parameters of a value,
# not part of param_dict, for the next model to run
grid_dict = {
k: v
for k, v in zip(param_keys, combo, strict=False)
if k not in full_parameters["param_dict"]
}
yield {
**full_parameters,
"param_dict": {**full_parameters["param_dict"], **grid_param_dict},
**grid_dict,
"id": i,
}