# MNP.py
import os
import shutil
import time
import traceback
import weakref
from concurrent.futures import ThreadPoolExecutor
from configparser import ConfigParser

from scipy.sparse import sparray

from mnp.config import MNPParameters
from mnp.evaluation.subselection_evaluation import (
    SubselectionEvaluation,
    create_output,
)
from mnp.evaluation.subselection_output import add_output_to_subselection
from mnp.preparation.aggregate_land_type_map import (
    aggregate_land_type_map,
    read_aggregated_map,
)
from mnp.preparation.filter_incomplete_cells import filter_incomplete_cells
from mnp.preparation.io_pathways import InputPathway, OutputPathway, verify_user_input
from mnp.preparation.read_rasters_from_cover import read_rasters_from_cover
from mnp.preparation.verify_spatial_data_coverage import verify_spatial_data_coverage
from mnp.species_models.species_model import (
    SpeciesModel,
    run_species_models,
)
from mnp.utils import (
    TEMP_DIR,
    add_dynamics_to_config,
    config_section_to_cover,
    copy_input_file,
    create_directories,
    geo_profile_from_hsi,
    get_logger,
    list_sources_destinations,
    log_file_to_cover,
    log_start_completed,
)

# File name of the spatial data coverage raster; prepare_input joins it with the
# "spatial_mask" folder and passes the resulting path to filter_incomplete_cells.
DATA_COVERAGE_TIF = "spatial_data_coverage.tif"


@log_start_completed
def prepare_input(
    parameters: MNPParameters, input_pathway: InputPathway
) -> tuple[dict[str, sparray] | None, dict[str, sparray] | None]:
    """Subflow to prepare input for processing.

    - creates the run directories
    - copies all input files to their destinations
    - stores the "parameters" config section in the config folder
    - reads the land type map and the environmental factor maps
    - determines the overlap between all provided rasters
    - creates a geospatial profile when running with precalculated HSI rasters

    Parameters
    ----------
    parameters: MNPParameters
        Parameters of the current run; its ``folders`` and ``config``
        attributes are used here
    input_pathway: InputPathway
        the input pathway describing the operations to be done on the provided input

    Returns
    -------
    land_types: dict | None
        The land type map as a dictionary with land type codes as key
        and arrays as values. None when HSI maps are read from disk.
    environmentals: dict | None
        Dictionary with the name of the environmental factor as keys
        and their corresponding arrays as values. None when HSI maps are read
        from disk or when the input pathway has no environmental factors.

    """
    create_directories(parameters.folders)

    # Compile sources and destinations of files to copy
    sources, destinations = list_sources_destinations(
        input_pathway, parameters.folders, parameters.config
    )
    with ThreadPoolExecutor(10) as executor:
        # Copy every source to its destination in parallel. list() drains the
        # result iterator so an exception raised in a worker is re-raised here.
        list(executor.map(copy_input_file, sources, destinations))

    # Store global parameters section as csv
    config_section_to_cover(
        parameters.config, "parameters", parameters.folders["config"]
    )

    if input_pathway.hsi_from_disk:
        # Construct geospatial profile when using pre-calculated HSI maps;
        # no land type or environmental rasters are read in this case.
        geo_profile_from_hsi(parameters.folders)
        return None, None

    # aggregate land type map if not already done (the task itself checks this)
    # and read from folder
    aggregate_land_type_map(parameters.folders)
    land_types = read_aggregated_map(folders=parameters.folders)
    environmentals = None

    # Verify spatial overlap of all input in the case of running with
    # environmental variables
    if input_pathway.has_environmentals:
        environmentals = read_rasters_from_cover(
            parameters.folders["environmentals"]
        )

        # Check if land types and environmentals have enough overlap (at least 95%)
        # and create spatial mask (=cells with a value in all rasters)
        verify_spatial_data_coverage(land_types, environmentals, parameters)

        # Both rasters sets are filtered with the same mask file
        # (keep only cells within spatial mask)
        mask_path = os.path.join(
            parameters.folders["spatial_mask"], DATA_COVERAGE_TIF
        )
        environmentals = filter_incomplete_cells(environmentals, mask_path)
        land_types = filter_incomplete_cells(land_types, mask_path)

    return land_types, environmentals


@log_start_completed
def evaluate_models(
    output_pathway: OutputPathway,
    parameters: MNPParameters,
    species_models: weakref.ReferenceType,
    land_types: dict[str, sparray],
):
    """Generate all additional output aside from the standard tables.

    Parameters
    ----------
    output_pathway: OutputPathway
        class describing which output to generate
    parameters: MNPParameters
        Parameters of the current run; ``valid_species_subselections`` is read
        here as a mapping of subselection name to species codes
    species_models: weakref.ReferenceType
        weak reference to the species models of this run; it is handed on
        unchanged to ``add_output_to_subselection``
    land_types:dict
        The land type map as a dictionary with land type codes as key and arrays
        as values

    Returns
    -------
    subselection_evaluations: list[SubselectionEvaluation]
        one evaluation object per entry in
        ``parameters.valid_species_subselections``

    """
    # Phase 1: one SubselectionEvaluation per valid species subselection
    subselection_evaluations = []
    for subselection, codes in parameters.valid_species_subselections.items():
        subselection_evaluations.append(
            SubselectionEvaluation(name=subselection, species_codes=codes)
        )

    # Phase 2: attach the requested output to every evaluation
    for current in subselection_evaluations:
        add_output_to_subselection(
            evaluation=current,
            output_pathway=output_pathway,
            parameters=parameters,
            species_models=species_models,
            land_types=land_types,
        )

    # Phase 3: create the output of all evaluations on a thread pool; the result
    # iterator is consumed so that an exception from a worker is re-raised here
    with ThreadPoolExecutor() as pool:
        for _ in pool.map(create_output, subselection_evaluations):
            pass

    return subselection_evaluations


