Commit 9be8562a authored by Wit, Allard de

Modified for using SQLitedict for retrieving fields

parent a72ed6ec
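The change replaces the per-band csv.DictReader streams with SqliteDict key-value stores keyed by field ID, so the rows for a single field can be fetched directly instead of re-reading every CSV line. A minimal sketch of that pattern, assuming a per-band CSV whose first column is the field ID (file names below are placeholders; the real helpers in the diff are my_encode/my_decode and convert_csv_to_store):

import csv
import pickle
import sqlite3
import zlib
from sqlitedict import SqliteDict

def encode(obj):
    # compress + pickle a value before it is stored as a BLOB
    return sqlite3.Binary(zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)))

def decode(blob):
    return pickle.loads(zlib.decompress(bytes(blob)))

# one-time conversion: CSV -> key-value store keyed by field ID
with SqliteDict("band.csv.db", encode=encode, decode=decode, autocommit=True) as db, \
        open("band.csv", newline="") as fp:
    reader = csv.reader(fp)
    db["header"] = next(reader)[1:]   # CSV header minus the field ID column
    for row in reader:
        db[int(row[0])] = row[1:]     # values for one field

# afterwards a single field can be looked up directly
with SqliteDict("band.csv.db", encode=encode, decode=decode) as db:
    record = dict(zip(db["header"], db[123]))   # 123 is a hypothetical field ID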
datasets:
  sentinel1_backscatter:
    DBcheck: true
    bands:
      VH: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VH_2019_ADC.csv
      VH_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VH_2019_ADC.csv
      VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VV_2019_ADC.csv
      VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VV_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_backscatter.db3
    nlines: 803016
  sentinel1_coherence:
    DBcheck: true
    bands:
      S1A_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
      S1A_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
      S1B_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
      S1B_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_coherence.db3
    nlines: 803016
  sentinel1_backscatter: null
  sentinel1_coherence: null
  sentinel2_reflectance_std:
    DBcheck: true
    bands:
      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B02_2019_ADC.csv
      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B03_2019_ADC.csv
      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B04_2019_ADC.csv
      B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B05_2019_ADC.csv
      B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B06_2019_ADC.csv
      B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B07_2019_ADC.csv
      B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B08_2019_ADC.csv
      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B11_2019_ADC.csv
      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B12_2019_ADC.csv
      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B8A_2019_ADC.csv
      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_std.db3
    nlines: 803016
      B02: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B02_2022_ADC.csv
      B03: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B03_2022_ADC.csv
      B04: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B04_2022_ADC.csv
      B05: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B05_2022_ADC.csv
      B06: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B06_2022_ADC.csv
      B07: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B07_2022_ADC.csv
      B08: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_B08_2022_ADC.csv
      B11: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B11_2022_ADC.csv
      B12: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B12_2022_ADC.csv
      B8A: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_20m_B8A_2022_ADC.csv
      NDVI: /data/wit015/gm_archive/2022/Optisch/zonal_stats_std_10m_ndvi_2022_ADC.csv
    dsn: sqlite:////data/wit015/gm_archive/2022/sentinel2_reflectance_std.db3
    nlines: 808273
  sentinel2_reflectance_values:
    DBcheck: true
    bands:
      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B02_2019_ADC.csv
      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B03_2019_ADC.csv
      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B04_2019_ADC.csv
      B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B05_2019_ADC.csv
      B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B06_2019_ADC.csv
      B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B07_2019_ADC.csv
      B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B08_2019_ADC.csv
      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B11_2019_ADC.csv
      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B12_2019_ADC.csv
      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_values.db3
    nlines: 803016
      B02: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B02_2022_ADC.csv
      B03: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B03_2022_ADC.csv
      B04: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B04_2022_ADC.csv
      B05: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B05_2022_ADC.csv
      B06: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B06_2022_ADC.csv
      B07: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B07_2022_ADC.csv
      B08: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_B08_2022_ADC.csv
      B11: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B11_2022_ADC.csv
      B12: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B12_2022_ADC.csv
      B8A: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_20m_B8A_2022_ADC.csv
      NDVI: /data/wit015/gm_archive/2022/Optisch/zonal_stats_mean_10m_ndvi_2022_ADC.csv
    dsn: sqlite:////data/wit015/gm_archive/2022/sentinel2_reflectance_values.db3
    nlines: 808273
grompy:
  version: 1.0
parcel_info:
  counts_file: /home/wit015/Data/groenmonitor/Optisch/perceelscount.csv
  dsn: sqlite:////home/wit015/Data/groenmonitor/parcel_info.db3
  shape_file: /home/wit015/Data/groenmonitor/BRP/gewaspercelen_2019.shp
  counts_file_10m: /data/wit015/gm_archive/2022/BRP/no_pixels_per_field_10m_2022.csv
  counts_file_20m: /data/wit015/gm_archive/2022/BRP/no_pixels_per_field_20m_2022.csv
  counts_file_25m: /data/wit015/gm_archive/2022/BRP/no_pixels_per_field_25m_2022.csv
  dsn: sqlite:////data/wit015/gm_archive/2022/parcel_info.db3
  shape_file: /data/wit015/gm_archive/2022/BRP/gewaspercelen_2022_ss.shp
  table_name: parcel_info
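Each dataset entry above maps band names to per-band zonal-statistics CSV files and gives the SQLAlchemy dsn of the target database plus the expected number of CSV lines (nlines); datasets set to null are skipped by the loader. A rough sketch of how such an entry can be walked, mirroring process_csv_stores() and load_satellite_csv() in the diff below (band paths may be relative to the grompy.yaml location):

import yaml
from pathlib import Path

conf = yaml.safe_load(open("grompy.yaml"))
for dataset_name, description in conf["datasets"].items():
    if description is None:          # disabled dataset
        continue
    print(dataset_name, description["dsn"], description["nlines"])
    for band, csv_fname in description["bands"].items():
        print("  ", band, Path(csv_fname))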
......@@ -2,11 +2,14 @@
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
import sys
import uuid
from pathlib import Path
from csv import DictReader
import time
import sqlite3
import multiprocessing as mp
import zlib
import pickle
import csv
import sqlalchemy as sa
import sqlalchemy.exc
......@@ -14,12 +17,77 @@ import pandas as pd
import geopandas as gpd
import numpy as np
import yaml
from sqlitedict import SqliteDict
from .util import prepare_db, printProgressBar, Process, make_path_absolute, open_DB_connection, get_latlon_from_RD
dummy_date = "19000101"

def my_encode(obj):
    return sqlite3.Binary(zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)))


def my_decode(obj):
    return pickle.loads(zlib.decompress(bytes(obj)))


def convert_csv_to_store(fname_csv):
    """Converts the CSV file into an SQLite key-value store with filename fname_csv + ".db"
    """
    fname_csv_db = fname_csv.with_suffix(f"{fname_csv.suffix}.db")
    fname_csv_tmp = fname_csv.with_suffix(f".{uuid.uuid4()}.tmp")
    if fname_csv_db.exists():
        with SqliteDict(fname_csv_db, encode=my_encode, decode=my_decode) as db:
            field_ids = list(db.keys())
    else:
        field_ids = []
        with SqliteDict(fname_csv_tmp, encode=my_encode, decode=my_decode, autocommit=True) as db:
            with open(fname_csv, newline="") as fp_csv:
                reader = csv.reader(fp_csv)
                fieldnames = next(reader)
                db["header"] = fieldnames[1:]
                for row in reader:
                    fid = int(row.pop(0))
                    db[fid] = row
                    field_ids.append(fid)
        fname_csv_tmp.rename(fname_csv_db)

    # remove the "header" key as it is not a field ID and was only used
    # to store the CSV header values
    field_ids = set(field_ids)
    field_ids.discard("header")
    return field_ids
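For reference, a store written by convert_csv_to_store() can be read back as sketched here (the same lookup pattern is used by write_to_database() below; the field ID is hypothetical):

with SqliteDict(fname_csv_db, encode=my_encode, decode=my_decode) as db:
    header = db["header"]            # CSV column names minus the field ID column
    values = db[272130]              # values for one field ID (hypothetical ID)
    record = dict(zip(header, values))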

def process_csv_stores(grompy_yaml, datasets, parallel=False):
    csv_fnames = []
    for dataset_name, description in datasets.items():
        if description is None:
            continue
        for csv_fname in description["bands"].values():
            csv_fname = make_path_absolute(grompy_yaml, Path(csv_fname))
            csv_fnames.append(csv_fname)

    field_ids = set()
    if parallel:
        n = min(mp.cpu_count(), len(csv_fnames))
        with mp.Pool(n) as pool:
            r = pool.map(convert_csv_to_store, csv_fnames)
        for s in r:
            field_ids.update(s)
    else:
        for csv_fname in csv_fnames:
            s = convert_csv_to_store(csv_fname)
            field_ids.update(s)

    return field_ids


class CSVLoadingError(Exception):
    """Exception for raising errors related to CSV reading
    """
......@@ -125,7 +193,7 @@ def load_parcel_info(grompy_yaml, dsn, counts_file_10m, counts_file_20m, counts_
    df_out = df = df_counts = None


def process_rows(rows):
def process_rows(rows, fieldID):
    """Process a single row from different CSV files and convert them
    into a DataFrame.
......@@ -134,9 +202,7 @@ def process_rows(rows):
    returns a pandas DataFrame
    """
    df = pd.DataFrame()
    fieldIDs = []
    for column_name, row in rows.items():
        fieldIDs.append(int(row.pop("field_ID")))
        if "count" in row:
            count = row.pop("count")
        recs = []
......@@ -161,15 +227,12 @@ def process_rows(rows):
    df.reset_index(inplace=True)

    # Add FieldID to the dataframe
    if len(set(fieldIDs)) > 1:
        msg = f"FieldIDs are not the same for this row: {fieldIDs}"
        raise RuntimeError(msg)
    df["fieldID"] = fieldIDs[0]
    df["fieldID"] = int(fieldID)

    return df

def write_to_database(engine, dataset_name, csv_readers, child_conn=None, total_lines=None):
def write_to_database(engine, dataset_name, csv_readers, field_ids, child_conn=None, total_lines=None):
    """routine writes data from a set of CSV files into the database

    :param engine: the database engine to be used
......@@ -182,38 +245,64 @@ def write_to_database(engine, dataset_name, csv_readers, child_conn=None, total_
    if total_lines is not None:
        printProgressBar(0, total_lines, decimals=2, length=50)

    this_line = 0
    while True:
        try:
            rows = {column_name: next(reader) for column_name, reader in csv_readers.items()}
            df = process_rows(rows)
    meta = sa.MetaData(engine)
    tbl = sa.Table(dataset_name, meta, autoload=True)
    ins = tbl.insert()
    headers = {column_name: reader["header"] for column_name, reader in csv_readers.items()}
    for i, fid in enumerate(field_ids):
        rows = {}
        for column_name, reader in csv_readers.items():
            try:
                df.to_sql(dataset_name, engine, if_exists="append", index=False)
            except sa.exc.IntegrityError as e:
                print(f"Field ID {df.fieldID.unique()} failed to insert in table {dataset_name}")
            this_line += 1
            if this_line % 100 == 0:
                if child_conn is not None:
                    child_conn.send({dataset_name: this_line})
                else:
                    printProgressBar(this_line, total_lines, decimals=2, length=50)
        except StopIteration:
            break


def load_satellite_csv(grompy_yaml, child_conn, dataset_name, dsn, bands, **kwargs):
    mean_csv_readers = {}
                band_values = reader[fid]
            except KeyError as e:
                # field ID not present in this band's store: pad with zeros,
                # one value per entry in the band's header
                band_values = [0.] * len(headers[column_name])
            header = headers[column_name]
            row = {hdr: value for hdr, value in zip(header, band_values)}
            rows[column_name] = row
        df = process_rows(rows, fid)
        if len(df) == 0:
            continue
        try:
            recs = df.to_dict(orient="records")
            with engine.begin() as DBconn:
                DBconn.execute(ins, recs)
                # df.to_sql(dataset_name, DBconn, if_exists="append", index=False, method="multi")
        except sa.exc.IntegrityError as e:
            print(f"Field ID {df.fieldID.unique()} failed to insert in table {dataset_name}")
        if i % 100 == 0:
            if child_conn is not None:
                child_conn.send({dataset_name: i})
            else:
                printProgressBar(i, total_lines, decimals=2, length=50)


def load_satellite_csv(grompy_yaml, child_conn, dataset_name, field_ids, dsn, bands, **kwargs):
    """Main routine for starting a parallel loader process.
    """
    csv_readers = {}
    for column_name, csv_fname in bands.items():
        csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
        mean_csv_readers[column_name] = DictReader(open(csv_fpath))
    engine = prepare_db(dsn, table_name=dataset_name, bands=mean_csv_readers.keys())
    write_to_database(engine, dataset_name, mean_csv_readers, child_conn)
        csv_db_fpath = csv_fpath.with_suffix(csv_fpath.suffix + ".db")
        csv_readers[column_name] = SqliteDict(csv_db_fpath, encode=my_encode, decode=my_decode)

    engine = prepare_db(dsn, table_name=dataset_name, bands=csv_readers.keys())
    write_to_database(engine, dataset_name, csv_readers, field_ids, child_conn)


def start_parallel_loading(grompy_yaml, datasets):
def start_parallel_loading(grompy_yaml, datasets, field_ids):
    """Start loading CSV files in parallel

    :param datasets:
    :return:
    :param grompy_yaml: the full path to the grompy.yaml file
    :param datasets: the datasets to load
    :param field_ids: the set of unique field IDs to process
    :return: a tuple with three elements:
        - a list with references to the running processes
        - the parent connection of the pipe to which the processes communicate progress
        - the number of lines per dataset being processed
    """
    process_list = []
    parent_conn, child_conn = mp.Pipe()
......@@ -227,7 +316,7 @@ def start_parallel_loading(grompy_yaml, datasets):
print(f"Starting loading of: {dataset_name}")
lines_per_dataset[dataset_name] = description["nlines"]
p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,), kwargs=description)
p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,field_ids), kwargs=description)
process_list.append(p)
for p in process_list:
p.start()
......@@ -235,7 +324,7 @@ def start_parallel_loading(grompy_yaml, datasets):
    return process_list, parent_conn, lines_per_dataset


def start_serial_loading(grompy_yaml, datasets):
def start_serial_loading(grompy_yaml, datasets, field_ids):
    """Start loading CSV files in sequence

    :param datasets:
    :return:
......@@ -252,9 +341,11 @@ def start_serial_loading(grompy_yaml, datasets):
        mean_csv_readers = {}
        for column_name, csv_fname in description["bands"].items():
            csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
            mean_csv_readers[column_name] = DictReader(open(csv_fpath))
            csv_db_fpath = csv_fpath.with_suffix(csv_fpath.suffix + ".db")
            mean_csv_readers[column_name] = SqliteDict(csv_db_fpath, encode=my_encode, decode=my_decode)

        engine = prepare_db(description["dsn"], table_name=dataset_name, bands=mean_csv_readers.keys())
        write_to_database(engine, dataset_name, mean_csv_readers, total_lines=description["nlines"])
        write_to_database(engine, dataset_name, mean_csv_readers, field_ids, total_lines=description["nlines"])
        break
......@@ -297,7 +388,7 @@ def load_data(grompy_yaml, parcel_info_only, parallel=False):
    :param grompy_yaml: the path to grompy configuration file
    :param parcel_info_only: Only load parcel info, but no satellite data
    :param parallel: Load data using multiple CPUs, default True
    :param parallel: Load data using multiple CPUs, default False
    """
    grompy_conf = yaml.safe_load(open(grompy_yaml))
......@@ -307,9 +398,18 @@ def load_data(grompy_yaml, parcel_info_only, parallel=False):
    parcel_info = grompy_conf.pop("parcel_info")
    load_parcel_info(grompy_yaml, **parcel_info)

    # Reuse the cached set of field IDs when available; otherwise build the
    # per-band SqliteDict stores and cache the collected field IDs for next time.
    try:
        with open('field_ids.pkl', 'rb') as f:
            field_ids = pickle.load(f)
    except:
        field_ids = process_csv_stores(grompy_yaml, grompy_conf["datasets"], parallel=True)
        with open('field_ids.pkl', 'wb') as f:
            pickle.dump(field_ids, f, pickle.HIGHEST_PROTOCOL)

    if not parcel_info_only:
        if parallel:
            process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"])
            process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"],
                                                                                  field_ids)
            monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
        else:
            start_serial_loading(grompy_yaml, grompy_conf["datasets"])
\ No newline at end of file
            start_serial_loading(grompy_yaml, grompy_conf["datasets"], field_ids)
\ No newline at end of file
......@@ -7,6 +7,8 @@ from pathlib import Path
import sqlalchemy.exc
from sqlalchemy import MetaData, Table, Column, Integer, Date, Float, Text, create_engine
from sqlalchemy.event import listen
from sqlalchemy.pool import Pool
from pyproj import Transformer
......@@ -61,7 +63,22 @@ def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1,
def prepare_db(dsn, table_name, bands):

    def on_connect(dbapi_connection, connection_record):
        # disable pysqlite's emitting of the BEGIN statement entirely.
        # also stops it from emitting COMMIT before any DDL
        # see: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#serializable-isolation-savepoints-transactional-ddl
        # https://docs.sqlalchemy.org/en/13/core/event.html
        dbapi_connection.isolation_level = None

    def on_begin(conn):
        # emit our own BEGIN
        conn.execute("BEGIN EXCLUSIVE")

    engine = create_engine(dsn)
    # allow locking of database for concurrency
    # listen(engine, "connect", on_connect)
    # listen(engine, "begin", on_begin)

    meta = MetaData(engine)
    tbl = Table(table_name, meta,
                Column('fieldID', Integer, primary_key=True, nullable=False),
......