Commit b3539b2d authored by Biersteker, Levi's avatar Biersteker, Levi
Browse files

Added csv combiner function and an csvlister for use with mnp species result or validation files.

parent d7e1ba9c
......@@ -7,20 +7,22 @@ import shapely
import geopandas as gp
import affine
import numpy as np
import pandas as pd
import random
import numbers
import os
def rand_web_color_hex():
""""
""" "
Generate random color.
https://blog.stigok.com/2019/06/22/generate-random-color-codes-python.html
"""
rgb = ""
for _ in "RGB":
i = random.randrange(0, 2**8)
i = random.randrange(0, 2 ** 8)
rgb += i.to_bytes(1, "big").hex()
return '#{}'.format(rgb)
return "#{}".format(rgb)
def fix_bt(code_in, as_mnp=False):
......@@ -46,9 +48,9 @@ def fix_bt(code_in, as_mnp=False):
code_in = str(code_in)
# Identify parts of code
parts = code_in.split('.')
parts = code_in.split(".")
if len(parts) == 1:
raise Exception('Unexpected BT code: {}'.format(code_in))
raise Exception("Unexpected BT code: {}".format(code_in))
elif len(parts) == 2:
top, sub = parts
neer = None
......@@ -57,31 +59,35 @@ def fix_bt(code_in, as_mnp=False):
# Verify TOP and add "N" is required
if top.isdigit():
top = 'N{0}'.format(top.zfill(2))
top = "N{0}".format(top.zfill(2))
else:
assert top[0].isupper(), 'Unexpected BT Code format: {}'.format(code_in)
assert top[0].isupper(), "Unexpected BT Code format: {}".format(code_in)
# Verify SUB
assert sub.isdigit and len(sub) == 2, 'Unexpected BT Code format: {}'.format(code_in)
assert sub.isdigit and len(sub) == 2, "Unexpected BT Code format: {}".format(
code_in
)
# Verify NEER and check if it is neergeschaald or not
keep_neer = False
if neer:
assert neer.isdigit and len(neer) == 2, 'Unexpected BT Code format: {}'.format(code_in)
assert neer.isdigit and len(neer) == 2, "Unexpected BT Code format: {}".format(
code_in
)
if int(neer) > 0:
keep_neer = True
# Construct output
head = '{}.{}'.format(top, sub)
head = "{}.{}".format(top, sub)
if keep_neer:
return '{}.{}'.format(head, neer)
return "{}.{}".format(head, neer)
elif as_mnp:
return '{}.00'.format(head)
return "{}.00".format(head)
else:
return head
def pnt2square_id(east_north, size, of='full'):
def pnt2square_id(east_north, size, of="full"):
"""
Used to slot a point into a geographical square based on its coordinates.
......@@ -103,17 +109,21 @@ def pnt2square_id(east_north, size, of='full'):
"""
easting, northing = east_north
ll_easting = np.multiply(np.floor_divide(easting, size), size) # easting of square lower left
ll_northing = np.multiply(np.floor_divide(northing, size), size) # northing of square lower left
ll_easting = np.multiply(
np.floor_divide(easting, size), size
) # easting of square lower left
ll_northing = np.multiply(
np.floor_divide(northing, size), size
) # northing of square lower left
out = {
'full': '{:04d}_{:04d}'.format(int(ll_easting), int(ll_northing)),
'brief': np.add(ll_easting, np.divide(ll_northing, 1000)).astype(np.uint32)
"full": "{:04d}_{:04d}".format(int(ll_easting), int(ll_northing)),
"brief": np.add(ll_easting, np.divide(ll_northing, 1000)).astype(np.uint32),
}
try:
return out[of]
except KeyError:
print('select either full or brief as output format')
print("select either full or brief as output format")
raise
......@@ -143,7 +153,7 @@ def gen_squares(x_ul, y_ul, nrow, ncol, size):
:return: gdf with ncol*nrow items
"""
'''
"""
Definition affine.Affine(a, b, c, d, e, f)
a: width of a pixel
b: row rotation (typically zero)
......@@ -152,21 +162,41 @@ def gen_squares(x_ul, y_ul, nrow, ncol, size):
e: height of a pixel (negative)
f: y-coordinate of the upper-left of the upper-left pixel
https://www.perrygeo.com/python-affine-transforms.html
'''
"""
aff = affine.Affine(size, 0, x_ul, 0, -size, y_ul)
ul_corners_indx = [indx for indx in np.ndindex(ncol, nrow)]
ul_corners_coords = [colrow2xy(colrow=corner, affine_in=aff) for corner in ul_corners_indx]
ul_corners_coords = [
colrow2xy(colrow=corner, affine_in=aff) for corner in ul_corners_indx
]
# [(xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax)] == [ll, lr, ur, ul]
all_corner_coords = [[(xmin, ymax-size), (xmin+size, ymax-size), (xmin+size, ymax), (xmin, ymax)] for (xmin, ymax)
in ul_corners_coords]
all_corner_coords = [
[
(xmin, ymax - size),
(xmin + size, ymax - size),
(xmin + size, ymax),
(xmin, ymax),
]
for (xmin, ymax) in ul_corners_coords
]
shapes = [shapely.geometry.Polygon(corner) for corner in all_corner_coords]
ids = ['{0}_{1}'.format(int(x), int(y)) for x,y in ul_corners_coords]
return gp.GeoDataFrame(geometry=shapes, data={'ID': ids, 'size': str(size)},
crs={'x_0': 155000, 'proj': 'sterea', 'units': 'm', 'y_0': 463000, 'k': 0.9999079,
'lon_0': 5.38763888888889, 'no_defs': True, 'lat_0': 52.15616055555555,
'ellps': 'bessel'})
ids = ["{0}_{1}".format(int(x), int(y)) for x, y in ul_corners_coords]
return gp.GeoDataFrame(
geometry=shapes,
data={"ID": ids, "size": str(size)},
crs={
"x_0": 155000,
"proj": "sterea",
"units": "m",
"y_0": 463000,
"k": 0.9999079,
"lon_0": 5.38763888888889,
"no_defs": True,
"lat_0": 52.15616055555555,
"ellps": "bessel",
},
)
def colrow2xy(colrow, rio_object=None, affine_in=None):
......@@ -183,9 +213,70 @@ def colrow2xy(colrow, rio_object=None, affine_in=None):
if isinstance(affine_in, affine.Affine):
a = affine_in
else:
raise Exception("{0} is not a valid rasterio object and no Affine alternative supplied".format(rio_object))
raise Exception(
"{0} is not a valid rasterio object and no Affine alternative supplied".format(
rio_object
)
)
col, row = colrow
x, y = a * (col, row)
return x, y
def csv_combine(files, identifiers, identifiercolumn="df_id"):
"""
appends one table to another while adding a column to identify which entries came from each table
:param files: list of csv files
:param identifiers: list of labels to identify original tables
:param: identifiercolumn: name of identifiercolumn
:return
"""
if not isinstance(files, list) or not isinstance(identifiers, list):
raise Exception("Either files or identifiers is not a list")
for file in files:
if not os.path.exists(file):
raise Exception(file + " was not found!")
df_combined = pd.read_csv(files[0])
df_combined[identifiercolumn] = identifiers[0]
for file, identifier in zip(files[1:], identifiers[1:]):
nextfile = pd.read_csv(file)
nextfile[identifiercolumn] = identifier
df_combined = df_combined.append(nextfile)
return df_combined
def mnp_csvlister(folder, notwanted=False, extension=False):
"""
list all species output csv files in an MNP result folder
you can provida a list of species that you want to exclude from te returned list
:param folder: MNP result folder
:param notwanted: list of species to exclude
:param extension: return file extensions?
:return: list containing filenames or species codes
"""
if type(notwanted) != bool:
try:
file_list = [
file
for file in os.listdir(folder)
if not (np.isin(file[0:9], notwanted))
]
file_list.sort()
except Exception as e:
print(e)
next
else:
file_list = [file for file in os.listdir(folder)]
file_list.sort()
if not extension:
file_list = [f[0:9] for f in file_list]
file_list = list(set([s for s in file_list if s[:2] == "S0"]))
file_list.sort()
return file_list
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment