Commit 220c9cb0 authored by Roelofsen, Hans's avatar Roelofsen, Hans
Browse files

update Species Lst. Bug fix in ab_opt.remapBTMap

parent 756bc7de
......@@ -17,7 +17,10 @@ class TabularSpecies:
# De volledige Species tabel als Pandas dataframe
benb_dir = pathlib.Path(filepath.parents[1])
self.df = pd.read_csv(os.path.join(benb_dir, r'resources\mnp_species.csv'), sep=',', comment='#')
self.df = pd.read_csv(os.path.join(benb_dir, r'resources\mnp_species.csv'), sep=',', comment='#',
encoding='cp1252')
#TODO: soortenlijst N-gevoelige soorten (179 stuks) toevoegen aan mnp_species.csv en labellen als dusdanig
# Dictionaries tussen code, lokale naam, wetenschappelijke naam
self.code2local = dict(zip(self.df.Species_code, self.df.Local_name))
......@@ -42,7 +45,8 @@ class TabularSpecies:
self.code2sel281 = dict(zip(self.df.Species_code, self.df.sel281.map({1: True, 0: False})))
self.code2sel146 = dict(zip(self.df.Species_code, self.df.sel146.map({1: True, 0: False})))
# Additional data sources
# Additional data sources. Note that population factors are not defined for all species
# TODO: merge deze tabel met self.df ?
self.pop_factors_src = r'W:\PROJECTS\QMAR\MNP-SNL-ParameterSet\Parameters_v05_2021_04_08\07_MNP_versie4_par_population_factors.csv'
self.pop_factors = pd.read_csv(self.pop_factors_src, sep=',')
self.code2local_distance = dict(zip(self.pop_factors.Species_code, self.pop_factors.Local_distance))
......@@ -79,8 +83,12 @@ class IndividualSpecies:
self.scientific = getattr(species_tab, '{}2scientific'.format(typer))[x] # Species Scientific name
self.taxondict = {'S02': 'V', 'S06': 'E', 'S09': 'P'}
self.taxon = self.taxondict[self.code[:3]]
self.key_area = species_tab.code2key_area[self.code]
self.local_distance = species_tab.code2local_distance[self.code]
try:
self.key_area = species_tab.code2key_area[self.code]
self.local_distance = species_tab.code2local_distance[self.code]
except KeyError:
self.key_area = None
self.local_distance = None
if not brief:
# Set all relevant attributes
......@@ -109,6 +117,15 @@ class IndividualSpecies:
# limits = np.array(getattr(self, '{}_response_src'.format(abiotic)).loc[self.code])
return [0, 0.5, 1, 0.5, 0][int(np.digitize(x, abiotic_limits))]
def response_to_beheertype(self, beheertype):
"""
return Landtype Quality (aka draagkracht of self towards a beheertype
:param beheertype:
:return: land type quality [0-1]
"""
# TODO (20220323) Vergelijk met ./dkq.Draagkracht.query4species()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
......
......@@ -3,12 +3,14 @@ Find value of Abiotic Condition (AC) to accomodate as many species as possible
Hans Roelofsen, 24-01-2022
"""
import os
import rasterio as rio
import pandas as pd
import pathlib
import sys
import pathlib
import datetime
import numpy as np
import pandas as pd
import rasterio as rio
from Classes.Species import IndividualSpecies as Species
import dkq
......@@ -16,6 +18,7 @@ import read_sources as src
# Append one level up to sys path and import mrt module from there.
# See: https://git.wur.nl/roelo008/mrt
# fp = r'C:\apps\proj_code\benb_utils\ab_opt.py'
fp = pathlib.Path(__file__)
sys.path.append(str(pathlib.Path(fp).parents[1]))
from mrt.sample import mrt_helpers as mrt
......@@ -150,11 +153,16 @@ class AbioticOptimum:
bt = rio.open(bt_raster)
bt_arr = bt.read(1)
# Build output profile
prof = mrt.mnp_abiotiek_raster_profile()
assert bt.shape == (prof['height'], prof['width'])
assert bt.transform == prof['transform']
# Read mapping between beheertypen and abiotic optima
bt_vat = pd.DataFrame.from_dict(src.mapping_from_file(bt_vat_file, key_col='Value', value_col='Descriptio'),
orient='index', columns=['Description'])
bt_vat['code'] = [x[0] for x in bt_vat.Description.str.split(' ')]
bt_vat['optimum'] = None
bt_vat['optimum'] = np.zeros(bt_vat.shape[0]).astype(prof['dtype'])
bt_vat['Value'] = bt_vat.index
# Gather abiotic optimum for beheertype
......@@ -179,10 +187,6 @@ class AbioticOptimum:
continue
bt_vat.loc[bt_vat.code == code, 'optimum'] = optimum
# Build output profile
prof = mrt.mnp_abiotiek_raster_profile()
assert bt.shape == (prof['height'], prof['width'])
assert bt.transform == prof['transform']
# Set non-compatible beheertypen and otherwise to Nodata
bt_vat.optimum.fillna(prof['nodata'], inplace=True)
......@@ -227,13 +231,51 @@ if __name__ == '__main__':
def query4sp(**kwargs):
foo = AbioticOptimum()
sp = Species(kwargs.get('species'))
response_values = foo.get_abiotic_limits(sp.code, kwargs.get('abiotic'))
d = dict(zip(['L20', 'L80', 'H80', 'H20'], response_values))
abiotic_limits = foo.get_abiotic_limits(sp.code, kwargs.get('abiotic'))
d = dict(zip(['L20', 'L80', 'H80', 'H20'], abiotic_limits))
df = pd.DataFrame.from_dict(d, orient='index', columns=[kwargs.get('abiotic')])
df.T.to_clipboard(sep='\t')
print({'dict': d,
'csv': df.to_csv(sep='\t')}[kwargs.get('of')])
def species_response(**kwargs):
"""
Get a species response to an abiotic value
:param kwargs:
:return:
"""
foo = AbioticOptimum()
sp = Species(kwargs.get('species'))
abiotic_limits = foo.get_abiotic_limits(sp.code, kwargs.get('abiotic'))
response = sp.response_to_abiotic(abiotic_limits, kwargs.get('val'))
print({'simpleton': response,
'verbose': '{0} ({1}) response to {2}={3} is: {4}'.format(sp.local, sp.scientific,
kwargs.get('abiotic'), kwargs.get('val'),
response)}[kwargs.get('of')])
def mixed_species_response(**kwargs):
"""
Determine overall species response to 1 or more abiotic conditions
:param kwargs:
:return:
"""
foo = AbioticOptimum()
sp = Species(kwargs.get('species'))
abiotics = kwargs.get('abiotics')
assert len(abiotics) % 2 == 0
d = dict(zip([i for i in abiotics if (abiotics.index(i) % 2 == 0 and i in [])],
[i for i in abiotics if abiotics.index(i) % 2 == 1]))
responses = []
for k, v in d.items():
abiotic_limits = foo.get_abiotic_limits(sp.code, k)
responses.append(sp.response_to_abiotic(abiotic_limits, v))
return np.prod(responses)
import argparse
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subparser_name')
......@@ -252,8 +294,8 @@ if __name__ == '__main__':
# Subparser for querying a beheertype
parser_b = subparsers.add_parser('query4BT', help='Get information on the abtiotic optimum for a Beheertype')
parser_b.add_argument('abiotic', help='abiotic condition', type=str, choices=['gvg', 'ph', 'ndep'])
parser_b.add_argument('bt', help='Beheertype code', type=str)
parser_b.add_argument('abiotic', help='abiotic condition', type=str, choices=['gvg', 'ph', 'ndep'])
parser_b.add_argument('--of', help='output format', type=str, choices=['tuple', 'df', 'dict', 'single', 'str'],
default='str')
parser_b.add_argument('--sp_list', help='which species list', type=int, choices=[146, 281, 468], default=281)
......@@ -267,6 +309,22 @@ if __name__ == '__main__':
default='csv')
parser_c.set_defaults(func=query4sp)
# Subparser for species response to a abiotic value
parser_d = subparsers.add_parser('SingleSpeciesResponse', help='Get Species response to an abiotic value')
parser_d.add_argument('species', help='Species local name, scientific name or code.')
parser_d.add_argument('abiotic', help='abiotic condition', type=str, choices=['gvg', 'ph', 'ndep'])
parser_d.add_argument('val', help='value of abiotic condition', type=float)
parser_d.add_argument('--of', help='output value', type=str, choices=['simpleton', 'verbose', 'dict'],
default='simpleton')
parser_d.set_defaults(func=species_response)
# Subparser for species response to >1 abiotic value
parser_e = subparsers.add_parser('MixedSpeciesResponse', help='Get species response to 1 or more abiotic value(s)')
parser_e.add_argument('species', help='Species local name, scientific name or code.')
parser_e.add_argument('abiotics', help='one or more abiotic - value pairs', nargs='*')
parser_e.set_defaults(func=mixed_species_response)
try:
args = parser.parse_args()
args.func(**vars(args))
......
......@@ -19,6 +19,7 @@ class DraagKracht:
def __init__(self):
self.dkdir = r'W:\PROJECTS\QMAR\MNP-SNL-ParameterSet\Parameters_v05_2021_04_08'
self.defaultdk = r'03_MNP_versie5_par_density_factors_BT2021_v3.csv'
self.dk_src = None # To do, set to default if self.use_default, else to someting else
self.snl_bt = get_snl_beheertypen_list(of='full')
self.use_default = True
self.bt_column = 'Land_type_code'
......@@ -64,11 +65,16 @@ class DraagKracht:
.format(bt, code2name[fix_bt(bt, as_mnp=True)],
counts.get('V', 0), counts.get('E', 0), counts.get('P', 0),
sel.shape[0], sp_sel if sp_sel != 'all' else '1081')
message_full = sel.assign(local=sel.Species_code.map(species_table.code2local),
taxon=sel.Species_code.map(species_table.code2taxon)) \
df_out = sel.assign(local=sel.Species_code.map(species_table.code2local),
taxon=sel.Species_code.map(species_table.code2taxon),
latin=sel.Species_code.map(species_table.code2scientific),
lst468=sel.Species_code.map(getattr(species_table, 'code2sel468')).multiply(1),
lst281=sel.Species_code.map(getattr(species_table, 'code2sel281')).multiply(1),
lst146=sel.Species_code.map(getattr(species_table, 'code2sel146')).multiply(1))\
.sort_values(by=['taxon', 'local']) \
.loc[:, ['Species_code', 'taxon', 'Land_type_quality', 'local']] \
.to_csv(sep='\t', index=False, header=False)
.loc[:, ['Species_code', 'taxon', 'Land_type_quality', 'local', 'latin', 'lst468',
'lst281', 'lst146']]
message_full = df_out.to_csv(sep='\t', index=False, header=True)
dict_out = dict(zip(sel['Species_code'], sel['Land_type_quality']))
if of == 'brief':
......@@ -76,10 +82,13 @@ class DraagKracht:
elif of == 'full':
print(message_brief)
print(message_full)
df_out.to_clipboard(index=False, sep='\t')
elif of == 'df':
return df_out
elif of == 'dict':
return dict_out
def query4species(self, species_lst: list, of):
def query4species(self, species_lst: list, of, verbose=False):
"""
query draagkrachten for one or more species
:param species_lst:
......@@ -87,18 +96,28 @@ class DraagKracht:
:return:
"""
if not isinstance(species_lst, list):
species_lst = [species_lst]
for x in species_lst:
sp = species.IndividualSpecies(x)
query = 'Species_code in ["{0}"]'.format(sp.code)
sel = self.dk.query(query)
print("{0} ({1}-{2}. Listed in: {3}): {4} beheertypen".format(sp.local, sp.scientific, sp.code,
sp.groupinfo, sel.shape[0]))
if verbose:
print("{0} ({1}-{2}. Listed in: {3}): {4} beheertypen".format(sp.local, sp.scientific, sp.code,
sp.groupinfo, sel.shape[0]))
df = sel.assign(desc=getattr(sel, self.bt_column).map(code2name)) \
.loc[:, ['Land_type_quality', self.bt_column, 'desc']] \
.sort_values(by=self.bt_column)
if of == 'full':
df = sel.assign(desc=getattr(sel, self.bt_column).map(code2name)) \
.loc[:, ['Land_type_quality', self.bt_column, 'desc']] \
.sort_values(by=self.bt_column)
df.to_clipboard(sep='\t', index=False)
print(df.to_csv(sep='\t', index=False, header=False))
elif of == 'class':
for row in df.itertuples():
setattr(sp, getattr(row, self.bt_column), row.Land_type_quality)
return sp
if __name__ == '__main__':
......@@ -107,7 +126,7 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--qbt', nargs='+', type=str)
parser.add_argument('--qsp', nargs='+', type=str)
parser.add_argument('--of', choices=['full', 'sparse'], default='full')
parser.add_argument('--of', choices=['full', 'sparse', 'df', 'dict'], default='full')
parser.add_argument('--sp_list', choices=['281', '468', '146', 'all'], default='281', type=str)
# parser.add_argument('--dk_file', help='draagrkachten file', default=)
args = parser.parse_args()
......
import numbers
def fix_bt(code_in, as_mnp=False, verbose=False, strict=True):
def fix_bt(code_in, as_mnp=False, verbose=False, strict=True, pass_missing_N=False):
"""
Parser for Beheertype codes to repair abbreviated codes and/or MNP extension
eg. 02.01 --> N02.01 if as_mnp==False
02.01 --> N01.01.00 if as_mnp==True
eg. 02.01 --> N02.01 if as_mnp==False and pass_missing_N=True
02.01 --> N01.01.00 if as_mnp==True and pass_missing_N=True
N05.01 --> N05.01 if as_mnp==False
N05.01 --> N05.01.00 if as_mnp==True
N04.02.01 --> N04.02.01 if as_mnp==False
......@@ -20,6 +20,7 @@ def fix_bt(code_in, as_mnp=False, verbose=False, strict=True):
:param as_mnp: return MNP style beheertypecode, default=False
:param verbose: verbose feedback
:param strict: raise error if bt code is unexpected
:param pass_missing_N: allow missing Capital letter at start of code, assuming N
:return: beheertypecode
"""
......@@ -48,6 +49,13 @@ def fix_bt(code_in, as_mnp=False, verbose=False, strict=True):
return None
# Verify TOP and add "N" is required
if top[0].upper() in ['N', 'W', 'S', 'T', 'A', 'L']:
pass
else:
if pass_missing_N:
print('warning: Capital letter missing from code. Defaulting to Natuurtype, interpreting "{0}" as "{1}"'.format(top, 'N{}'.format(top)))
top = 'N{}'.format(top)
assert top[0].upper() in ['N', 'W', 'S', 'T', 'A', 'L'], "3. Unexpected BT Code: {}".format(code_in)
if top[0].islower():
top = top.capitalize()
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment