Source code for uclchem.advanced.worker_state

"""Snapshot and restore of advanced settings for multiprocessing propagation.

When UCLCHEM runs grid models or managed-mode models, worker processes are
spawned via ``mp.Pool`` or ``mp.Process`` with the ``spawn`` context.  These
fresh processes import ``uclchemwrap`` from scratch, losing any runtime
modifications made through ``GeneralSettings``, ``HeatingSettings``, or
``NetworkState``.

This module provides :func:`create_snapshot` / :func:`restore_snapshot` to
capture the current Fortran module state into a picklable dict and re-apply
it in a worker process before the model runs.

"""

import contextlib
import logging
from typing import Any

import numpy as np
import uclchemwrap
from uclchemwrap import f2py_constants as f2py_constants_module
from uclchemwrap import heating as heating_module
from uclchemwrap import network as network_module

from uclchem.advanced.runtime_network import RuntimeNetwork

from .constants import FILE_PATH_PARAMETERS, FORTRAN_PARAMETERS, INTERNAL_PARAMETERS

[docs] logger = logging.getLogger(__name__)
# Module names mirroring GeneralSettings._discover_modules() _MODULE_NAMES = [ "defaultparameters", "network", "heating", "physicscore", "constants", "cloud_mod", "collapse_mod", "cshock_mod", "jshock_mod", "hotcore", "chemistry", "rates", "photoreactions", "surfacereactions", "io", "f2py_constants", "postprocess_mod", "sputtering", ] # Modules where *all* 0-d array attributes are Fortran PARAMETERs (compile-time # constants compiled into read-only pages). Their runtime state is either # handled by dedicated snapshot sections (e.g. f2py_constants → heating section) # or never needs propagation to workers. Attempting setattr on these in a # freshly-spawned worker causes SIGBUS on macOS. _MODULES_SKIP_0D = frozenset( { "constants", # physical constants (c, k_boltz, …) – all PARAMETERs "f2py_constants", # build-time counts (nspec, nReac, …) – all PARAMETERs "surfacereactions", # grain/surface constants – all PARAMETERs } ) _NETWORK_ARRAYS_TO_TAKE_SNAPSHOT_OF = RuntimeNetwork._ARRAYS_TO_CACHE
[docs] def create_snapshot() -> dict[str, Any]: """Capture the current Fortran module state into a picklable dict. Reads directly from Fortran memory (not cached Python values) to ensure accuracy even when settings were modified outside the wrapper classes. The returned dict has three sections: * ``"general"`` – scalar settings from all uclchemwrap sub-modules (excluding PARAMETERs, INTERNAL, FILE_PATH, and arrays). * ``"heating"`` – heating/cooling boolean arrays, scalars, and coolant configuration. * ``"network"`` – Everything in :data:`_NETWORK_ARRAYS_TO_TAKE_SNAPSHOT_OF`. Returns ------- dict[str, Any] Fully picklable dict suitable for passing to :func:`restore_snapshot`. """ snapshot: dict[str, Any] = {} # --- General settings (scalars only) --- general: dict[str, dict[str, Any]] = {} for mod_name in _MODULE_NAMES: if not hasattr(uclchemwrap, mod_name): continue mod = getattr(uclchemwrap, mod_name) mod_snapshot: dict[str, Any] = {} for attr in dir(mod): if attr.startswith("_"): continue try: value = getattr(mod, attr) except Exception: logger.debug( "Could not read attribute %r from module %r; skipping.", attr, mod_name, ) continue if callable(value): continue # Skip arrays (handled by heating/network sections, or immutable), # but keep 0-d arrays (f2py scalar wrappers like cloud_mod scalars). if isinstance(value, np.ndarray) and value.ndim > 0: continue # Skip 0-d arrays for modules that are entirely Fortran PARAMETERs — # these are read-only in spawned workers and cause SIGBUS on macOS. if ( isinstance(value, np.ndarray) and value.ndim == 0 and mod_name in _MODULES_SKIP_0D ): continue # Skip parameters that cannot or should not be set attr_lower = attr.lower() if attr_lower in FORTRAN_PARAMETERS: continue if attr_lower in INTERNAL_PARAMETERS: continue if attr_lower in FILE_PATH_PARAMETERS: continue # Convert 0-d numpy arrays to Python scalars for clean pickling if isinstance(value, np.ndarray) and value.ndim == 0: value = value.item() mod_snapshot[attr] = value if mod_snapshot: general[mod_name] = mod_snapshot snapshot["general"] = general # --- Heating / cooling settings --- heating: dict[str, Any] = { "heating_modules": np.copy(heating_module.heating_modules), "cooling_modules": np.copy(heating_module.cooling_modules), "dust_gas_coupling_method": int(heating_module.dust_gas_coupling_method), "line_solver_attempts": int(heating_module.line_solver_attempts), "pahabund": float(heating_module.pahabund), "coolantdatadir": np.copy(f2py_constants_module.coolantdatadir), "coolant_active": np.copy(f2py_constants_module.coolant_active), } # Coolant restart mode – accessor pattern varies between builds if hasattr(uclchemwrap, "get_coolant_restart_mode_wrap"): heating["coolant_restart_mode"] = int(uclchemwrap.get_coolant_restart_mode_wrap()) elif hasattr( getattr(uclchemwrap, "uclchemwrap", None), "get_coolant_restart_mode_wrap" ): heating["coolant_restart_mode"] = int( uclchemwrap.uclchemwrap.get_coolant_restart_mode_wrap() ) snapshot["heating"] = heating # --- Network state (rate parameters + binding energies) --- snapshot["network"] = { array_name: np.copy(getattr(network_module, array_name)) for array_name in _NETWORK_ARRAYS_TO_TAKE_SNAPSHOT_OF } return snapshot
[docs] def restore_snapshot(snapshot: dict[str, Any]) -> None: """Apply a previously captured snapshot to the current process. Must be called **before** running any model in the worker process. Parameters ---------- snapshot : dict[str, Any] Dict produced by :func:`create_snapshot`. """ # --- General settings --- # If uclchem hangs here, the last debug line printed shows which Fortran # PARAMETER is blocking. Add it to src/uclchem/advanced/fortran_metadata.yaml # and regenerate with: uclchem-generate-metadata for mod_name, settings_dict in snapshot.get("general", {}).items(): if not hasattr(uclchemwrap, mod_name): continue mod = getattr(uclchemwrap, mod_name) for attr, value in settings_dict.items(): logger.debug("setattr(%s, %s, %r)", mod_name, attr, value) with contextlib.suppress(AttributeError, TypeError): # read-only or incompatible – skip silently setattr(mod, attr, value) # --- Heating / cooling settings --- heating = snapshot.get("heating", {}) if "heating_modules" in heating: heating_module.heating_modules[:] = heating["heating_modules"] if "cooling_modules" in heating: heating_module.cooling_modules[:] = heating["cooling_modules"] if "dust_gas_coupling_method" in heating: heating_module.dust_gas_coupling_method = heating["dust_gas_coupling_method"] if "line_solver_attempts" in heating: heating_module.line_solver_attempts = heating["line_solver_attempts"] if "pahabund" in heating: heating_module.pahabund = heating["pahabund"] if "coolantdatadir" in heating: f2py_constants_module.coolantdatadir = heating["coolantdatadir"] if "coolant_active" in heating: f2py_constants_module.coolant_active[:] = heating["coolant_active"] if "coolant_restart_mode" in heating: mode = heating["coolant_restart_mode"] if hasattr(uclchemwrap, "set_coolant_restart_mode_wrap"): uclchemwrap.set_coolant_restart_mode_wrap(mode) elif hasattr( getattr(uclchemwrap, "uclchemwrap", None), "set_coolant_restart_mode_wrap" ): uclchemwrap.uclchemwrap.set_coolant_restart_mode_wrap(mode) # --- Network state --- net = snapshot.get("network", {}) for array_name, array in net.items(): np.copyto(getattr(network_module, array_name), array)
def _pool_initializer(snapshot: dict[str, Any]) -> None: """``mp.Pool`` initializer that restores advanced settings in each worker. Usage:: snapshot = create_snapshot() mp.Pool(N, initializer=_pool_initializer, initargs=(snapshot,)) Parameters ---------- snapshot : dict[str, Any] Serialized worker-state snapshot passed to the subprocess on initialization. """ restore_snapshot(snapshot)