diff --git a/grompy/__init__.py b/grompy/__init__.py
index d74d16279d85d8ac4259107e92974deae5e6bd7e..5ba76dd14f6c3f8954a40ae810fd5f5c9fb75e4c 100644
--- a/grompy/__init__.py
+++ b/grompy/__init__.py
@@ -3,7 +3,7 @@
 # Allard de Wit (allard.dewit@wur.nl), October 2021
 """Grompy is a tool to process and access parcel-based satellite observations from GroenMonitor.nl.
 """
-__version__ = "1.3.0"
+__version__ = "1.3.1"
 
 from .dap import DataAccessProvider
 from .cmd import cli
diff --git a/grompy/load_data.py b/grompy/load_data.py
index f940d72ad1ad47dad74a760110c47308194b06d2..d37a057a020874f28d6810cf056e88ecaf461ae9 100644
--- a/grompy/load_data.py
+++ b/grompy/load_data.py
@@ -10,6 +10,7 @@ import multiprocessing as mp
 import zlib
 import pickle
 import csv
+import traceback as tb
 
 import sqlalchemy as sa
 import sqlalchemy.exc
@@ -38,11 +39,7 @@ def convert_csv_to_store(fname_csv):
     fname_csv_db = fname_csv.with_suffix(f"{fname_csv.suffix}.db")
     fname_csv_tmp = fname_csv.with_suffix(f".{uuid.uuid4()}.tmp")
 
-    if fname_csv_db.exists():
-        with SqliteDict(fname_csv_db, encode=my_encode, decode=my_decode) as db:
-            field_ids = list(db.keys())
-    else:
-        field_ids = []
+    if not fname_csv_db.exists():
         with SqliteDict(fname_csv_tmp, encode=my_encode, decode=my_decode, autocommit=True) as db:
             with open(fname_csv, newline="") as fp_csv:
                 reader = csv.reader(fp_csv)
@@ -51,16 +48,9 @@
                 for row in reader:
                     fid = int(row.pop(0))
                     db[fid] = row
-                    field_ids.append(fid)
 
         fname_csv_tmp.rename(fname_csv_db)
 
-    # remove the "header" key as it is not a field ID and was only used
-    # to store the CSV header values
-    field_ids = set(field_ids)
-    field_ids.discard("header")
-
-    return field_ids
 
 
 def process_csv_stores(grompy_yaml, datasets, parallel=False):
@@ -72,20 +62,14 @@
         csv_fname = make_path_absolute(grompy_yaml, Path(csv_fname))
         csv_fnames.append(csv_fname)
 
-    field_ids = set()
+    print(f"Creating stores for {len(csv_fnames)} CSV files... ", end="", flush=True)
     if parallel:
         n = min(mp.cpu_count(), len(csv_fnames))
         with mp.Pool(n) as pool:
-            r = pool.map(convert_csv_to_store, csv_fnames)
-            for s in r:
-                field_ids.update(s)
+            pool.map(convert_csv_to_store, csv_fnames)
     else:
         for csv_fname in csv_fnames:
-            s = convert_csv_to_store(csv_fname)
-            field_ids.update(s)
-
-    return field_ids
-
+            convert_csv_to_store(csv_fname)
 
 
 class CSVLoadingError(Exception):
@@ -135,7 +119,6 @@
         # Check for null values, e.g. fields without a count
         ix = df[col_name].isnull()
         if any(ix):
-            print(f"{col_name}: found {sum(ix)} fields without a count: forcing zeros")
             df.loc[ix, col_name] = 0
 
     df_out = gpd.GeoDataFrame({"fieldID": df.index,
@@ -316,22 +299,17 @@
     """
     process_list = []
     parent_conn, child_conn = mp.Pipe()
-    lines_per_dataset = {}
     for dataset_name, description in datasets.items():
        if description is None:
            continue
-        if "nlines" not in description:
-            print("You must run 'grompy check' before trying 'grompy load'! Aborting...")
-            sys.exit()
         print(f"Starting loading of: {dataset_name}")
-        lines_per_dataset[dataset_name] = description["nlines"]
         p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,field_ids),
                     kwargs=description)
         process_list.append(p)
 
     for p in process_list:
         p.start()
-    return process_list, parent_conn, lines_per_dataset
+    return process_list, parent_conn
 
 
 def start_serial_loading(grompy_yaml, datasets, field_ids):
@@ -355,39 +333,49 @@
         write_sensor_info(engine, description["sensor_info"])
 
 
-def monitor_parallel_loading(process_list, parent_conn, lines_per_dataset):
+def monitor_parallel_loading(process_list, parent_conn, field_ids):
     """Monitors the execution of parallel loading and updates the progressbar.
     """
-    total_lines = sum(c for c in lines_per_dataset.values())
-    lines_per_dataset = {ds: 0 for ds in lines_per_dataset}
-    printProgressBar(0, total_lines, decimals=2, length=50)
+    total_lines = len(process_list) * len(field_ids)
+    lines_processed = 0
+    printProgressBar(lines_processed, total_lines, decimals=2, length=50)
+    exit = False
 
     try:
         processes = [p for p in process_list if p.is_alive()]
         while processes:
             time.sleep(3)
             for p in process_list:
                 if parent_conn.poll():
-                    lines_per_dataset.update(parent_conn.recv())
+                    lines_processed = 0
+                    for dataset_name, nlines in parent_conn.recv().items():
+                        lines_processed += nlines
                 if p.exception:
                     error, traceback = p.exception
                     raise CSVLoadingError(error, traceback)
 
-            current_lines = sum(c for c in lines_per_dataset.values())
-            printProgressBar(current_lines, total_lines, decimals=2, length=50)
+            printProgressBar(lines_processed, total_lines, decimals=2, length=50)
             processes = [p for p in process_list if p.is_alive()]
 
     except KeyboardInterrupt:
-        for p in process_list:
-            p.kill()
-        sys.exit()
+        exit = True
+        print("Terminated on user request!")
    except CSVLoadingError as e:
+        exit = True
         print("Loading failed in one or more files:")
         print(e.error)
         print(e.traceback)
     except Exception as e:
+        exit = True
         print("Unknown error caused loading failed in one or more files:")
-        print(e)
+        tb.print_exc()
+    finally:
+        for p in process_list:
+            p.kill()
+    if exit:
+        sys.exit()
+    # Loading done, force bar to 100%
+    printProgressBar(total_lines, total_lines, decimals=2, length=50)
 
 
 def load_data(grompy_yaml, parcel_info_only, parallel=False):
@@ -400,15 +388,20 @@
 
     grompy_conf = yaml.safe_load(open(grompy_yaml))
 
+    # process CSV files to SQLite stores first
+    process_csv_stores(grompy_yaml, grompy_conf["datasets"], parallel)
+    print("done!")
+
     # First load parcel info
-    print("Start loading parcel information. This will take some time...")
+    print("Start loading parcel information. This will take some time... ", end="", flush=True)
     parcel_info = grompy_conf.pop("parcel_info")
     field_ids = load_parcel_info(grompy_yaml, **parcel_info)
+    print("done!")
 
     if not parcel_info_only:
         if parallel:
-            process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"],
-                                                                                   field_ids)
-            monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
+            process_list, parent_conn = start_parallel_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
+            monitor_parallel_loading(process_list, parent_conn, field_ids)
         else:
-            start_serial_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
\ No newline at end of file
+            start_serial_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
+            print("Loading finished successfully.")
\ No newline at end of file