diff --git a/grompy.yaml b/grompy.yaml
index 4f1a9d608f3d0752b0284b896548ac84518a7386..f76b74034e8b1eeafeca3b88bd222edcd1432996 100644
--- a/grompy.yaml
+++ b/grompy.yaml
@@ -1,58 +1,44 @@
 datasets:
-  sentinel1_backscatter:
-    DBcheck: true
-    bands:
-      VH: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VH_2019_ADC.csv
-      VH_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VH_2019_ADC.csv
-      VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VV_2019_ADC.csv
-      VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VV_2019_ADC.csv
-    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_backscatter.db3
-    nlines: 803016
-  sentinel1_coherence:
-    DBcheck: true
-    bands:
-      S1A_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
-      S1A_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
-      S1B_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
-      S1B_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
-    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_coherence.db3
-    nlines: 803016
+  sentinel1_backscatter: null
+  sentinel1_coherence: null
   sentinel2_reflectance_std:
     DBcheck: true
     bands:
-      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B02_2019_ADC.csv
-      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B03_2019_ADC.csv
-      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B04_2019_ADC.csv
-      B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B05_2019_ADC.csv
-      B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B06_2019_ADC.csv
-      B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B07_2019_ADC.csv
-      B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B08_2019_ADC.csv
-      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B11_2019_ADC.csv
-      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B12_2019_ADC.csv
-      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B8A_2019_ADC.csv
-      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_2019_ADC.csv
-    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_std.db3
-    nlines: 803016
+      B02: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B02_2022_ADC.csv
+      B03: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B03_2022_ADC.csv
+      B04: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B04_2022_ADC.csv
+      B05: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B05_2022_ADC.csv
+      B06: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B06_2022_ADC.csv
+      B07: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B07_2022_ADC.csv
+      B08: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B08_2022_ADC.csv
+      B11: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B11_2022_ADC.csv
+      B12: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B12_2022_ADC.csv
+      B8A: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B8A_2022_ADC.csv
+      NDVI: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_ndvi_2022_ADC.csv
+    dsn: sqlite:////data/wit015/gm_archive/2022/sentinel2_reflectance_std.db3
+    nlines: 808273
   sentinel2_reflectance_values:
     DBcheck: true
     bands:
-      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B02_2019_ADC.csv
-      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B03_2019_ADC.csv
-      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B04_2019_ADC.csv
-      B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B05_2019_ADC.csv
-      B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B06_2019_ADC.csv
-      B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B07_2019_ADC.csv
-      B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B08_2019_ADC.csv
-      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B11_2019_ADC.csv
-      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B12_2019_ADC.csv
-      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
-      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_2019_ADC.csv
-    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_values.db3
-    nlines: 803016
+      B02: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B02_2022_ADC.csv
+      B03: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B03_2022_ADC.csv
+      B04: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B04_2022_ADC.csv
+      B05: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B05_2022_ADC.csv
+      B06: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B06_2022_ADC.csv
+      B07: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B07_2022_ADC.csv
+      B08: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B08_2022_ADC.csv
+      B11: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B11_2022_ADC.csv
+      B12: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B12_2022_ADC.csv
+      B8A: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B8A_2022_ADC.csv
+      NDVI: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_ndvi_2022_ADC.csv
+    dsn: sqlite:////data/wit015/gm_archive/2022/sentinel2_reflectance_values.db3
+    nlines: 808273
 grompy:
   version: 1.0
   parcel_info:
-    counts_file: /home/wit015/Data/groenmonitor/Optisch/perceelscount.csv
-    dsn: sqlite:////home/wit015/Data/groenmonitor/parcel_info.db3
-    shape_file: /home/wit015/Data/groenmonitor/BRP/gewaspercelen_2019.shp
+    counts_file_10m: /data/wit015/gm_archive/2022/BRP/no_pixels_per_field_10m_2022.csv
+    counts_file_20m: /data/wit015/gm_archive/2022/BRP/no_pixels_per_field_20m_2022.csv
+    counts_file_25m: /data/wit015/gm_archive/2022/BRP/no_pixels_per_field_25m_2022.csv
+    dsn: sqlite:////data/wit015/gm_archive/2022/parcel_info.db3
+    shape_file: /data/wit015/gm_archive/2022/BRP/gewaspercelen_2022_ss.shp
     table_name: parcel_info
diff --git a/grompy/load_data.py b/grompy/load_data.py
index 4e45b5dbe16fa706a413f45f9d62ffe01d5a81e9..4d17612f9b5dec7f7b0523bc198199450dba68a6 100644
--- a/grompy/load_data.py
+++ b/grompy/load_data.py
@@ -2,11 +2,14 @@
 # Copyright (c) 2021 Wageningen Environmental Research
 # Allard de Wit (allard.dewit@wur.nl), April 2021
 import sys
+import uuid
 from pathlib import Path
-from csv import DictReader
 import time
+import sqlite3
 import multiprocessing as mp
-
+import zlib
+import pickle
+import csv
 import sqlalchemy as sa
 import sqlalchemy.exc

@@ -14,12 +17,77 @@ import pandas as pd
 import geopandas as gpd
 import numpy as np
 import yaml
+from sqlitedict import SqliteDict

 from .util import prepare_db, printProgressBar, Process, make_path_absolute, open_DB_connection, get_latlon_from_RD

 dummy_date = "19000101"


+def my_encode(obj):
+    return sqlite3.Binary(zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)))
+
+def my_decode(obj):
+    return pickle.loads(zlib.decompress(bytes(obj)))
+
+
+def convert_csv_to_store(fname_csv):
+    """Converts the CSV file into an SQLite key-value store with filename fname_csv + ".db"
+    """
+
+    fname_csv_db = fname_csv.with_suffix(f"{fname_csv.suffix}.db")
+    fname_csv_tmp = fname_csv.with_suffix(f".{uuid.uuid4()}.tmp")
+
+    if fname_csv_db.exists():
+        with SqliteDict(fname_csv_db, encode=my_encode, decode=my_decode) as db:
+            field_ids = list(db.keys())
+    else:
+        field_ids = []
+        with SqliteDict(fname_csv_tmp, encode=my_encode, decode=my_decode, autocommit=True) as db:
+            with open(fname_csv, newline="") as fp_csv:
+                reader = csv.reader(fp_csv)
+                fieldnames = next(reader)
+                db["header"] = fieldnames[1:]
+                for row in reader:
+                    fid = int(row.pop(0))
+                    db[fid] = row
+                    field_ids.append(fid)
+
+        fname_csv_tmp.rename(fname_csv_db)
+
+    # remove the "header" key as it is not a field ID and was only used
+    # to store the CSV header values
+    field_ids = set(field_ids)
+    field_ids.discard("header")
+
+    return field_ids
+
+def process_csv_stores(grompy_yaml, datasets, parallel=False):
+
+    csv_fnames = []
+    for dataset_name, description in datasets.items():
+        if description is None:
+            continue
+        for csv_fname in description["bands"].values():
+            csv_fname = make_path_absolute(grompy_yaml, Path(csv_fname))
+            csv_fnames.append(csv_fname)
+
+    field_ids = set()
+    if parallel:
+        n = min(mp.cpu_count(), len(csv_fnames))
+        with mp.Pool(n) as pool:
+            r = pool.map(convert_csv_to_store, csv_fnames)
+            for s in r:
+                field_ids.update(s)
+    else:
+        for csv_fname in csv_fnames:
+            s = convert_csv_to_store(csv_fname)
+            field_ids.update(s)
+
+    return field_ids
+
+
+
 class CSVLoadingError(Exception):
     """Exception for raising errors related to CSV reading
     """
@@ -125,7 +193,7 @@ def load_parcel_info(grompy_yaml, dsn, counts_file_10m, counts_file_20m, counts_
     df_out = df = df_counts = None


-def process_rows(rows):
+def process_rows(rows, fieldID):
     """Process a single row from different CSV files and convert them into a DataFrame.
@@ -134,9 +202,7 @@
     returns a pandas DataFrame
     """
     df = pd.DataFrame()
-    fieldIDs = []
     for column_name, row in rows.items():
-        fieldIDs.append(int(row.pop("field_ID")))
         if "count" in row:
             count = row.pop("count")
             recs = []
@@ -161,15 +227,12 @@
     df.reset_index(inplace=True)

     # Add FieldID to the dataframe
-    if len(set(fieldIDs)) > 1:
-        msg = f"FieldIDs are not the same for this row: {fieldIDs}"
-        raise RuntimeError(msg)
-    df["fieldID"] = fieldIDs[0]
+    df["fieldID"] = int(fieldID)

     return df


-def write_to_database(engine, dataset_name, csv_readers, child_conn=None, total_lines=None):
+def write_to_database(engine, dataset_name, csv_readers, field_ids, child_conn=None, total_lines=None):
     """routine writes data from a set of CSV files into the database

     :param engine: the database engine to be used
@@ -182,38 +245,64 @@
     if total_lines is not None:
         printProgressBar(0, total_lines, decimals=2, length=50)

-    this_line = 0
-    while True:
-        try:
-            rows = {column_name: next(reader) for column_name, reader in csv_readers.items()}
-            df = process_rows(rows)
+    meta = sa.MetaData(engine)
+    tbl = sa.Table(dataset_name, meta, autoload=True)
+    ins = tbl.insert()
+
+    headers = {column_name: reader["header"] for column_name, reader in csv_readers.items()}
+    for i, fid in enumerate(field_ids):
+        rows = {}
+        for column_name, reader in csv_readers.items():
             try:
-                df.to_sql(dataset_name, engine, if_exists="append", index=False)
-            except sa.exc.IntegrityError as e:
-                print(f"Field ID {df.fieldID.unique()} failed to insert in table {dataset_name}")
-            this_line += 1
-            if this_line % 100 == 0:
-                if child_conn is not None:
-                    child_conn.send({dataset_name: this_line})
-                else:
-                    printProgressBar(this_line, total_lines, decimals=2, length=50)
-        except StopIteration:
-            break
-
-
-def load_satellite_csv(grompy_yaml, child_conn, dataset_name, dsn, bands, **kwargs):
-    mean_csv_readers = {}
+                band_values = reader[fid]
+            except KeyError as e:
+                band_values = [0.] * len(headers[column_name])
+            header = headers[column_name]
+            row = {hdr: value for hdr, value in zip(header, band_values)}
+            rows[column_name] = row
+
+        df = process_rows(rows, fid)
+        if len(df) == 0:
+            continue
+
+        try:
+            recs = df.to_dict(orient="records")
+            with engine.begin() as DBconn:
+                DBconn.execute(ins, recs)
+            # df.to_sql(dataset_name, DBconn, if_exists="append", index=False, method="multi")
+        except sa.exc.IntegrityError as e:
+            print(f"Field ID {df.fieldID.unique()} failed to insert in table {dataset_name}")
+        if i % 100 == 0:
+            if child_conn is not None:
+                child_conn.send({dataset_name: i})
+            else:
+                printProgressBar(i, total_lines, decimals=2, length=50)
+
+
+def load_satellite_csv(grompy_yaml, child_conn, dataset_name, field_ids, dsn, bands, **kwargs):
+    """Main routine for starting a parallel loader process.
+    """
+
+    csv_readers = {}
     for column_name, csv_fname in bands.items():
         csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
-        mean_csv_readers[column_name] = DictReader(open(csv_fpath))
-    engine = prepare_db(dsn, table_name=dataset_name, bands=mean_csv_readers.keys())
-    write_to_database(engine, dataset_name, mean_csv_readers, child_conn)
+        csv_db_fpath = csv_fpath.with_suffix(csv_fpath.suffix + ".db")
+        csv_readers[column_name] = SqliteDict(csv_db_fpath, encode=my_encode, decode=my_decode)
+    engine = prepare_db(dsn, table_name=dataset_name, bands=csv_readers.keys())
+    write_to_database(engine, dataset_name, csv_readers, field_ids, child_conn)


-def start_parallel_loading(grompy_yaml, datasets):
+def start_parallel_loading(grompy_yaml, datasets, field_ids):
     """Start loading CSV files in parallel
-    :param datasets:
-    :return:
+
+    :param grompy_yaml: the full path to the grompy.yaml file
+    :param datasets: the datasets to load
+    :param field_ids: the set of unique field IDs to process
+
+    :return: a tuple with three elements:
+        - a list with references to the running processes
+        - the parent connection of the pipe to which the processes communicate progress
+        - the number of lines per dataset being processed
     """
     process_list = []
     parent_conn, child_conn = mp.Pipe()
@@ -227,7 +316,7 @@
         print(f"Starting loading of: {dataset_name}")
         lines_per_dataset[dataset_name] = description["nlines"]

-        p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,), kwargs=description)
+        p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name, field_ids), kwargs=description)
         process_list.append(p)
     for p in process_list:
         p.start()
@@ -235,7 +324,7 @@
     return process_list, parent_conn, lines_per_dataset


-def start_serial_loading(grompy_yaml, datasets):
+def start_serial_loading(grompy_yaml, datasets, field_ids):
     """Start loading CSV files in sequence
     :param datasets:
     :return:
@@ -252,9 +341,11 @@
         mean_csv_readers = {}
         for column_name, csv_fname in description["bands"].items():
             csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
-            mean_csv_readers[column_name] = DictReader(open(csv_fpath))
+            csv_db_fpath = csv_fpath.with_suffix(csv_fpath.suffix + ".db")
+            mean_csv_readers[column_name] = SqliteDict(csv_db_fpath, encode=my_encode, decode=my_decode)
         engine = prepare_db(description["dsn"], table_name=dataset_name, bands=mean_csv_readers.keys())
-        write_to_database(engine, dataset_name, mean_csv_readers, total_lines=description["nlines"])
+        write_to_database(engine, dataset_name, mean_csv_readers, field_ids, total_lines=description["nlines"])
+        break


 def monitor_parallel_loading(process_list, parent_conn, lines_per_dataset):
@@ -297,7 +388,7 @@

     :param grompy_yaml: the path to grompy configuration file
     :param parcel_info_only: Only load parcel info, but no satellite data
-    :param parallel: Load data using multiple CPUs, default True
+    :param parallel: Load data using multiple CPUs, default False
     """
     grompy_conf = yaml.safe_load(open(grompy_yaml))

@@ -307,9 +398,18 @@
     parcel_info = grompy_conf.pop("parcel_info")
     load_parcel_info(grompy_yaml, **parcel_info)

+    try:
+        with open('field_ids.pkl', 'rb') as f:
+            field_ids = pickle.load(f)
+    except Exception:
+        field_ids = process_csv_stores(grompy_yaml, grompy_conf["datasets"], parallel=True)
+        with open('field_ids.pkl', 'wb') as f:
+            pickle.dump(field_ids, f, pickle.HIGHEST_PROTOCOL)
+
     if not parcel_info_only:
         if parallel:
-            process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"])
+            process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"],
+                                                                                  field_ids)
             monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
         else:
-            start_serial_loading(grompy_yaml, grompy_conf["datasets"])
\ No newline at end of file
+            start_serial_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
\ No newline at end of file
diff --git a/grompy/util.py b/grompy/util.py
index c88e1f3018b827aa2dcabe2ac5bd6bbfa0eae616..c959e68b8262e5559943547267464c9134f688fd 100644
--- a/grompy/util.py
+++ b/grompy/util.py
@@ -7,6 +7,8 @@ from pathlib import Path

 import sqlalchemy.exc
 from sqlalchemy import MetaData, Table, Column, Integer, Date, Float, Text, create_engine
+from sqlalchemy.event import listen
+from sqlalchemy.pool import Pool
 from pyproj import Transformer


@@ -61,7 +63,22 @@ def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1,


 def prepare_db(dsn, table_name, bands):
+    def on_connect(dbapi_connection, connection_record):
+        # disable pysqlite's emitting of the BEGIN statement entirely.
+        # also stops it from emitting COMMIT before any DDL
+        # see: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#serializable-isolation-savepoints-transactional-ddl
+        #      https://docs.sqlalchemy.org/en/13/core/event.html
+        dbapi_connection.isolation_level = None
+
+    def on_begin(conn):
+        # emit our own BEGIN
+        conn.execute("BEGIN EXCLUSIVE")
+
     engine = create_engine(dsn)
+    # allow locking of database for concurrency
+    # listen(engine, "connect", on_connect)
+    # listen(engine, "begin", on_begin)
+
     meta = MetaData(engine)
     tbl = Table(table_name, meta,
                 Column('fieldID', Integer, primary_key=True, nullable=False),
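
Note on the new CSV stores (illustrative sketch, not part of the patch): write_to_database() no longer streams the band CSVs line by line. Each CSV is first converted by convert_csv_to_store() into an SqliteDict key-value store (fname_csv + ".db") whose values are zlib-compressed pickles, keyed by field ID, with the original CSV header kept under the "header" key. A minimal way to inspect one of the converted stores, reusing the my_encode/my_decode helpers from the patch; the store path below is only an example of what process_csv_stores() produces:

    import pickle
    import sqlite3
    import zlib

    from sqlitedict import SqliteDict

    def my_encode(obj):
        # compressed-pickle codec, identical to the helpers in grompy/load_data.py
        return sqlite3.Binary(zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)))

    def my_decode(obj):
        return pickle.loads(zlib.decompress(bytes(obj)))

    # example path: any of the *.csv.db files created next to the band CSVs
    store = "/data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B02_2022_ADC.csv.db"
    with SqliteDict(store, encode=my_encode, decode=my_decode) as db:
        dates = db["header"]          # CSV header minus the leading field_ID column
        fid = next(k for k in db.keys() if k != "header")
        values = db[fid]              # one value per date for this field
        print(f"{len(dates)} dates, field {fid}: {len(values)} values")

Compressing the pickled rows keeps the per-band stores compact while still giving write_to_database() random access by field ID, which is what lets it assemble one record per field instead of reading all CSVs in lockstep.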