Commit fa84d2dd authored by Wit, Allard de

grompy version 1.3.1:

- Fixed bugs in parallel processing of grompy databases
- Added processing of CSV files into SQLite stores.
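
For context on the second item: the new convert_csv_to_store() in the diff below writes each CSV row into an SQLite key/value store next to the CSV file, keyed on the field ID in the first column, via a temporary file that is renamed into place once loading completes. The following is a simplified, hypothetical sketch of that SqliteDict pattern, not the grompy source itself; it skips the header row with next() and omits grompy's custom my_encode/my_decode helpers.

import csv
import uuid
from pathlib import Path

from sqlitedict import SqliteDict

def csv_to_sqlite_store(fname_csv: Path):
    """Write each CSV row into a <name>.csv.db store, keyed by field ID."""
    fname_db = fname_csv.with_suffix(f"{fname_csv.suffix}.db")
    fname_tmp = fname_csv.with_suffix(f".{uuid.uuid4()}.tmp")
    if fname_db.exists():
        return  # store was already built earlier
    with SqliteDict(fname_tmp, autocommit=True) as db:
        with open(fname_csv, newline="") as fp:
            reader = csv.reader(fp)
            next(reader)  # assumption: first row is a header, skip it
            for row in reader:
                fid = int(row.pop(0))  # field ID in the first column
                db[fid] = row          # remaining columns as the value
    fname_tmp.rename(fname_db)  # swap the finished store into place

With this sketch, csv_to_sqlite_store(Path("sentinel2_ndvi.csv")) (hypothetical file name) would produce sentinel2_ndvi.csv.db next to the input file, which is the layout the loader expects.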
parent 72e72da4
@@ -3,7 +3,7 @@
 # Allard de Wit (allard.dewit@wur.nl), October 2021
 """Grompy is a tool to process and access parcel-based satellite observations from GroenMonitor.nl.
 """
-__version__ = "1.3.0"
+__version__ = "1.3.1"
 from .dap import DataAccessProvider
 from .cmd import cli
...
@@ -10,6 +10,7 @@ import multiprocessing as mp
 import zlib
 import pickle
 import csv
+import traceback as tb
 import sqlalchemy as sa
 import sqlalchemy.exc
@@ -38,11 +39,7 @@ def convert_csv_to_store(fname_csv):
     fname_csv_db = fname_csv.with_suffix(f"{fname_csv.suffix}.db")
     fname_csv_tmp = fname_csv.with_suffix(f".{uuid.uuid4()}.tmp")
-    if fname_csv_db.exists():
-        with SqliteDict(fname_csv_db, encode=my_encode, decode=my_decode) as db:
-            field_ids = list(db.keys())
-    else:
-        field_ids = []
+    if not fname_csv_db.exists():
         with SqliteDict(fname_csv_tmp, encode=my_encode, decode=my_decode, autocommit=True) as db:
             with open(fname_csv, newline="") as fp_csv:
                 reader = csv.reader(fp_csv)
@@ -51,16 +48,9 @@ def convert_csv_to_store(fname_csv):
                 for row in reader:
                     fid = int(row.pop(0))
                     db[fid] = row
-                    field_ids.append(fid)
         fname_csv_tmp.rename(fname_csv_db)
-    # remove the "header" key as it is not a field ID and was only used
-    # to store the CSV header values
-    field_ids = set(field_ids)
-    field_ids.discard("header")
-    return field_ids
 def process_csv_stores(grompy_yaml, datasets, parallel=False):
@@ -72,20 +62,14 @@ def process_csv_stores(grompy_yaml, datasets, parallel=False):
         csv_fname = make_path_absolute(grompy_yaml, Path(csv_fname))
         csv_fnames.append(csv_fname)
-    field_ids = set()
+    print(f"Creating stores for {len(csv_fnames)} CSV files... ", end="", flush=True)
     if parallel:
         n = min(mp.cpu_count(), len(csv_fnames))
         with mp.Pool(n) as pool:
-            r = pool.map(convert_csv_to_store, csv_fnames)
-            for s in r:
-                field_ids.update(s)
+            pool.map(convert_csv_to_store, csv_fnames)
     else:
         for csv_fname in csv_fnames:
-            s = convert_csv_to_store(csv_fname)
-            field_ids.update(s)
+            convert_csv_to_store(csv_fname)
-    return field_ids

 class CSVLoadingError(Exception):
@@ -135,7 +119,6 @@ def load_parcel_info(grompy_yaml, gpkg_fname, counts_file_10m, counts_file_20m,
         # Check for null values, e.g. fields without a count
         ix = df[col_name].isnull()
         if any(ix):
-            print(f"{col_name}: found {sum(ix)} fields without a count: forcing zeros")
             df.loc[ix, col_name] = 0
     df_out = gpd.GeoDataFrame({"fieldID": df.index,
@@ -316,22 +299,17 @@ def start_parallel_loading(grompy_yaml, datasets, field_ids):
     """
     process_list = []
     parent_conn, child_conn = mp.Pipe()
-    lines_per_dataset = {}
     for dataset_name, description in datasets.items():
         if description is None:
             continue
-        if "nlines" not in description:
-            print("You must run 'grompy check' before trying 'grompy load'! Aborting...")
-            sys.exit()
         print(f"Starting loading of: {dataset_name}")
-        lines_per_dataset[dataset_name] = description["nlines"]
         p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,field_ids), kwargs=description)
         process_list.append(p)
     for p in process_list:
         p.start()
-    return process_list, parent_conn, lines_per_dataset
+    return process_list, parent_conn

 def start_serial_loading(grompy_yaml, datasets, field_ids):
@@ -355,39 +333,49 @@ def start_serial_loading(grompy_yaml, datasets, field_ids):
         write_sensor_info(engine, description["sensor_info"])

-def monitor_parallel_loading(process_list, parent_conn, lines_per_dataset):
+def monitor_parallel_loading(process_list, parent_conn, field_ids):
     """Monitors the execution of parallel loading and updates the
     progressbar.
     """
-    total_lines = sum(c for c in lines_per_dataset.values())
-    lines_per_dataset = {ds: 0 for ds in lines_per_dataset}
-    printProgressBar(0, total_lines, decimals=2, length=50)
+    total_lines = len(process_list) * len(field_ids)
+    lines_processed = 0
+    printProgressBar(lines_processed, total_lines, decimals=2, length=50)
+    exit = False
     try:
         processes = [p for p in process_list if p.is_alive()]
         while processes:
             time.sleep(3)
             for p in process_list:
                 if parent_conn.poll():
-                    lines_per_dataset.update(parent_conn.recv())
+                    lines_processed = 0
+                    for dataset_name, nlines in parent_conn.recv().items():
+                        lines_processed += nlines
                 if p.exception:
                     error, traceback = p.exception
                     raise CSVLoadingError(error, traceback)
-            current_lines = sum(c for c in lines_per_dataset.values())
-            printProgressBar(current_lines, total_lines, decimals=2, length=50)
+            printProgressBar(lines_processed, total_lines, decimals=2, length=50)
             processes = [p for p in process_list if p.is_alive()]
     except KeyboardInterrupt:
-        for p in process_list:
-            p.kill()
-        sys.exit()
+        exit = True
+        print("Terminated on user request!")
     except CSVLoadingError as e:
+        exit = True
         print("Loading failed in one or more files:")
         print(e.error)
         print(e.traceback)
     except Exception as e:
+        exit = True
         print("Unknown error caused loading failed in one or more files:")
-        print(e)
+        tb.print_exc()
+    finally:
+        for p in process_list:
+            p.kill()
+    if exit:
+        sys.exit()
+    # Loading done, force bar to 100%
+    printProgressBar(total_lines, total_lines, decimals=2, length=50)
 def load_data(grompy_yaml, parcel_info_only, parallel=False):
@@ -400,15 +388,20 @@ def load_data(grompy_yaml, parcel_info_only, parallel=False):
     grompy_conf = yaml.safe_load(open(grompy_yaml))

+    # process CSV files to SQLite stores first
+    process_csv_stores(grompy_yaml, grompy_conf["datasets"], parallel)
+    print("done!")

     # First load parcel info
-    print("Start loading parcel information. This will take some time...")
+    print("Start loading parcel information. This will take some time... ", end="", flush=True)
     parcel_info = grompy_conf.pop("parcel_info")
     field_ids = load_parcel_info(grompy_yaml, **parcel_info)
+    print("done!")

     if not parcel_info_only:
         if parallel:
-            process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"],
-                                                                                   field_ids)
-            monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
+            process_list, parent_conn = start_parallel_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
+            monitor_parallel_loading(process_list, parent_conn, field_ids)
         else:
             start_serial_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
+    print("Loading finished successfully.")
\ No newline at end of file
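
The reworked monitor_parallel_loading() above depends on the loader processes reporting progress as small dicts over a multiprocessing Pipe, which the parent polls to redraw the progress bar. Below is a minimal, self-contained sketch of that pattern under stated assumptions: the dataset names and line counts are hypothetical, a plain print stands in for grompy's printProgressBar, and worker() stands in for load_satellite_csv.

import multiprocessing as mp
import time

def worker(conn, dataset_name, n_lines):
    # stand-in for load_satellite_csv: report cumulative progress now and then
    for i in range(1, n_lines + 1):
        time.sleep(0.005)
        if i % 100 == 0 or i == n_lines:
            conn.send({dataset_name: i})

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    datasets = {"sentinel2": 500, "sentinel1": 300}  # hypothetical datasets
    procs = [mp.Process(target=worker, args=(child_conn, name, n))
             for name, n in datasets.items()]
    for p in procs:
        p.start()
    progress = {name: 0 for name in datasets}
    total = sum(datasets.values())
    while any(p.is_alive() for p in procs) or parent_conn.poll():
        while parent_conn.poll():          # drain all pending progress messages
            progress.update(parent_conn.recv())
        print(f"\r{sum(progress.values())}/{total} lines processed", end="", flush=True)
        time.sleep(0.5)
    for p in procs:
        p.join()
    print()

As in the commit, all worker processes share the same child end of the Pipe; each message is a small dict, so the parent can simply merge the latest counts and sum them for the bar.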