def run_and_evaluate(
    land_types: dict[str, sparray] | None,
    environmentals: dict[str, sparray] | None,
    parameters: MNPParameters,
    output_pathway: OutputPathway,
) -> list[SubselectionEvaluation]:
    """Run and evaluate species models and evaluate species subselections.

    Running a model is:
    1. make HSI map
    2. do clustering (make populations)
    3. evaluate clusters

    Parameters
    ----------
    land_types:dict
        The land type map as a dictionary with land type codes as key
        and arrays as values
    environmentals
        Dictionary with the name of the environmental factor as keys and
        their corresponding arrays as values
    parameters: MNPParameters
        Parameters of the current run; a SpeciesModel is created for every
        code in ``parameters.complying_species``
    output_pathway: OutputPathway
        class describing which output to generate

    Returns
    -------
    subselection_evaluations: list[SubselectionEvaluation]
        the evaluation objects returned by ``evaluate_models``; the species
        models themselves are released before returning

    """
    # Run species models
    species_models = run_species_models(
        species_models=[
            SpeciesModel(species_code, parameters)
            for species_code in parameters.complying_species
        ],
        land_types=land_types,
        environmentals=environmentals,
    )
    # Only a weak reference is handed on, so that the evaluation step does not
    # keep the models alive.
    # NOTE(review): weakref.ref() raises TypeError for a plain built-in list, so
    # this presumably relies on run_species_models returning a
    # weak-referenceable container - confirm.
    spmodel_ref = weakref.ref(species_models)

    # Generate all outputs besides tables for species subselections
    subselection_evaluations = evaluate_models(
        output_pathway=output_pathway,
        species_models=spmodel_ref,
        parameters=parameters,
        land_types=land_types,
    )

    # Drop the last strong reference and check that the models were freed.
    # NOTE(review): this check is skipped under ``python -O`` and fails as soon
    # as anything else still references the models - confirm that is intended.
    del species_models
    assert spmodel_ref() is None
    return subselection_evaluations


def save_config_and_log(parameters: MNPParameters, config: ConfigParser):
    """Persist the configuration used for this run and copy the log file.

    Parameters
    ----------
    parameters: MNPParameters
        Parameters of the current run; ``parameters.folders`` supplies the
        "meta" folder and is handed to ``log_file_to_cover``
    config: ConfigParser
        Configuration that is written to ``run_configuration.ini``

    """
    # Write the used configuration to an ini file in the meta folder
    ini_path = os.path.join(parameters.folders["meta"], "run_configuration.ini")
    with open(ini_path, "w") as ini_file:
        config.write(ini_file)

    # Copy logging file to cover
    log_file_to_cover(parameters.folders)


def mnp(config: ConfigParser) -> list[SubselectionEvaluation] | None:
    """Run the Model for Nature Policy on the given configuration.

    Any ``Exception`` raised during the run is caught: a condensed form of the
    traceback is logged as an error and ``None`` is returned. Afterwards the
    configuration and log are saved (when the run parameters could be created),
    the log handlers are closed and the temp directory is removed.

    Parameters
    ----------
    config: ConfigParser
        Configuration for current run

    Returns
    -------
    subselection_evaluations: list[SubselectionEvaluation] | None
        the evaluation objects returned by ``run_and_evaluate``, or None when
        the run failed

    """
    print("\n\nStarting the grand Model for Nature Policy\n\n")
    start = time.time()

    # make temp directory (no error when it already exists)
    os.makedirs(TEMP_DIR, exist_ok=True)

    parameters = None
    logger = get_logger()

    try:
        # os.environ.get returns None when a variable is not set.
        # NOTE(review): these variable names look Windows-specific - confirm.
        logger.info(
            f'Starting MNP run by {os.environ.get("username")} on '
            f'{os.environ.get("computername")}.'
        )
        add_dynamics_to_config(config)
        input_pathway, output_pathway = verify_user_input(config)
        parameters = MNPParameters.from_configparser(config)
        land_types, environmentals = prepare_input(parameters, input_pathway)
        parameters.get_complying_species()
        parameters.get_valid_species_selections()

        parameters.read_geospatial_profile()

        # Run models
        subselection_evaluations = run_and_evaluate(
            land_types=land_types,
            environmentals=environmentals,
            parameters=parameters,
            output_pathway=output_pathway,
        )

    except Exception:
        # Log the last three traceback lines (most recent first), skipping the
        # "~~~^^^" error-location marker lines that newer Python versions add.
        lines = []
        for line in reversed(traceback.format_exc().splitlines()):
            if line.strip().startswith(("~", "^")):
                continue
            lines.append(line)
            if len(lines) == 3:
                break
        logger.error(f"{' : '.join(lines)}")
        subselection_evaluations = None

    try:
        if parameters is not None:
            save_config_and_log(parameters, config)
    finally:
        # Always release the log handlers and the temp directory, also when
        # saving the configuration or log fails. Handlers are closed explicitly
        # so their underlying streams are released before they are dropped.
        for handler in list(logger.handlers):
            handler.close()
        logger.handlers = []
        shutil.rmtree(TEMP_DIR)

    print(f"run took {(time.time() - start) / 60:.2f} minutes")
    print("Thanks for using MNP")
    return subselection_evaluations