Commits on Source (9)
# Read the Docs configuration file for Sphinx projects
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
# Required
version: 2
# Set the OS, Python version and other tools you might need
build:
  os: ubuntu-22.04
  tools:
    python: "3.9"
    # You can also specify other tool versions:
    # nodejs: "20"
    # rust: "1.70"
    # golang: "1.20"

# Build documentation in the "docs/" directory with Sphinx
sphinx:
  configuration: doc/conf.py
  # You can configure Sphinx to use a different builder, for instance use the dirhtml builder for simpler URLs
  # builder: "dirhtml"
  # Fail on all warnings to avoid broken references
  # fail_on_warning: true

# Optionally build your docs in additional formats such as PDF and ePub
formats:
  - pdf
  - epub

# Optional but recommended, declare the Python requirements required
# to build your documentation
# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
# python:
#   install:
#     - requirements: docs/requirements.txt
\ No newline at end of file
......@@ -3,7 +3,7 @@
# Allard de Wit (allard.dewit@wur.nl), October 2021
"""Grompy is a tool to process and access parcel-based satellite observations from GroenMonitor.nl.
"""
__version__ = "1.3.2"
__version__ = "1.6.1"
from .dap import DataAccessProvider
from .cmd import cli
......
......@@ -20,18 +20,22 @@ def check_DB_connection(grompy_yaml, dsn):
sqlite_path = dsn.replace("sqlite:///", "")
sqlite_path = make_path_absolute(grompy_yaml, Path(sqlite_path))
dsn = f"sqlite:///{sqlite_path}"
else:
msg = "Only SQLite database are supported!"
return False, dsn
try:
e = sa.create_engine(dsn)
e.connect()
print(f"OK: Connection seems fine for: {dsn}")
return True
return True, dsn
except sa.exc.SQLAlchemyError as e:
print(f"ERROR: Failed making DB connection: {dsn}")
ALL_CHECKS_OK = False
return False
return False, dsn
def check_parcel_info(grompy_yaml, counts_file_10m, counts_file_20m, counts_file_25m, shape_file, **kwargs):
def check_parcel_info(grompy_yaml, counts_file_10m, counts_file_20m, counts_file_25m, shape_file, gpkg_fname, **kwargs):
global ALL_CHECKS_OK
# Check if files exist
for fname, col_name in [(counts_file_10m, "counts_10m"),
......@@ -54,10 +58,20 @@ def check_parcel_info(grompy_yaml, counts_file_10m, counts_file_20m, counts_file
if fname_shape_file.exists():
gdf = gpd.read_file(fname_shape_file, rows=1)
for column in ["fieldid", "year", "cat_gewasc", "gws_gewasc", "gws_gewas", "provincie",
"gemeente", "regio", "PC4", "woonplaats", "waterschap", "S2Tiles", "AHN2"]:
"gemeente", "regio", "PC4", "woonplaats", "waterschap", "AHN2"]:
if column not in gdf.columns:
print(f"ERROR: Attribute {column} missing in BRP shapefile")
ALL_CHECKS_OK = False
# Check if either "S2Tiles" or "S2_Tiles" exists, because both naming variants occur in the BRP shapefiles
if "S2_Tiles" not in gdf.columns and "S2Tiles" not in gdf.columns:
print("ERROR: Attribute 'S2Tiles' or 'S2_Tiles' is missing in BRP shapefile")
ALL_CHECKS_OK = False
gpkg_fname_fp = make_path_absolute(grompy_yaml, Path(gpkg_fname))
return str(gpkg_fname_fp)
def check_CSV_inputs(grompy_yaml, dsn, bands, **kwargs):
global ALL_CHECKS_OK
......@@ -79,9 +93,7 @@ def check_CSV_inputs(grompy_yaml, dsn, bands, **kwargs):
if nlines is None:
ALL_CHECKS_OK = False
db_OK = check_DB_connection(grompy_yaml, dsn)
return nlines, db_OK
return nlines
def check_grompy_inputs(grompy_yaml):
......@@ -93,13 +105,19 @@ def check_grompy_inputs(grompy_yaml):
sys.exit()
grompy_conf = yaml.safe_load(open(grompy_yaml))
# Check parcel info inputs
parcel_info = grompy_conf.pop("parcel_info")
check_parcel_info(grompy_yaml, **parcel_info)
gpkg_fname = check_parcel_info(grompy_yaml, **parcel_info)
# Check input files with CSV data
for dataset_name, description in grompy_conf["datasets"].items():
if description is None: # Dataset has not been defined
print(f"Skipping {dataset_name}, no inputs defined.")
continue
n, OK = check_CSV_inputs(grompy_yaml, **description)
n = check_CSV_inputs(grompy_yaml, **description)
OK, dsn = check_DB_connection(grompy_yaml, description["dsn"])
description.update({"nlines": n, "DBcheck": OK})
if ALL_CHECKS_OK:
......
......@@ -30,7 +30,7 @@ def init_grompy(year, input_path):
@click.command("check")
@click.argument("grompy_yaml", type=click.Path(exists=True))
def check_grompy(grompy_yaml):
grompy_yaml = Path(grompy_yaml)
grompy_yaml = Path(grompy_yaml).absolute()
check_grompy_inputs(grompy_yaml)
......
......@@ -2,25 +2,17 @@
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
from pathlib import Path
from types import SimpleNamespace
import sqlalchemy as sa
import pandas as pd
import yaml
import duckdb
from .util import open_DB_connection, make_path_absolute
from .S2_zenith_azimuth_angles import df_angles
class Container:
"""A simple container that sets attributes through keywords.
"""
def __init__(self, **kwargs):
super().__init__()
for key, value in kwargs.items():
setattr(self, key, value)
class DataAccessProvider:
"""grompy.DataAccessProvider allow to query grompy databases and iterate through the selected parcels.
......@@ -56,7 +48,7 @@ class DataAccessProvider:
def __init__(self, grompy_yaml, fieldID=None, limit=None, **kwargs):
grompy_yaml = Path(grompy_yaml)
grompy_yaml = Path(grompy_yaml).absolute()
if not grompy_yaml.exists():
msg = f"Cannot find config file: {grompy_yaml}"
raise RuntimeError(msg)
......@@ -67,16 +59,18 @@ class DataAccessProvider:
for dataset_name, description in self.grompy_conf["datasets"].items():
if description is None:
continue
engine = open_DB_connection(grompy_yaml, description["dsn"])
meta = sa.MetaData(engine)
tbl = sa.Table(dataset_name, meta, autoload=True)
self.dataset_connections[dataset_name] = (engine, tbl)
# For sentinel2 data also read the sensor information
fname_duckdb = make_path_absolute(grompy_yaml, description["dsn"])
DBconn = duckdb.connect(fname_duckdb, read_only=True)
self.dataset_connections[dataset_name] = (DBconn, dataset_name)
# For sentinel2 data also read the sensor information and generate the band names that need unit conversion
if dataset_name == "sentinel2_reflectance_values":
sensor_info = pd.read_sql_table("sensor_info", engine).set_index("overpass")
sensor_info = DBconn.sql("select * from sensor_info").df()
sensor_info.set_index("overpass", inplace=True)
self.observation_info = sensor_info.join(df_angles)
self.observation_info.set_index(["date", "S2_tile"], inplace=True)
self.bands2convert = [k for k in description["bands"].keys() if k.startswith("B")]
self.datasets_enabled = set(self.dataset_connections.keys())
......@@ -200,7 +194,13 @@ class DataAccessProvider:
:type parcel_info: NamedTuple
:return: a grompy DataFrame with S2 viewing azimuth/zenith angles added.
"""
S2tile = parcel_info.S2tiles.split(",")[0]
if hasattr(parcel_info, "S2tiles"):
S2tile = parcel_info.S2tiles.split(",")[0]
elif hasattr(parcel_info, "S2_tiles"):
S2tile = parcel_info.S2_tiles.split(",")[0]
else:
msg = "Parcel info does not have attribute 'S2tiles' or 'S2_tiles'"
raise RuntimeError(msg)
df["S2_tile"] = S2tile
df["date"] = pd.to_datetime(df.day)
df.set_index(["date", "S2_tile"], inplace=True)
......@@ -218,19 +218,20 @@ class DataAccessProvider:
rows = r.fetchmany(100)
while rows:
for parcel_info in rows:
c = Container()
for dataset_name, (engine, tbl) in self.dataset_connections.items():
d = dict()
for dataset_name, (DBconn, tbl) in self.dataset_connections.items():
if dataset_name not in self.datasets_enabled:
continue
s = sa.select([tbl],
sa.and_(tbl.c.fieldID==parcel_info.fieldID),
order_by={tbl.c.day})
df = pd.read_sql(s, engine)
df = df.drop(columns="fieldID")
sql = f"SELECT * EXCLUDE (fieldID) FROM {tbl} WHERE fieldID=? ORDER BY day"
df = DBconn.execute(sql, [parcel_info.fieldID]).df()
if dataset_name == "sentinel2_reflectance_values":
df = self._add_S2_angles(df, parcel_info)
df.index = pd.to_datetime(df.day)
setattr(c, dataset_name, df)
df.drop(columns="day", inplace=True)
if dataset_name.startswith("sentinel2"):
df[self.bands2convert] = df[self.bands2convert].astype(float) / 10000.0
d[dataset_name] = df
c = SimpleNamespace(**d)
yield parcel_info, c
rows = r.fetchmany(100)
......
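For orientation, a minimal usage sketch of the DuckDB-backed provider above. It assumes the generator shown here drives iteration over DataAccessProvider, that "grompy.yaml" is a valid (hypothetical) config path, and that the sentinel2_reflectance_values dataset is enabled:

from grompy import DataAccessProvider

dap = DataAccessProvider("grompy.yaml", limit=5)
for parcel_info, c in dap:
    # c is a SimpleNamespace with one DataFrame per enabled dataset; each frame is
    # indexed by observation date and the S2 band columns are scaled to 0-1 reflectance
    df_s2 = c.sentinel2_reflectance_values
    print(parcel_info.fieldID, len(df_s2))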
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
import sys
import sys, os
import uuid
from pathlib import Path
import time
import sqlite3
import multiprocessing as mp
import zlib
import pickle
import csv
import traceback as tb
from tqdm import tqdm
import sqlalchemy as sa
import sqlalchemy.exc
import pandas as pd
import geopandas as gpd
import numpy as np
import yaml
from sqlitedict import SqliteDict
import duckdb
from .util import prepare_db, printProgressBar, Process, make_path_absolute, open_DB_connection, get_latlon_from_RD
dummy_date = "19000101"
def my_encode(obj):
return sqlite3.Binary(zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)))
def my_decode(obj):
return pickle.loads(zlib.decompress(bytes(obj)))
def convert_csv_to_store(fname_csv):
"""Converts the CSV file into an SQLite key-value store with filename fname_csv + ".db"
"""
fname_csv_db = fname_csv.with_suffix(f"{fname_csv.suffix}.db")
fname_csv_tmp = fname_csv.with_suffix(f".{uuid.uuid4()}.tmp")
if not fname_csv_db.exists():
with SqliteDict(fname_csv_tmp, encode=my_encode, decode=my_decode, autocommit=True) as db:
with open(fname_csv, newline="") as fp_csv:
reader = csv.reader(fp_csv)
fieldnames = next(reader)
db["header"] = fieldnames[1:]
for row in reader:
fid = int(row.pop(0))
db[fid] = row
fname_csv_tmp.rename(fname_csv_db)
def process_csv_stores(grompy_yaml, datasets, parallel=False):
csv_fnames = []
for dataset_name, description in datasets.items():
if description is None:
continue
for csv_fname in description["bands"].values():
csv_fname = make_path_absolute(grompy_yaml, Path(csv_fname))
csv_fnames.append(csv_fname)
print(f"Creating stores for {len(csv_fnames)} CSV files... ", end="", flush=True)
if parallel:
n = min(mp.cpu_count(), len(csv_fnames))
with mp.Pool(n) as pool:
pool.map(convert_csv_to_store, csv_fnames)
else:
for csv_fname in csv_fnames:
convert_csv_to_store(csv_fname)
from .util import prepare_db, make_path_absolute, get_latlon_from_RD
class CSVLoadingError(Exception):
......@@ -99,6 +43,12 @@ def load_parcel_info(grompy_yaml, gpkg_fname, counts_file_10m, counts_file_20m,
df["fieldid"] = df.fieldid.astype(np.int32)
df = df.set_index("fieldid")
# Normalise the two column-name variants S2Tiles/S2_Tiles to "S2_Tiles"
if "S2_Tiles" not in df.columns:
df["S2_Tiles"] = df.S2Tiles
# Compute latitude/longitude of field centroids
r = []
for row in df.itertuples():
......@@ -119,6 +69,7 @@ def load_parcel_info(grompy_yaml, gpkg_fname, counts_file_10m, counts_file_20m,
# Check for null values, e.g. fields without a count
ix = df[col_name].isnull()
if any(ix):
print(f"{col_name}: found {sum(ix)} fields without a count: forcing zeros")
df.loc[ix, col_name] = 0
df_out = gpd.GeoDataFrame({"fieldID": df.index,
......@@ -136,11 +87,11 @@ def load_parcel_info(grompy_yaml, gpkg_fname, counts_file_10m, counts_file_20m,
"provincie": df.provincie.apply(str),
"gemeente": df.gemeente.apply(str),
"regio": df.regio.apply(str),
"pc4": df.PC4.apply(str),
"pc4": df.PC4.apply(lambda pc4: str(int(pc4))),
"AHN2": df.AHN2,
"woonplaats": df.woonplaats.apply(str),
"waterschap": df.waterschap.apply(str),
"S2tiles": df.S2Tiles.apply(str),
"S2_Tiles": df.S2_Tiles.apply(str),
"geometry": df.geometry
}, crs=df.crs)
df_out.reset_index(drop=True, inplace=True)
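The pc4 change above presumably guards against PC4 codes that are read from the BRP attributes as floats, where a plain str() keeps a trailing ".0". A minimal sketch with hypothetical values:

import pandas as pd

pc4 = pd.Series([6701.0, 3584.0])                    # hypothetical PC4 values read as floats
print(pc4.apply(str).tolist())                       # ['6701.0', '3584.0']
print(pc4.apply(lambda v: str(int(v))).tolist())     # ['6701', '3584']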
......@@ -152,97 +103,31 @@ def load_parcel_info(grompy_yaml, gpkg_fname, counts_file_10m, counts_file_20m,
return field_ids
def process_rows(rows, fieldID):
"""Process a single row from different CSV files and convert them
into a DataFrame.
:param rows: a dict with column names as keys and a CSV row as values.
def write_to_database(fname_duckdb, dataset_name, df):
"""routine writes data from a set of CSV files into an SQLite database using .import
returns a pandas DataFrame
"""
df = pd.DataFrame()
for column_name, row in rows.items():
if "count" in row:
count = row.pop("count")
recs = []
for sdate, value in row.items():
value = float(value)
if value == 0.:
continue
recs.append({"day": sdate, "value": float(value), "band": column_name})
if not recs: # only zero (null) values for the column
# We add one dummy record to make sure we can create the dataframe properly
recs.append({"day": dummy_date, "value": None, "band": column_name})
df_tmp = pd.DataFrame(recs)
df_tmp["day"] = pd.to_datetime(df_tmp.day).dt.date
df = pd.concat([df, df_tmp])
df = df.pivot(index="day", columns="band", values="value")
# Filter out the dummy records
ix = (df.index == pd.to_datetime(dummy_date).date())
if any(ix):
df = df[~ix]
# Turn index back into a column
df.reset_index(inplace=True)
# Add FieldID to the dataframe
df["fieldID"] = int(fieldID)
return df
def write_to_database(engine, dataset_name, csv_readers, field_ids, child_conn=None, total_lines=None):
"""routine writes data from a set of CSV files into the database
:param engine: the database engine to be used
:param fname_duckdb: path to the database engine to be used
:param dataset_name: the name of the dataset, will be used as output table name
:param csv_readers: the set of CSV DictReaders (one per CSV file)
:param child_conn: The pipe to report progress in loading data from file
if None it is assumed that loading is serial not parallel
:param df: the dataframe to be written to the DB
:return:
"""
if total_lines is not None:
printProgressBar(0, total_lines, decimals=2, length=50)
meta = sa.MetaData(engine)
tbl = sa.Table(dataset_name, meta, autoload=True)
ins = tbl.insert()
headers = {column_name: reader["header"] for column_name, reader in csv_readers.items()}
for i, fid in enumerate(field_ids):
rows = {}
for column_name, reader in csv_readers.items():
try:
band_values = reader[fid]
except KeyError as e:
band_values = [0.] * len(headers)
header = headers[column_name]
row = {hdr: value for hdr, value in zip(header, band_values)}
rows[column_name] = row
df = process_rows(rows, fid)
if len(df) == 0:
continue
print(f"Start loading records to {fname_duckdb}, this can take some time...")
with duckdb.connect(fname_duckdb) as DBconn:
for i, df_chunk in enumerate(tqdm(np.array_split(df, 100))):
DBconn.sql(f"INSERT INTO {dataset_name} SELECT * from df_chunk")
try:
recs = df.to_dict(orient="records")
with engine.begin() as DBconn:
DBconn.execute(ins, recs)
# df.to_sql(dataset_name, DBconn, if_exists="append", index=False, method="multi")
except sa.exc.IntegrityError as e:
print(f"Field ID {df.fieldID.unique()} failed to insert in table {dataset_name}")
if i % 100 == 0:
if child_conn is not None:
child_conn.send({dataset_name: i})
else:
printProgressBar(i, total_lines, decimals=2, length=50)
DBconn.sql(f"CREATE UNIQUE INDEX {dataset_name}_uix ON {dataset_name} (fieldID, day)")
cnt, = DBconn.sql(f"SELECT count(*) from {dataset_name}").fetchone()
# Make sure the progressbar goes up to 100% when done
if total_lines is not None:
printProgressBar(total_lines, total_lines, decimals=2, length=50)
# check if all records are loaded
if cnt == len(df):
print(f"{cnt} records successfully loaded to db {fname_duckdb}")
else:
msg = f"Number of records loaded ({len(df)}) not equal to number of records written ({cnt}) to db ({fname_duckdb}). Check results!"
print(msg)
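The INSERT statement above refers to df_chunk, a pandas DataFrame in the enclosing Python scope; DuckDB resolves such names as if they were tables (a replacement scan). A minimal sketch of that behaviour, with hypothetical table and column names:

import duckdb
import pandas as pd

df_chunk = pd.DataFrame({"fieldID": [1, 2], "B02": [1200, 3400]})
con = duckdb.connect()                                   # in-memory database
con.sql("CREATE TABLE demo (fieldID INTEGER, B02 USMALLINT)")
con.sql("INSERT INTO demo SELECT * FROM df_chunk")       # df_chunk is picked up from the Python scope
print(con.sql("SELECT count(*) FROM demo").fetchone())   # (2,)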
def write_sensor_info(engine, sensor_info):
def write_sensor_info(fname_duckdb, fname_sensor_info):
"""Write info about sensors in to the database provided by engine
@param engine: the database engine
......@@ -261,58 +146,30 @@ def write_sensor_info(engine, sensor_info):
and contain a record for all days within the grompy database.
"""
df = pd.read_csv(sensor_info)
df = pd.read_csv(fname_sensor_info)
df = df.rename(columns=lambda x: x.lower())
df["date"] = pd.to_datetime(df.date, format="%Y%m%d").dt.date
df.to_sql("sensor_info", engine, index=False, if_exists="replace",
dtype={"date": sa.types.Date(),
"sensor": sa.types.Text(),
"overpass": sa.types.Text()}
)
with duckdb.connect(fname_duckdb) as DBconn:
DBconn.sql(f"CREATE TABLE sensor_info AS SELECT * FROM df")
def load_satellite_csv(grompy_yaml, child_conn, dataset_name, field_ids, dsn, bands, sensor_info, **kwargs):
"""Main routine for starting a parallel loader process.
"""
csv_readers = {}
for column_name, csv_fname in bands.items():
csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
csv_db_fpath = csv_fpath.with_suffix(csv_fpath.suffix + ".db")
csv_readers[column_name] = SqliteDict(csv_db_fpath, encode=my_encode, decode=my_decode)
engine = prepare_db(dsn, table_name=dataset_name, bands=csv_readers.keys())
write_to_database(engine, dataset_name, csv_readers, field_ids, child_conn)
write_sensor_info(engine, sensor_info)
def start_parallel_loading(grompy_yaml, datasets, field_ids):
"""Start loading CSV files in parallel
:param grompy_yaml: the full path to the grompy.yaml file
:param datasets: the datasets to load
:param field_ids: the set of unique field IDS to process
:return: a tuple with three elements:
- a list with references to the running processes
- the parent connection of the pipe to which the processes communicate progress
- the number of lines per dataset being processed
"""
process_list = []
parent_conn, child_conn = mp.Pipe()
for dataset_name, description in datasets.items():
if description is None:
continue
def process_df(df, value_name=None):
"""Process a CSV file with Sentinel2 parcel average observations from groenmonitor
print(f"Starting loading of: {dataset_name}")
p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,field_ids), kwargs=description)
process_list.append(p)
for p in process_list:
p.start()
:param df: a pandas dataframe with inputs from groenmonitor
:param value_name: the S2 band name to process
:return:
"""
df.drop(columns=["count"], inplace=True, errors="ignore")
df1 = df.melt(id_vars="field_ID")
df1["day"] = pd.to_datetime(df1.variable)
df1.drop(columns=["variable"], inplace=True)
df1.rename(columns={"value": value_name}, inplace=True)
return process_list, parent_conn
return df1.set_index(["field_ID","day"])
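To make the reshape in process_df concrete, a small self-contained sketch with a hypothetical groenmonitor-style input (one row per parcel, one column per observation date):

import pandas as pd

df = pd.DataFrame({"field_ID": [1, 2],
                   "count": [55, 43],
                   "20210501": [0.41, 0.00],
                   "20210511": [0.47, 0.52]})
df = df.drop(columns=["count"], errors="ignore")
long = df.melt(id_vars="field_ID")
long["day"] = pd.to_datetime(long.variable)
long = long.drop(columns=["variable"]).rename(columns={"value": "NDVI"})
print(long.set_index(["field_ID", "day"]))               # one row per parcel/date, single NDVI column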
def start_serial_loading(grompy_yaml, datasets, field_ids):
def start_serial_loading(grompy_yaml, datasets):
"""Start loading CSV files in sequence
:param datasets:
:return:
......@@ -323,64 +180,25 @@ def start_serial_loading(grompy_yaml, datasets, field_ids):
continue
print(f"Starting loading of: {dataset_name}")
csv_readers = {}
for column_name, csv_fname in description["bands"].items():
df_all = None
band_names = sorted(description["bands"].items(), key=lambda x: x[0])
for column_name, csv_fname in tqdm(band_names):
csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
csv_db_fpath = csv_fpath.with_suffix(csv_fpath.suffix + ".db")
csv_readers[column_name] = SqliteDict(csv_db_fpath, encode=my_encode, decode=my_decode)
engine = prepare_db(description["dsn"], table_name=dataset_name, bands=csv_readers.keys())
write_to_database(engine, dataset_name, csv_readers, field_ids, total_lines=len(field_ids))
write_sensor_info(engine, description["sensor_info"])
if df_all is None:
df_all = process_df(pd.read_csv(csv_fpath), value_name=column_name)
else:
df_all = df_all.join(process_df(pd.read_csv(csv_fpath), value_name=column_name))
# Remove empty rows
df_all = df_all[df_all.values.sum(axis=1) != 0]
df_all.reset_index(inplace=True)
def monitor_parallel_loading(process_list, parent_conn, field_ids):
"""Monitors the execution of parallel loading and updates the
progressbar.
"""
total_lines = len(process_list) * len(field_ids)
lines_processed = 0
printProgressBar(lines_processed, total_lines, decimals=2, length=50)
exit = False
try:
processes = [p for p in process_list if p.is_alive()]
while processes:
for p in process_list:
if parent_conn.poll():
lines_processed = 0
mesg = parent_conn.recv()
if mesg:
for dataset_name, nlines in mesg.items():
lines_processed += nlines
else: # we got a zero length message
print("Zero length message received from parent_conn.recv()."
"Loading may continue, but progressbar is probably incorrect.")
if p.exception:
error, traceback = p.exception
raise CSVLoadingError(error, traceback)
printProgressBar(lines_processed, total_lines, decimals=2, length=50)
time.sleep(3)
processes = [p for p in process_list if p.is_alive()]
except KeyboardInterrupt:
exit = True
print("Terminated on user request!")
except CSVLoadingError as e:
exit = True
print("Loading failed in one or more files:")
print(e.error)
print(e.traceback)
except Exception as e:
exit = True
print("Unknown error caused loading failed in one or more files:")
tb.print_exc()
finally:
for p in process_list:
p.kill()
if exit:
sys.exit()
# Loading done, force bar to 100%
printProgressBar(total_lines, total_lines, decimals=2, length=50)
fname_duckdb = make_path_absolute(grompy_yaml, description["dsn"])
prepare_db(fname_duckdb, table_name=dataset_name, bands=band_names)
write_to_database(fname_duckdb, dataset_name, df_all)
fname_ssinfo = make_path_absolute(grompy_yaml, description["sensor_info"])
write_sensor_info(fname_duckdb, fname_ssinfo)
def load_data(grompy_yaml, parcel_info_only, parallel=False):
......@@ -393,20 +211,10 @@ def load_data(grompy_yaml, parcel_info_only, parallel=False):
grompy_conf = yaml.safe_load(open(grompy_yaml))
# process CSV files to SQLite stores first
process_csv_stores(grompy_yaml, grompy_conf["datasets"], parallel)
print("done!")
# First load parcel info
print("Start loading parcel information. This will take some time... ", end="", flush=True)
print("Start loading parcel information. This will take some time...")
parcel_info = grompy_conf.pop("parcel_info")
field_ids = load_parcel_info(grompy_yaml, **parcel_info)
print("done!")
if not parcel_info_only:
if parallel:
process_list, parent_conn = start_parallel_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
monitor_parallel_loading(process_list, parent_conn, field_ids)
else:
start_serial_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
print("Loading finished successfully.")
\ No newline at end of file
start_serial_loading(grompy_yaml, grompy_conf["datasets"])
\ No newline at end of file
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
import multiprocessing as mp
import traceback
from pathlib import Path
from packaging import version
import gzip
import yaml
import sqlalchemy.exc
import duckdb
from sqlalchemy import MetaData, Table, Column, Integer, Date, Float, Text, create_engine
# from sqlalchemy.event import listen
# from sqlalchemy.pool import Pool
from pyproj import Transformer
from . import __version__
......@@ -21,14 +18,18 @@ def take_first(iter):
def count_lines(files):
"""Checks the number of lines in the input CSV files.
All files should have the same number of lines; otherwise None is returned.
"""
counts = {}
print("Checking number of lines:")
for fname in files:
with open(fname) as my_file:
c = sum(1 for _ in my_file)
if fname.suffix == ".gz":
with gzip.open(fname, 'rb') as myfile:
for c, l in enumerate(myfile):
pass
c += 1
else:
with open(fname) as my_file:
c = sum(1 for _ in my_file)
counts[fname] = c
print(f" - {fname}: {c}")
......@@ -43,82 +44,31 @@ def count_lines(files):
return take_first(counts.values())
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = printEnd, flush=True)
# Print New Line on Complete
if iteration == total:
print()
def prepare_db(dsn, table_name, bands):
def on_connect(dbapi_connection, connection_record):
# disable pysqlite's emitting of the BEGIN statement entirely.
# also stops it from emitting COMMIT before any DDL
# see: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#serializable-isolation-savepoints-transactional-ddl
# https://docs.sqlalchemy.org/en/13/core/event.html
dbapi_connection.isolation_level = None
def on_begin(conn):
# emit our own BEGIN
conn.execute("BEGIN EXCLUSIVE")
engine = create_engine(dsn)
# allow locking of database for concurrency
# listen(engine, "connect", on_connect)
# listen(engine, "begin", on_begin)
meta = MetaData(engine)
tbl = Table(table_name, meta,
Column('fieldID', Integer, primary_key=True, nullable=False),
Column('day', Date, primary_key=True, nullable=False),
)
for col_name in bands:
tbl.append_column(Column(col_name, Float, nullable=True))
def create_ddb_table(table_name, band_names):
try:
tbl.drop()
except sqlalchemy.exc.OperationalError:
pass
tbl.create()
sql = f"""CREATE TABLE {table_name} (
fieldID INTEGER,
day DATE,
"""
for band, fname in band_names:
if band == "NDVI":
sql += f"{band} FLOAT, \n"
else:
sql += f"{band} USMALLINT, \n"
return engine
sql += ")"
return sql
def prepare_db(fname_ddb, table_name, bands):
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._parent_conn, self._child_conn = mp.Pipe()
self._exception = None
# Check if file exists, if so delete it
fname_ddb.unlink(missing_ok=True)
def run(self):
try:
mp.Process.run(self)
self._child_conn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._child_conn.send((e, tb))
# raise e # You can still rise this exception if you need to
sql = create_ddb_table(table_name, bands)
@property
def exception(self):
if self._parent_conn.poll():
self._exception = self._parent_conn.recv()
return self._exception
# Create new database and table structure
with duckdb.connect(fname_ddb) as ddb:
ddb.execute(sql)
def make_path_absolute(grompy_yaml, filepath):
......@@ -130,6 +80,7 @@ def make_path_absolute(grompy_yaml, filepath):
:return: the absolute path to the file
"""
filepath = Path(filepath)
if not filepath.is_absolute():
filepath = grompy_yaml.parent / filepath
......
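A minimal sketch of the path handling used for the dsn and CSV entries in grompy.yaml: relative paths are resolved against the directory holding grompy.yaml, while absolute paths pass through unchanged. The file names below are hypothetical, and the sketch assumes the function returns filepath as its docstring states:

from pathlib import Path
from grompy.util import make_path_absolute

grompy_yaml = Path("/data/grompy/grompy.yaml")
print(make_path_absolute(grompy_yaml, "sentinel2.duckdb"))           # /data/grompy/sentinel2.duckdb
print(make_path_absolute(grompy_yaml, "/scratch/sentinel2.duckdb"))  # /scratch/sentinel2.duckdb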
......@@ -2,21 +2,31 @@
requires = ["flit_core >=2,<4"]
build-backend = "flit_core.buildapi"
[tool.flit.metadata]
module = "grompy"
author = "Allard de Wit"
author-email = "allard.dewit@wur.nl"
home-page = "http://www.earthinformatics.eu"
[project]
name = "grompy"
version = "1.6.1"
description = "GROMPY is a tool for storing and accessing Groenmonitor satellite time-series for agricultural parcels."
authors = [
{name = "Allard de Wit", email = "allard.dewit@wur.nl"},
]
maintainers = [
{name = "Allard de Wit", email = "allard.dewit@wur.nl"},
]
home-page = "https://git.wur.nl/wit015/grompy"
classifiers = [ "License :: OSI Approved :: MIT License",]
description-file = "README.md"
requires = [
readme = "README.md"
dependencies = [
"click>=7.1",
"geopandas>= 0.8",
"pyyaml>= 5.3",
"sqlalchemy< 2.0",
"sqlitedict >= 2.1"
"tqdm",
"pandas",
"numpy",
"duckdb>=1.1.1",
"fiona==1.9",
]
requires-python=">= 3.6"
[tool.flit.scripts]
[project.scripts]
grompy = "grompy:cli"