Something went wrong on our end
-
Roelofsen, Hans authoredRoelofsen, Hans authored
MNP.py 9.78 KiB
import os
import shutil
import time
import traceback
import weakref
from concurrent.futures import ThreadPoolExecutor
from configparser import ConfigParser
from scipy.sparse import sparray
from mnp.config import MNPParameters
from mnp.evaluation.subselection_evaluation import (
SubselectionEvaluation,
create_output,
)
from mnp.evaluation.subselection_output import add_output_to_subselection
from mnp.preparation.aggregate_land_type_map import (
aggregate_land_type_map,
read_aggregated_map,
)
from mnp.preparation.filter_incomplete_cells import filter_incomplete_cells
from mnp.preparation.io_pathways import InputPathway, OutputPathway, verify_user_input
from mnp.preparation.read_rasters_from_cover import read_rasters_from_cover
from mnp.preparation.verify_spatial_data_coverage import verify_spatial_data_coverage
from mnp.species_models.species_model import (
SpeciesModel,
run_species_models,
)
from mnp.utils import (
TEMP_DIR,
add_dynamics_to_config,
config_section_to_cover,
copy_input_file,
create_directories,
geo_profile_from_hsi,
get_logger,
list_sources_destinations,
log_file_to_cover,
log_start_completed,
)
# File name of the spatial data coverage raster. prepare_input joins it with the
# "spatial_mask" folder to locate the mask used for filtering incomplete cells.
DATA_COVERAGE_TIF = "spatial_data_coverage.tif"
@log_start_completed
def prepare_input(
    parameters: MNPParameters, input_pathway: InputPathway
) -> tuple[dict[str, sparray] | None, dict[str, sparray] | None]:
    """Subflow to prepare input for processing.

    - creates directories
    - copies all input files
    - stores the "parameters" config section in the config folder
    - reads land type map and environmental factor maps
    - determines overlap between all provided rasters
    - creates geospatial profile when running with precalculated HSI rasters

    Parameters
    ----------
    parameters: MNPParameters
        Parameters of the current run; its ``folders`` and ``config`` attributes
        are used here
    input_pathway: InputPathway
        the input pathway describing the operations to be done on the provided input

    Returns
    -------
    land_types: dict | None
        The land type map as a dictionary with land type codes as key
        and arrays as values. ``None`` when running with HSI maps from disk.
    environmentals: dict | None
        Dictionary with the name of the environmental factor as keys
        and their corresponding arrays as values. ``None`` when no
        environmental factors are read.
    """
    # make
    create_directories(parameters.folders)

    # Compile sources and destinations of files to copy
    sources, destinations = list_sources_destinations(
        input_pathway, parameters.folders, parameters.config
    )
    with ThreadPoolExecutor(10) as executor:
        # Copy files from user input locations to and verify afterwards.
        # list() drains the lazy map so an exception raised by any copy
        # surfaces here instead of being silently dropped.
        list(executor.map(copy_input_file, sources, destinations))

    # Store global parameters section as csv
    config_section_to_cover(
        parameters.config, "parameters", parameters.folders["config"]
    )

    # Prepare geodata
    land_types, environmentals = None, None
    if not input_pathway.hsi_from_disk:
        # aggregate land type map if not already done (the task itself checks this)
        # and read from folder
        aggregate_land_type_map(parameters.folders)
        land_types = read_aggregated_map(folders=parameters.folders)

        # Verify spatial overlap of all input in the case of running with
        # environmental variables
        if input_pathway.has_environmentals:
            environmentals = read_rasters_from_cover(
                parameters.folders["environmentals"]
            )
            # Check if land types and environmentals have enough overlap (at least 95%)
            # and create spatial mask (=cells with a value in all rasters)
            verify_spatial_data_coverage(land_types, environmentals, parameters)

            # NOTE(review): the land type filter is kept in this branch because
            # the mask file is presumably only written by the coverage check
            # above - confirm against the original indentation.
            mask_path = os.path.join(
                parameters.folders["spatial_mask"], DATA_COVERAGE_TIF
            )
            # filter environmentals (keep only cells within spatial mask)
            environmentals = filter_incomplete_cells(environmentals, mask_path)
            # filter incomplete cells from land type map
            land_types = filter_incomplete_cells(land_types, mask_path)
    else:
        # Construct geospatial profile when using pre-calculated HSI maps.
        geo_profile_from_hsi(parameters.folders)

    return land_types, environmentals
@log_start_completed
def evaluate_models(
    output_pathway: OutputPathway,
    parameters: MNPParameters,
    species_models: weakref.ReferenceType,
    land_types: dict[str, sparray],
):
    """Build and produce the output of every species subselection.

    One ``SubselectionEvaluation`` is created per entry of
    ``parameters.valid_species_subselections``. Each evaluation is then passed
    to ``add_output_to_subselection`` and finally ``create_output`` is run for
    all evaluations on a thread pool.

    Parameters
    ----------
    output_pathway: OutputPathway
        class describing which output to generate
    parameters: MNPParameters
        parameters of the current run
    species_models: weakref.ReferenceType
        weak reference to the species models of this run; handed on unchanged
        to ``add_output_to_subselection``
    land_types: dict
        The land type map as a dictionary with land type codes as key and arrays
        as values

    Returns
    -------
    list
        the ``SubselectionEvaluation`` objects, in the iteration order of
        ``parameters.valid_species_subselections``
    """
    # One evaluation object per named subselection of species codes
    evaluations = []
    for name, code_list in parameters.valid_species_subselections.items():
        evaluations.append(
            SubselectionEvaluation(name=name, species_codes=code_list)
        )

    # Attach the requested outputs to each evaluation
    for subselection in evaluations:
        add_output_to_subselection(
            evaluation=subselection,
            output_pathway=output_pathway,
            parameters=parameters,
            species_models=species_models,
            land_types=land_types,
        )

    # Generate the outputs concurrently; iterating the map re-raises any
    # exception that occurred in a worker
    with ThreadPoolExecutor() as pool:
        for _ in pool.map(create_output, evaluations):
            pass

    return evaluations
