Source code for uclchem.makerates.io_functions

##########################################################################################
"""
Functions to read in the species and reaction files and write output files
"""

import csv
import fileinput
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict

import numpy as np
import yaml

from uclchem.constants import PHYSICAL_PARAMETERS

from .network import Network
from .reaction import Reaction, reaction_types
from .species import Species


def read_species_file(file_name: Path) -> tuple[list[Species], list[Species]]:
    """Reads in a Makerates species file

    Args:
        file_name (Path): path to file containing the species list

    Returns:
        tuple: A list of Species objects for the network and a list of
            user defined bulk species (used to adjust binding energies).
    """
    species_list = []
    # list to hold user defined bulk species (for adjusting binding energy)
    user_defined_bulk = []
    with open(file_name, "r") as f:
        reader = csv.reader(f, delimiter=",", quotechar="|")
        for idx, row in enumerate(reader):
            try:
                if row[0] != "NAME" and "!" not in row[0]:
                    if "@" in row[0]:
                        user_defined_bulk.append(Species(row))
                    else:
                        species_list.append(Species(row))
            except IndexError as exc:
                print(f"Error reading species file {file_name} at line {idx}")
                raise exc
    return species_list, user_defined_bulk

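# Illustrative usage sketch (illustration only, not used by UCLCHEM): read a species
# file and unpack the two lists returned above. "my_species.csv" is a hypothetical path.
def _example_read_species_file() -> None:
    species, user_bulk = read_species_file(Path("my_species.csv"))
    print(f"{len(species)} network species, {len(user_bulk)} user-defined bulk species")
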
def read_reaction_file(
    file_name: Path, species_list: list[Species], ftype: str
) -> tuple[list[Reaction], list[Reaction]]:
    """Reads in a reaction file of any kind (user, UMIST, KIDA) and produces a list of
    reactions for the network, filtered by species_list.

    Args:
        file_name (Path): A file name for the reaction file to read.
        species_list (list[Species]): A list of chemical species to be used in the reading.
        ftype (str): 'UMIST', 'UCL', or 'KIDA' to describe the format of file_name

    Returns:
        list, list: Lists of kept and dropped reactions.
    """
    reactions = []
    dropped_reactions = []
    # Every reactant/product of a reaction must be in keep_list to not be dropped
    keep_list = ["", "NAN", "#", "*", "E-", "e-", "ELECTR", "@"]
    keep_list.extend(reaction_types)
    for species in species_list:
        keep_list.append(species.name)

    if ftype == "UMIST":
        with open(file_name, "r") as f:
            reader = csv.reader(f, delimiter=":", quotechar="|")
            for row in reader:
                if row[0].startswith("#") or row[0].startswith("!"):
                    continue
                reaction_row = row[2:4] + [""] + row[4:8] + row[9:14] + [""]
                if check_reaction(reaction_row, keep_list):
                    reactions.append(Reaction(reaction_row, reaction_source="UMIST"))
    elif ftype == "UCL":
        with open(file_name, "r") as f:
            reader = csv.reader(f, delimiter=",", quotechar="|")
            for row in reader:
                if (len(row) > 1) and (row[0][0] != "!"):
                    if check_reaction(row, keep_list):
                        reactions.append(Reaction(row, reaction_source="UCL"))
                    else:
                        dropped_reactions.append(row)
    elif ftype == "KIDA":
        for row in kida_parser(file_name):
            if check_reaction(row, keep_list):
                reactions.append(Reaction(row, reaction_source="KIDA"))
    else:
        raise ValueError("Reaction file type must be one of 'UMIST', 'UCL' or 'KIDA'")
    return reactions, dropped_reactions

def check_reaction(reaction_row, keep_list) -> bool:
    """Checks a row parsed from a reaction file and makes sure it only contains
    acceptable entries. It checks that all species in the reaction are present in the
    network and adds a default temperature range if none is specified.

    Args:
        reaction_row (list): List parsed from a reaction file and formatted so that
            Reaction(reaction_row) can be called
        keep_list (list): list of elements that are acceptable in the reactant or
            product parts of the row

    Returns:
        bool: Whether the row contains acceptable entries.
    """
    if all(x.upper() in keep_list for x in reaction_row[0:7]):
        if reaction_row[10] == "":
            reaction_row[10] = 0.0
            reaction_row[11] = 10000.0
        if reaction_row[12] == "":
            reaction_row[12] = 0.0
        return True
    else:
        if reaction_row[1] in ["DESORB", "FREEZE"]:
            reac_error = "Desorb or freeze reaction in custom input contains species not in species list"
            reac_error += f"\nReaction was {reaction_row}"
            raise ValueError(reac_error)
        return False

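# Illustrative sketch (illustration only): a hypothetical UCL-format row with empty
# temperature fields. check_reaction accepts it because every name appears in the
# keep list, and it fills in the default 0.0-10000.0 K range described above.
def _example_check_reaction() -> None:
    row = ["CO", "PHOTON", "", "C", "O", "", "", "2.0e-10", "0.0", "2.5", "", "", ""]
    keep = ["", "NAN", "PHOTON", "CO", "C", "O"]  # invented minimal keep list
    if check_reaction(row, keep):
        print(row[10], row[11], row[12])  # -> 0.0 10000.0 0.0
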
def kida_parser(kida_file):
    """
    KIDA uses a fixed-format file so we read each line in the chunks they specify
    and use Python built-in types to convert to the necessary types.

    NOTE KIDA defines some of the same reaction types as UMIST but with different names
    and coefficients. We fix that by converting them here.
    """

    def str_parse(x):
        return str(x).strip().upper()

    kida_contents = [
        [3, {str_parse: 11}],
        [1, {"skip": 1}],
        [5, {str_parse: 11}],
        [1, {"skip": 1}],
        [3, {float: 10, "skip": 1}],
        [1, {"skip": 27}],
        [2, {int: 6, "skip": 1}],
        [1, {int: 2}],
        [1, {"skip": 11}],
    ]
    rows = []
    with open(kida_file, "r") as f:
        f.readline()  # throw away header
        for line in f:  # then iterate over file
            if line.startswith("!"):
                continue
            row = []
            for item in kida_contents:
                for i in range(item[0]):
                    for func, count in item[1].items():
                        if func != "skip":
                            a = line[:count]
                            row.append(func(a))
                        line = line[count:]

            # Some reformatting required
            # KIDA gives CRP reactions in different units to UMIST
            if row[-1] == 1:
                # Amazingly both UMIST and KIDA use CRP but differently.
                # Translate KIDA names to UMIST
                if row[1] == "CRP":
                    row[1] = "CRPHOT"
                    # with beta=0 and gamma=1, the KIDA formulation of
                    # CRPHOT reactions becomes the UMIST one
                    row[10] = 1.0
                elif row[1] == "CR":
                    row[1] = "CRP"
                    # UMIST alpha includes zeta_0 but KIDA doesn't. Since UCLCHEM
                    # rate calculation follows UMIST, we convert.
                    row[8] = row[8] * 1.36e-17
                rows.append(row[:7] + row[8:-1])
            elif row[-1] in [2, 3]:
                rows.append(row[:7] + row[8:-1])
            elif row[-1] == 4:
                row[2] = "IONOPOL1"
                rows.append(row[:7] + row[8:-1])
            elif row[-1] == 5:
                row[2] = "IONOPOL2"
                rows.append(row[:7] + row[8:-1])
    return rows

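# Worked example of the cosmic-ray unit conversion above (illustrative numbers only):
# UMIST folds the reference ionisation rate zeta_0 = 1.36e-17 s^-1 into alpha, while
# KIDA tabulates the coefficient relative to zeta_0, hence the multiplication above.
def _example_kida_cr_conversion() -> None:
    kida_alpha = 2.0  # hypothetical KIDA coefficient for a CR reaction
    umist_style_alpha = kida_alpha * 1.36e-17  # mirrors row[8] * 1.36e-17 in kida_parser
    print(f"{umist_style_alpha:.3e}")  # -> 2.720e-17
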
def read_grain_assisted_recombination_file(file_name: Path) -> dict:
    """Read the grain-assisted recombination parameters from a YAML file.

    Args:
        file_name (Path): path to the YAML parameter file

    Returns:
        dict: The grain-assisted recombination parameters
    """
    with open(file_name, "r") as fh:
        gar_parameters = yaml.safe_load(fh)
    return gar_parameters

def output_drops(
    dropped_reactions: list[Reaction], output_dir: str = None, write_files: bool = True
):
    """Writes the reactions that are dropped to disk/logs

    Args:
        dropped_reactions (list[Reaction]): The reactions that were dropped
        output_dir (str): The directory that dropped_reactions.csv will be written to.
        write_files (bool, optional): Whether or not to write the file. Defaults to True.
    """
    if output_dir is None:
        output_dir = "."
    outputFile = Path(output_dir) / "dropped_reactions.csv"
    # Print dropped reactions from grain file or write if many
    if write_files and dropped_reactions:
        logging.info(f"\nReactions dropped from grain file written to {outputFile}\n")
        with open(outputFile, "w") as f:
            writer = csv.writer(
                f,
                delimiter=",",
                quotechar="|",
                quoting=csv.QUOTE_MINIMAL,
                lineterminator="\n",
            )
            for reaction in dropped_reactions:
                writer.writerow(reaction)
    else:
        logging.info("Reactions dropped from grain file:\n")
        for reaction in dropped_reactions:
            logging.info(reaction)

def write_outputs(
    network: Network,
    output_dir: str = None,
    rates_to_disk: bool = False,
    gar_database: dict[str, np.array] = None,
) -> None:
    """Write the species and reaction CSV files plus the ODE and network Fortran
    source files that UCLCHEM needs.

    Args:
        network (Network): The makerates Network class
        output_dir (str): The directory to write to.
        rates_to_disk (bool, optional): Whether reaction rates should be written to disk at runtime. Defaults to False.
        gar_database (dict[str, np.array], optional): Grain-assisted recombination parameters. Defaults to None.
    """
    if output_dir is None:
        output_dir = Path("../src/uclchem")
        fortran_src_dir = Path("../src/fortran_src")
    else:
        output_dir = Path(output_dir)
        fortran_src_dir = Path(output_dir)

    # Create the species file
    filename = output_dir / "species.csv"
    write_species(filename, network.get_species_list())
    filename = output_dir / "reactions.csv"
    write_reactions(filename, network.get_reaction_list())

    # Write the ODEs in the appropriate language format
    filename = fortran_src_dir / "odes.f90"
    write_odes_f90(
        filename,
        network.get_species_list(),
        network.get_reaction_list(),
        rates_to_disk=rates_to_disk,
    )

    # Write the network files
    filename = fortran_src_dir / "network.f90"
    write_network_file(
        filename, network, rates_to_disk=rates_to_disk, gar_database=gar_database
    )

    # write the constants needed for wrap.f90
    filename = fortran_src_dir / "f2py_constants.f90"
    f2py_constants = {
        "n_species": len(network.get_species_list()),
        "n_reactions": len(network.get_reaction_list()),
        "n_physical_parameters": len(PHYSICAL_PARAMETERS),
    }
    write_f90_constants(f2py_constants, filename)
    # Write some meta information that can be used to read back in the reactions into Python
    # TODO: we can update this s.t. we can directly access the f90 constants from the f2py wrapped Fortran codes
    write_python_constants(f2py_constants, "../src/uclchem/constants.py")

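# Illustrative usage sketch (illustration only): a typical call once a Network has been
# built elsewhere, e.g. by Makerates. "my_output_dir" is a hypothetical, pre-existing
# directory that receives both the CSV and Fortran outputs.
def _example_write_outputs(network: Network) -> None:
    write_outputs(network, output_dir="my_output_dir", rates_to_disk=False)
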
def write_f90_constants(
    replace_dict: Dict[str, int],
    output_file_name: Path,
    template_file_path: Path = "fortran_templates",
) -> None:
    """Write the network constants to the f2py_constants.f90 file after every run of
    makerates; this ensures the Fortran and Python bits are compatible with one another.

    Args:
        replace_dict (Dict[str, int]): The dictionary with keys to replace and their values
        output_file_name (Path): The path to the target f2py_constants.f90 file
        template_file_path (Path, optional): The directory containing the template file. Defaults to "fortran_templates".
    """
    _ROOT = Path(__file__).parent
    template_file_path = _ROOT / template_file_path
    with open(template_file_path / "f2py_constants.f90", "r") as fh:
        constants = fh.read()
    constants = constants.format(**replace_dict)
    with open(output_file_name, "w") as fh:
        fh.writelines(constants)

def write_python_constants(
    replace_dict: Dict[str, int], python_constants_file: Path
) -> None:
    """Write the Python constants to the constants.py file after every run;
    this ensures the Python and Fortran bits are compatible with one another.

    Args:
        replace_dict (Dict[str, int]): Dict with keys to replace and their values
        python_constants_file (Path): Path to the target constants file.
    """
    with fileinput.input(python_constants_file, inplace=True, backup=".bak") as file:
        for line in file:
            # Add a timestamp to the file before the old one:
            if fileinput.isfirstline():
                print(
                    "# This file was machine generated with Makerates on",
                    datetime.now(),
                    end="\n",
                )
            # Don't copy the old timestamp into the new file.
            if line.startswith(
                "# This file was machine generated with Makerates on"
            ):
                continue
            # For every line, try to find constants, if we find them, replace them,
            # if not, just print the line.
            hits = {
                constant: line.strip().startswith(constant)
                for constant in replace_dict
            }
            if any(hits.values()):
                # Filter, also we can only get one hit at a time
                variable = list(filter(hits.get, hits))[0]
                print(f"{variable} = {replace_dict[variable]}")
            else:
                print(line, end="")

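# Illustrative sketch (illustration only): how the in-place rewrite above behaves on a
# small, hypothetical constants file. After the call, the file starts with a generation
# timestamp and the matching constant lines carry the new values.
def _example_write_python_constants(tmp_file: Path) -> None:
    tmp_file.write_text("n_species = 1\nn_reactions = 2\n")
    write_python_constants({"n_species": 33, "n_reactions": 330}, tmp_file)
    print(tmp_file.read_text())
    # -> timestamp comment, then "n_species = 33" and "n_reactions = 330"
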
def write_species(file_name: Path, species_list: list[Species]) -> None:
    """Write the human readable species file. Note UCLCHEM doesn't use this file.

    Args:
        file_name (Path): path to output file
        species_list (list): List of species objects for network
    """
    species_columns = [
        "NAME",
        "MASS",
        "BINDING ENERGY",
        "SOLID FRACTION",
        "MONO FRACTION",
        "VOLCANO FRACTION",
        "ENTHALPY",
    ]
    with open(file_name, "w") as f:
        writer = csv.writer(
            f,
            delimiter=",",
            quotechar="|",
            quoting=csv.QUOTE_MINIMAL,
            lineterminator="\n",
        )
        writer.writerow(species_columns)
        for species in species_list:
            writer.writerow(
                [
                    species.name,
                    species.mass,
                    species.binding_energy,
                    species.solidFraction,
                    species.monoFraction,
                    species.volcFraction,
                    species.enthalpy,
                ]
            )

# Write the reaction file in the desired format
def write_reactions(fileName, reaction_list) -> None:
    """Write the human readable reaction file.

    Args:
        fileName (str): path to output file
        reaction_list (list): List of reaction objects for network
    """
    reaction_columns = [
        "Reactant 1",
        "Reactant 2",
        "Reactant 3",
        "Product 1",
        "Product 2",
        "Product 3",
        "Product 4",
        "Alpha",
        "Beta",
        "Gamma",
        "T_min",
        "T_max",
        "reduced_mass",
        "extrapolate",
    ]
    with open(fileName, "w") as f:
        writer = csv.writer(
            f,
            delimiter=",",
            quotechar="|",
            quoting=csv.QUOTE_MINIMAL,
            lineterminator="\n",
        )
        writer.writerow(reaction_columns)
        for reaction in reaction_list:
            writer.writerow(
                reaction.get_reactants()
                + reaction.get_products()
                + [
                    reaction.get_alpha(),
                    reaction.get_beta(),
                    reaction.get_gamma(),
                    reaction.get_templow(),
                    reaction.get_temphigh(),
                    reaction.get_reduced_mass(),
                    reaction.get_extrapolation(),
                ]
            )

def write_odes_f90(
    file_name: Path,
    species_list: list[Species],
    reaction_list: list[Reaction],
    rates_to_disk: bool = False,
) -> None:
    """Write the ODEs in Modern Fortran. This is an actual code file.

    Args:
        file_name (str): Path to file where code will be written
        species_list (list): List of species describing network
        reaction_list (list): List of reactions describing network
        rates_to_disk (bool, optional): Whether reaction rates should be written to disk at runtime. Defaults to False.
    """
    # First generate ODE contributions for all reactions
    species_names = [spec.name for spec in species_list]
    for specie in species_list:
        logging.debug(f"{species_names.index(specie) + 1}:{specie}")
    for i, reaction in enumerate(reaction_list):
        logging.debug(f"RATE({i + 1}):{reaction}")
        reaction.generate_ode_bit(i, species_names)

    # then create ODE code and write to file.
    with open(file_name, mode="w") as output:
        # go through every species and build two strings, one with eq for all destruction routes and one for all formation
        ydotString = build_ode_string(species_list, reaction_list, rates_to_disk)
        output.write(ydotString)

def write_jacobian(file_name: Path, species_list: list[Species]) -> None:
    """Write the Jacobian in Modern Fortran. This has never improved UCLCHEM's speed and
    so is not used in the code as it stands. Currently only works for the three phase model.

    Args:
        file_name (str): Path to jacobian file
        species_list (species_list): List of species AFTER being processed by build_ode_string
    """
    output = open(file_name, "w")
    species_names = ""
    for i, species in enumerate(species_list):
        species_names += species.name
        losses = species.losses.split("+")
        gains = species.gains.split("+")
        for j in range(1, len(species_list) + 1):
            if species.name == "SURFACE":
                di_dj = f"J({i + 1},{j})=SUM(J(surfaceList,{j}))\n"
                output.write(di_dj)
            elif species.name == "BULK":
                if species_names.count("@") > 0:
                    di_dj = f"J({i + 1},{j})=SUM(J(bulkList,{j}))\n"
                    output.write(di_dj)
            else:
                # every time an ode bit has our species in it, we remove it (dy/dx=a for y=ax)
                di_dj = [
                    f"-{x}".replace(f"*Y({j})", "", 1) for x in losses if f"*Y({j})" in x
                ]
                di_dj += [
                    f"+{x}".replace(f"*Y({j})", "", 1) for x in gains if f"*Y({j})" in x
                ]
                # of course there might be y=a*x*x so we only replace first instance and if there's still an instance
                # we put a factor of two in since dy/dx=2ax for y=a*x*x
                di_dj = [x + "*2" if f"*Y({j})" in x else x for x in di_dj]

                # safeMantle is a stand in for the surface so do it manually here
                # since it's divided by safemantle, derivative is negative so sign flips and we get another factor of 1/safeMantle
                if species_list[j - 1].name == "SURFACE":
                    di_dj = [f"+{x}/safeMantle" for x in losses if "/safeMantle" in x]
                    di_dj += [f"-{x}/safeMantle" for x in gains if "/safeMantle" in x]
                if len(di_dj) > 0:
                    di_dj = f"J({i + 1},{j})=" + "".join(di_dj) + "\n"
                    output.write(di_dj)

        # tackle density separately since it is handled as the final element of Y
        j = j + 1
        if species.name == "SURFACE":
            di_dj = f"J({i + 1},{j})=SUM(J(surfaceList,{j}))\n"
            output.write(di_dj)
        elif species.name == "BULK":
            if species_names.count("@") > 0:
                di_dj = f"J({i + 1},{j})=SUM(J(bulkList,{j}))\n"
                output.write(di_dj)
        else:
            di_dj = [f"-{x}".replace("*D", "", 1) for x in losses if "*D" in x]
            di_dj += [f"+{x}".replace("*D", "", 1) for x in gains if "*D" in x]
            di_dj = [x + "*2" if "*D" in x else x for x in di_dj]
            if len(di_dj) > 0:
                di_dj = f"J({i + 1},{j})=" + ("".join(di_dj)) + "\n"
                output.write(di_dj)
    i = i + 2
    di_dj = f"J({i},{i})=ddensdensdot(D)\n"
    output.write(di_dj)
    output.close()

def build_ode_string(
    species_list: list[Species],
    reaction_list: list[Reaction],
    rates_to_disk: bool = False,
) -> str:
    """A long, complex function that does the messy work of creating the actual ODE
    code to calculate the rate of change of each species. Test any change to this code
    thoroughly because ODE mistakes are very hard to spot.

    Args:
        species_list (list): List of species in network
        reaction_list (list): List of reactions in network
        rates_to_disk (bool): Enable the writing of the rates to the disk.

    Returns:
        str: One long string containing the entire ODE fortran code.
    """
    # We create a string of losses and gains for each species so initialize them all as ""
    species_names = []
    for i, species in enumerate(species_list):
        species_names.append(species.name)
        species.losses = ""
        species.gains = ""
    bulk_index = species_names.index("BULK")
    surface_index = species_names.index("SURFACE")

    total_swap = ""
    for i, reaction in enumerate(reaction_list):
        for species in reaction.get_reactants():
            if species in species_names:
                # Eley-Rideal reactions take a share of total freeze out rate which is already accounted for
                # so we add as a loss term to the frozen version of the species rather than the gas version
                if (reaction.get_reaction_type() == "ER") and (
                    not species_list[species_names.index(species)].is_surface_species()
                ):
                    species_list[
                        species_names.index("#" + species)
                    ].losses += reaction.ode_bit
                else:
                    species_list[
                        species_names.index(species)
                    ].losses += reaction.ode_bit
        if reaction.get_reaction_type() == "BULKSWAP":
            total_swap += reaction.ode_bit
        for species in reaction.get_products():
            if species in species_names:
                species_list[species_names.index(species)].gains += reaction.ode_bit

    ode_string = """MODULE ODES
USE constants
USE network
IMPLICIT NONE
CONTAINS
SUBROUTINE GETYDOT(RATE, Y, bulkLayersReciprocal, surfaceCoverage, safeMantle, safebulk, D, YDOT)
REAL(dp), INTENT(IN) :: RATE(:), Y(:), bulkLayersReciprocal, safeMantle, safebulk, D
REAL(dp), INTENT(INOUT) :: YDOT(:), surfaceCoverage
REAL(dp) :: totalSwap, LOSS, PROD

"""
    # Add a logical to determine whether we can write the reaction rates in realtime
    ode_string += truncate_line(f"totalSwap={total_swap[1:]}\n\n")

    # First get total rate of change of bulk and surface by adding ydots
    for n, species in enumerate(species_list):
        if species.name[0] == "@":
            species_list[bulk_index].gains += f"+YDOT({n + 1})"
        elif species.name[0] == "#":
            species_list[surface_index].gains += f"+YDOT({n + 1})"

    if rates_to_disk:
        for n, reaction in enumerate(reaction_list):
            ode_string += truncate_line(f"REACTIONRATE({n + 1})={reaction.ode_bit}\n")

    for n, species in enumerate(species_list):
        ydot_string = species_ode_string(n, species)
        ode_string += ydot_string

    ode_string += f" SURFGROWTHUNCORRECTED = YDOT({surface_index + 1})\n"
    # now add bulk transfer to rate of change of surface species after they've already been calculated
    ode_string += "!Update surface species for bulk growth, replace surfaceCoverage with alpha_des\n"
    ode_string += (
        "!Since ydot(surface_index) is negative, bulk is lost and surface forms\n"
    )
    ode_string += f"IF (YDOT({surface_index + 1}) .lt. 0) THEN\n surfaceCoverage = MIN(1.0,safeBulk/safeMantle)\n"

    surf_species = [
        i
        for i in species_list
        if i.name not in ["SURFACE", "BULK"] and i.is_surface_species()
    ]
    i = len(reaction_list)
    j = len(reaction_list) + len(surf_species)
    for n, species in enumerate(species_list):
        if species.name[0] == "#":
            i += 1
            j += 1
            bulk_partner = species_names.index(species.name.replace("#", "@"))
            if rates_to_disk:
                ode_string += f" REACTIONRATE({i}) = -YDOT({surface_index + 1})*surfaceCoverage*Y({bulk_partner + 1})/safeBulk\n"
                ode_string += f" REACTIONRATE({j}) = 0.0\n"
            if not species_list[bulk_partner].is_refractory:
                ode_string += f" YDOT({n + 1})=YDOT({n + 1})-YDOT({surface_index + 1})*surfaceCoverage*Y({bulk_partner + 1})/safeBulk\n"
        if species.name[0] == "@":
            if not species.is_refractory:
                ode_string += f" YDOT({n + 1})=YDOT({n + 1})+YDOT({surface_index + 1})*surfaceCoverage*Y({n + 1})/safeBulk\n"
    ode_string += "ELSE\n"

    i = len(reaction_list)
    j = len(reaction_list) + len(surf_species)
    for n, species in enumerate(species_list):
        if species.name[0] == "@":
            i += 1
            j += 1
            surface_version = species_names.index(species.name.replace("@", "#"))
            if rates_to_disk:
                ode_string += f" REACTIONRATE({i}) = 0.0\n"
                ode_string += f" REACTIONRATE({j}) = -YDOT({surface_index + 1})*surfaceCoverage*Y({surface_version + 1})\n"
            ode_string += f" YDOT({n + 1})=YDOT({n + 1})+YDOT({surface_index + 1})*surfaceCoverage*Y({surface_version + 1})\n"
        if species.name[0] == "#":
            ode_string += f" YDOT({n + 1})=YDOT({n + 1})-YDOT({surface_index + 1})*surfaceCoverage*Y({n + 1})\n"
    ode_string += "ENDIF\n"

    # once bulk transfer has been added, odes for bulk and surface must be updated to account for it
    ode_string += "!Update total rate of change of bulk and surface for bulk growth\n"
    ode_string += species_ode_string(bulk_index, species_list[bulk_index])
    ode_string += species_ode_string(surface_index, species_list[surface_index])

    ode_string += """
END SUBROUTINE GETYDOT
END MODULE ODES"""
    return ode_string

def species_ode_string(n: int, species: Species) -> str:
    """Build the string of Fortran code for a species once its loss and gain strings
    have been produced.

    Args:
        n (int): Index of species in python format
        species (Species): species object

    Returns:
        str: the fortran code for the rate of change of the species
    """
    ydot_string = ""
    if species.losses != "":
        loss_string = " LOSS = " + species.losses[1:] + "\n"
        ydot_string += loss_string
    if species.gains != "":
        prod_string = " PROD = " + species.gains[1:] + "\n"
        ydot_string += prod_string
    if ydot_string != "":
        ydot_string += f" YDOT({n + 1}) = "
        # start with empty string and add production and loss terms if they exist
        if species.gains != "":
            ydot_string += "PROD"
        if species.losses != "":
            ydot_string += "-LOSS"
        ydot_string += "\n"
    else:
        ydot_string = f" YDOT({n + 1}) = {0.0}\n"
    ydot_string = truncate_line(ydot_string)
    return ydot_string

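# Illustrative sketch (illustration only): what species_ode_string emits for a species
# with one gain and one loss term. A SimpleNamespace stands in for a Species object
# since only the .gains and .losses attributes are read; the RATE/Y indices are invented.
def _example_species_ode_string() -> None:
    from types import SimpleNamespace

    fake_species = SimpleNamespace(
        gains="+RATE(1)*Y(1)*Y(2)*D", losses="+RATE(2)*Y(3)*D"
    )
    print(species_ode_string(0, fake_species))
    # -> " LOSS = RATE(2)*Y(3)*D\n PROD = RATE(1)*Y(1)*Y(2)*D\n YDOT(1) = PROD-LOSS\n"
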
def write_evap_lists(network_file, species_list: list[Species]) -> int:
    """Two phase networks mimic episodic thermal desorption seen in the lab
    (see Viti et al. 2004) by desorbing fixed fractions of material at specific
    temperatures. Three phase networks just use binding energy and the fact that we
    set binding energies in the bulk to that of water by default. This function writes
    all necessary arrays to the network file so these processes work.

    Args:
        network_file (file): Open file object to which the network code is being written
        species_list (list[Species]): List of species in network

    Returns:
        int: The number of ice (surface plus bulk) species in the network
    """
    gasIceList = []
    surfacelist = []
    solidList = []
    monoList = []
    volcList = []
    binding_energyList = []
    enthalpyList = []
    bulkList = []
    iceList = []
    refractoryList = []
    species_names = [spec.name for spec in species_list]
    for i, species in enumerate(species_list):
        if species.name[0] == "#":
            # find gas phase version of grain species. For #CO it looks for the first
            # species in the list that is just CO and then finds the index of that
            try:
                j = species_names.index(species.get_desorb_products()[0])
            except ValueError:
                error = f"{species.name} desorbs as {species.get_desorb_products()[0]} "
                error += "which is not in species list. This desorption is likely user defined.\n"
                error += "Please amend the desorption route in your reaction file and re-run Makerates"
                raise NameError(error)
            # plus ones as fortran and python label arrays differently
            surfacelist.append(i + 1)
            gasIceList.append(j + 1)
            solidList.append(species.solidFraction)
            monoList.append(species.monoFraction)
            volcList.append(species.volcFraction)
            iceList.append(i + 1)
            binding_energyList.append(species.binding_energy)
            enthalpyList.append(species.enthalpy)
        elif species.name[0] == "@":
            j = species_names.index(species.get_desorb_products()[0])
            gasIceList.append(j + 1)
            bulkList.append(i + 1)
            iceList.append(i + 1)
            binding_energyList.append(species.binding_energy)
            enthalpyList.append(species.enthalpy)
            if species.is_refractory:
                refractoryList.append(i + 1)

    # dummy index that will be caught by UCLCHEM
    if len(refractoryList) == 0:
        refractoryList = [-999]

    network_file.write(array_to_string("surfaceList", surfacelist, type="int"))
    if len(bulkList) > 0:
        network_file.write(array_to_string("bulkList", bulkList, type="int"))
    network_file.write(array_to_string("iceList", iceList, type="int"))
    network_file.write(array_to_string("gasIceList", gasIceList, type="int"))
    network_file.write(array_to_string("solidFractions", solidList, type="float"))
    network_file.write(array_to_string("monoFractions", monoList, type="float"))
    network_file.write(array_to_string("volcanicFractions", volcList, type="float"))
    network_file.write(
        array_to_string(
            "bindingEnergy", binding_energyList, type="float", parameter=False
        )
    )
    network_file.write(array_to_string("formationEnthalpy", enthalpyList, type="float"))
    network_file.write(array_to_string("refractoryList", refractoryList, type="int"))
    return len(iceList)

def truncate_line(input_string: str, lineLength: int = 72) -> str:
    """Takes a string and adds line endings at regular intervals. This keeps us from
    overshooting Fortran's line limits and, frankly, makes for a nicer odes.f90 even if
    human readability isn't very important.

    Args:
        input_string (str): Line of code to be truncated
        lineLength (int, optional): rough line length. Defaults to 72.

    Returns:
        str: Code string with line endings at regular intervals
    """
    result = ""
    i = 0
    j = 0
    # we only want to split at operators to make it look nice
    splits = ["*", ")", "+", ","]
    while len(input_string[i:]) > lineLength:
        j = i + lineLength
        if "\n" in input_string[i:j]:
            j = input_string[i:j].index("\n") + i + 1
            result += input_string[i:j]
        else:
            while input_string[j] not in splits:
                j = j - 1
            result += input_string[i:j] + "&\n &"
        i = j
    result += input_string[i:]
    return result

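# Illustrative sketch (illustration only): a long, invented ODE line is broken at one of
# the split characters with a Fortran continuation ("&") roughly every 72 characters,
# which is what keeps the generated odes.f90 within Fortran's line-length limits.
def _example_truncate_line() -> None:
    long_line = (
        " YDOT(1) = " + "+".join(f"RATE({i})*Y({i})*D" for i in range(1, 12)) + "\n"
    )
    print(truncate_line(long_line))
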
def write_network_file(
    file_name: Path, network: Network, rates_to_disk: bool = False, gar_database=None
):
    """Write the Fortran code file that contains all network information for UCLCHEM.
    This includes lists of reactants, products, binding energies, formationEnthalpies
    and so on.

    Args:
        file_name (str): The file name where the code will be written.
        network (Network): A Network object built from lists of species and reactions.
        rates_to_disk (bool, optional): Whether reaction rates should be written to disk at runtime. Defaults to False.
        gar_database (dict, optional): Grain-assisted recombination parameters. Defaults to None.
    """
    species_list = network.get_species_list()
    reaction_list = network.get_reaction_list()
    openFile = open(file_name, "w")
    openFile.write("MODULE network\nUSE constants\nIMPLICIT NONE\n")

    # write arrays of all species stuff
    names = []
    atoms = []
    masses = []
    for species in species_list:
        names.append(species.name)
        masses.append(float(species.mass))
        atoms.append(species.n_atoms)

    speciesIndices = ""
    for name, species_index in network.species_indices.items():
        speciesIndices += "{0}={1},".format(name, species_index)
    if len(speciesIndices) > 72:
        speciesIndices = truncate_line(speciesIndices)
    speciesIndices = speciesIndices[:-1] + "\n"
    openFile.write(" INTEGER(dp), PARAMETER ::" + speciesIndices)
    openFile.write(" LOGICAL, PARAMETER :: THREE_PHASE = .TRUE.\n")
    openFile.write(" REAL(dp) :: SURFGROWTHUNCORRECTED\n")
    openFile.write(array_to_string(" specname", names, type="string"))
    openFile.write(array_to_string(" mass", masses, type="float"))
    openFile.write(array_to_string(" atomCounts", atoms, type="int"))

    # then write evaporation stuff
    n_ice_species = write_evap_lists(openFile, species_list)

    # finally all reactions
    reactant1 = []
    reactant2 = []
    reactant3 = []
    prod1 = []
    prod2 = []
    prod3 = []
    prod4 = []
    alpha = []
    beta = []
    gama = []
    reacTypes = []
    # duplicates = []
    tmins = []
    tmaxs = []
    reduced_masses = []
    extrapolations = []

    # store important reactions
    reaction_indices = ""
    for reaction, index in network.important_reactions.items():
        reaction_indices += reaction + f"={index},"
    reaction_indices = truncate_line(reaction_indices[:-1]) + "\n"
    openFile.write(" INTEGER(dp), PARAMETER ::" + reaction_indices)

    for i, reaction in enumerate(reaction_list):
        reactant1.append(find_reactant(names, reaction.get_reactants()[0]))
        reactant2.append(find_reactant(names, reaction.get_reactants()[1]))
        reactant3.append(find_reactant(names, reaction.get_reactants()[2]))
        prod1.append(find_reactant(names, reaction.get_products()[0]))
        prod2.append(find_reactant(names, reaction.get_products()[1]))
        prod3.append(find_reactant(names, reaction.get_products()[2]))
        prod4.append(find_reactant(names, reaction.get_products()[3]))
        alpha.append(reaction.get_alpha())
        beta.append(reaction.get_beta())
        gama.append(reaction.get_gamma())
        # if reaction.duplicate:
        #     duplicates.append(i + 1)
        tmaxs.append(reaction.get_temphigh())
        tmins.append(reaction.get_templow())
        reduced_masses.append(reaction.get_reduced_mass())
        reacTypes.append(reaction.get_reaction_type())
        extrapolations.append(reaction.get_extrapolation())
    # if len(duplicates) == 0:
    #     duplicates = [9999]
    #     tmaxs = [0]
    #     tmins = [0]

    reaction_names = []
    for n, reaction in enumerate(reaction_list):
        reaction_names.append(str(reaction))
    for n, species in enumerate(species_list):
        if species.is_surface_species() and species.name not in ["SURFACE", "BULK"]:
            reaction_name = f"{species.name} + SURFACETRANSFER -> @{species.name[1:]}"
            reaction_names.append(reaction_name)
    for n, species in enumerate(species_list):
        if species.is_surface_species() and species.name not in ["SURFACE", "BULK"]:
            reaction_name = f"@{species.name[1:]} + SURFACETRANSFER -> {species.name}"
            reaction_names.append(reaction_name)

    if rates_to_disk:
        openFile.write(
            f" REAL(dp) :: REACTIONRATE({len(reactant1) + n_ice_species})\n"
        )
        openFile.write(" LOGICAL :: ReactionRatesToDisk=.true.\n")
    else:
        openFile.write(" REAL(dp) :: REACTIONRATE(1)\n")
        openFile.write(" LOGICAL :: ReactionRatesToDisk=.false.\n")

    openFile.write(array_to_string("\tre1", reactant1, type="int"))
    openFile.write(array_to_string("\tre2", reactant2, type="int"))
    openFile.write(array_to_string("\tre3", reactant3, type="int"))
    openFile.write(array_to_string("\tp1", prod1, type="int"))
    openFile.write(array_to_string("\tp2", prod2, type="int"))
    openFile.write(array_to_string("\tp3", prod3, type="int"))
    openFile.write(array_to_string("\tp4", prod4, type="int"))
    openFile.write(array_to_string("\talpha", alpha, type="float", parameter=False))
    openFile.write(array_to_string("\tbeta", beta, type="float", parameter=False))
    openFile.write(array_to_string("\tgama", gama, type="float", parameter=False))
    # openFile.write(array_to_string("\tduplicates", duplicates, type="int", parameter=True))
    openFile.write(array_to_string("\tminTemps", tmins, type="float", parameter=True))
    openFile.write(array_to_string("\tmaxTemps", tmaxs, type="float", parameter=True))
    openFile.write(
        array_to_string("\treducedMasses", reduced_masses, type="float", parameter=True)
    )
    openFile.write(
        array_to_string(
            "\tExtrapolateRates", extrapolations, type="logical", parameter=True
        )
    )
    reacTypes = np.asarray(reacTypes)

    partners = get_desorption_freeze_partners(reaction_list)
    openFile.write(
        array_to_string("\tfreezePartners", partners, type="int", parameter=True)
    )
    openFile.write(
        array_to_string(
            "\t garParams",
            np.array(list(gar_database.values())) if gar_database else np.zeros((1, 7)),
            type="float",
            parameter=True,
        )
    )

    for reaction_type in reaction_types + ["TWOBODY"]:
        list_name = reaction_type.lower() + "Reacs"
        indices = np.where(reacTypes == reaction_type)[0]
        if len(indices) > 0:
            indices = [indices[0] + 1, indices[-1] + 1]
        else:
            # We still want a dummy array if the reaction type isn't in network
            indices = [99999, 99999]
        openFile.write(
            array_to_string("\t" + list_name, indices, type="int", parameter=True)
        )

    openFile.write("END MODULE network")
    openFile.close()

def find_reactant(species_list: list[str], reactant: str) -> int:
    """Try to find a reactant in the species list

    Args:
        species_list (list[str]): A list of species in the network
        reactant (str): The reactant to be indexed

    Returns:
        int: The index of the reactant, if it is not found, 9999
    """
    try:
        return species_list.index(reactant) + 1
    except ValueError:
        return 9999

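# Illustrative sketch (illustration only): indices are returned in Fortran's 1-based
# convention, with 9999 as the "not in network" sentinel. The name list is invented.
def _example_find_reactant() -> None:
    names = ["H", "H2", "CO"]
    print(find_reactant(names, "CO"))  # -> 3
    print(find_reactant(names, "HE"))  # -> 9999
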
def get_desorption_freeze_partners(reaction_list: list[Reaction]) -> list[int]:
    """Every desorption has a corresponding freeze out, eg desorption of #CO and freeze
    out of CO. This finds the corresponding freeze out for every desorption so that when
    desorb >> freeze we can turn off freeze out in UCLCHEM.

    Args:
        reaction_list (list): Reactions in network

    Returns:
        list: list of indices of freeze out reactions matching the order of desorptions.
    """
    freeze_species = [
        x.get_products()[0] for x in reaction_list if x.get_reactants()[1] == "DESCR"
    ]
    partners = []
    for spec in freeze_species:
        for i, reaction in enumerate(reaction_list):
            if reaction.get_reaction_type() == "FREEZE":
                if reaction.get_reactants()[0] == spec:
                    partners.append(i + 1)
                    break
    return partners

def array_to_string(
    name: str, array: np.array, type: str = "int", parameter: bool = True
) -> str:
    """Write an array to fortran source code

    Args:
        name (str): Variable name of array in Fortran
        array (iterable): List of values of array
        type (str, optional): The array's type. Must be one of "int", "float", "string"
            or "logical". Defaults to "int".
        parameter (bool, optional): Whether the array is a Fortran PARAMETER (constant). Defaults to True.

    Raises:
        ValueError: Raises an error if type isn't "int", "float", "string" or "logical"

    Returns:
        str: String containing the Fortran code to declare this array.
    """
    # Check for 2D array
    arr = np.array(array)
    if arr.ndim == 2:
        shape = arr.shape
        flat = arr.flatten(order="F")
        if type == "int":
            dtype = "INTEGER(dp)"
            values = ",".join(str(int(v)) for v in flat)
        elif type == "float":
            dtype = "REAL(dp)"
            values = ",".join("{0:.4e}".format(float(v)) for v in flat)
        elif type == "string":
            strLength = len(max(flat, key=len))
            dtype = f"CHARACTER(Len={strLength})"
            values = ",".join('"' + str(v).ljust(strLength) + '"' for v in flat)
        elif type == "logical":
            dtype = "LOGICAL(dp)"
            values = ",".join(f".{str(v).upper()}." for v in flat)
        else:
            raise ValueError("Not a valid type for array to string")
        param_str = ", PARAMETER" if parameter else ""
        outString = f"{dtype}{param_str} :: {name}({','.join(str(s) for s in shape)}) = RESHAPE((/ {values} /), (/ {', '.join(str(s) for s in shape)} /))\n"
        outString = truncate_line(outString)
        return outString
    else:
        if parameter:
            outString = ", PARAMETER :: " + name + " ({0})=(/".format(len(arr))
        else:
            outString = " :: " + name + " ({0})=(/".format(len(arr))
        if type == "int":
            outString = "INTEGER(dp)" + outString
            for value in arr:
                outString += "{0},".format(value)
        elif type == "float":
            outString = "REAL(dp)" + outString
            for value in arr:
                outString += "{0:.4e},".format(value)
        elif type == "string":
            strLength = len(max(arr, key=len))
            outString = "CHARACTER(Len={0:.0f})".format(strLength) + outString
            for value in arr:
                outString += '"' + value.ljust(strLength) + '",'
        elif type == "logical":
            outString = "LOGICAL(dp)" + outString
            for value in arr:
                outString += ".{0}.,".format(value)
        else:
            raise ValueError("Not a valid type for array to string")
        outString = outString[:-1] + "/)\n"
        outString = truncate_line(outString)
        return outString

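# Illustrative sketch (illustration only): the Fortran declarations produced for small,
# invented arrays.
def _example_array_to_string() -> None:
    print(array_to_string("surfaceList", [3, 7, 12], type="int"), end="")
    # -> INTEGER(dp), PARAMETER :: surfaceList (3)=(/3,7,12/)
    print(array_to_string("solidFractions", [0.1, 0.9], type="float", parameter=False), end="")
    # -> REAL(dp) :: solidFractions (2)=(/1.0000e-01,9.0000e-01/)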