def run_and_evaluate(
    land_types: dict[str, sparray] | None,
    environmentals: dict[str, sparray] | None,
    parameters: MNPParameters,
    output_pathway: OutputPathway,
) -> list[SubselectionEvaluation]:
    """Run and evaluate species models and evaluate species subselections.

    Running a model is:
    1. make HSI map
    2. do clustering (make populations)
    3. evaluate clusters

    Parameters
    ----------
    land_types: dict | None
        The land type map as a dictionary with land type codes as key
        and arrays as values
    environmentals: dict | None
        Dictionary with the name of the environmental factor as keys and
        their corresponding arrays as values
    parameters: MNPParameters
        parameters of the current run; one ``SpeciesModel`` is created for
        every code in ``parameters.complying_species``
    output_pathway: OutputPathway
        class describing which output to generate

    Returns
    -------
    subselection_evaluations: list[SubselectionEvaluation]
        the evaluation objects returned by ``evaluate_models``. The species
        models themselves are released before returning and are not part of
        the result.
    """
    # Run species models
    species_models = run_species_models(
        species_models=[
            SpeciesModel(species_code, parameters)
            for species_code in parameters.complying_species
        ],
        land_types=land_types,
        environmentals=environmentals,
    )

    # Hand the models on through a weak reference only, so the evaluation step
    # does not need to own them.
    # NOTE(review): weakref.ref requires the object returned by
    # run_species_models to support weak references (a plain list does not) -
    # confirm its type.
    spmodel_ref = weakref.ref(species_models)

    # Generate all outputs besides tables for species subselections
    subselection_evaluations = evaluate_models(
        output_pathway=output_pathway,
        species_models=spmodel_ref,
        parameters=parameters,
        land_types=land_types,
    )

    # Drop the only strong reference held here and check that nothing else
    # kept the models alive. The assert is skipped when Python runs with -O.
    del species_models
    assert spmodel_ref() is None
    return subselection_evaluations
def save_config_and_log(parameters: MNPParameters, config: ConfigParser):
    """Write the used configuration to disk and copy the log file to the cover.

    Parameters
    ----------
    parameters: MNPParameters
        parameters of the current run; ``parameters.folders["meta"]`` is the
        directory that receives ``run_configuration.ini``
    config: ConfigParser
        configuration that was used for the current run
    """
    # Save used config to ini file
    ini_path = os.path.join(parameters.folders["meta"], "run_configuration.ini")
    with open(ini_path, "w") as ini_file:
        config.write(ini_file)

    # Copy logging file to cover
    log_file_to_cover(parameters.folders)
def mnp(config: ConfigParser):
    """Run the Model for Nature Policy on the given configuration.

    Any exception raised during the run is caught and logged as a short
    summary; it is not re-raised. The configuration and log are saved when the
    run got far enough to build the parameters, and the temp directory is
    removed afterwards.

    Parameters
    ----------
    config: ConfigParser
        Configuration for current run

    Returns
    -------
    list | None
        the subselection evaluations returned by ``run_and_evaluate``, or
        ``None`` when the run failed with an exception
    """
    print("\n\nStarting the grand Model for Nature Policy\n\n")
    start = time.time()

    # make temp directory (no error when it already exists)
    os.makedirs(TEMP_DIR, exist_ok=True)

    parameters = None
    logger = get_logger()
    try:
        # Both fragments need the f prefix; without it the second placeholder
        # is logged literally instead of the computer name.
        logger.info(
            f'Starting MNP run by {os.environ.get("username")} on '
            f'{os.environ.get("computername")}.'
        )
        add_dynamics_to_config(config)
        input_pathway, output_pathway = verify_user_input(config)
        parameters = MNPParameters.from_configparser(config)
        land_types, environmentals = prepare_input(parameters, input_pathway)
        parameters.get_complying_species()
        parameters.get_valid_species_selections()
        parameters.read_geospatial_profile()

        # Run models
        subselection_evaluations = run_and_evaluate(
            land_types=land_types,
            environmentals=environmentals,
            parameters=parameters,
            output_pathway=output_pathway,
        )
    except Exception:
        # Summarise the failure as the last three traceback lines, newest
        # first, skipping lines that start with "~" (presumably the
        # error-location underlines of newer Python versions).
        traceback_lines = reversed(traceback.format_exc().splitlines())
        summary = [
            line
            for line in traceback_lines
            if not line.strip().startswith("~")
        ][:3]
        logger.error(" : ".join(summary))
        subselection_evaluations = None

    # parameters stays None when the run failed before the folders were known
    if parameters is not None:
        save_config_and_log(parameters, config)

    # Detach all handlers from the logger before the temp directory is removed
    logger.handlers = []
    shutil.rmtree(TEMP_DIR)

    print(f"run took {(time.time() - start) / 60:.2f} minutes")
    print("Thanks for using MNP")
    return subselection_evaluations