Skip to content
Snippets Groups Projects
Commit ce70df8a authored by Wit, Allard de's avatar Wit, Allard de
Browse files

First version of grompy data access provider

parent 214a267a
No related branches found
No related tags found
No related merge requests found
runfiles/*
\ No newline at end of file
# Fix: the class in grompy.py is spelled "DataAccessProvider" (double s);
# the previous import name "DataAccesProvider" would raise ImportError.
from .grompy import DataAccessProvider
\ No newline at end of file
import sqlalchemy as sa
import pandas as pd
class DataAccessProvider:
    """Iterator over parcels ("percelen") in the grompy database.

    Connects to the database given by *dsn*, optionally filters the parcel
    table on area, pixel count and province, and on iteration yields
    ``(parcel_row, df)`` tuples where ``df`` is a pandas DataFrame with the
    Sentinel-2 observations for that parcel, indexed by observation date.
    """

    def __init__(self, dsn=None, area_gt=None, pixcount_gt=None, provincie=None):
        """Connect to the database and build the (filtered) parcel query.

        :param dsn: SQLAlchemy connection string for the grompy database
        :param area_gt: keep only parcels with area_ha greater than this value
        :param pixcount_gt: keep only parcels with pixcount greater than this value
        :param provincie: keep only parcels in this province
        """
        self.engine = sa.create_engine(dsn)
        meta = sa.MetaData(self.engine)
        self.tbl_perc_info = sa.Table('perceels_info', meta, autoload=True)
        self.s2_observations = sa.Table("s2_observations", meta, autoload=True)
        self.perc_stmt = sa.select([self.tbl_perc_info])
        if area_gt is not None:
            # BUG FIX: the schema declares the column "area_ha" (not "area"),
            # and the "_gt" suffix implies greater-than, not equality
            # (consistent with the pixcount_gt filter below).
            self.perc_stmt.append_whereclause(self.tbl_perc_info.c.area_ha > area_gt)
        if pixcount_gt is not None:
            self.perc_stmt.append_whereclause(self.tbl_perc_info.c.pixcount > pixcount_gt)
        if provincie is not None:
            self.perc_stmt.append_whereclause(self.tbl_perc_info.c.provincie == provincie)
        # Pre-compute the number of selected parcels so __len__ is O(1).
        s = sa.select([sa.func.count()]).select_from(self.perc_stmt)
        self.count = s.execute().fetchone()[0]

    def __iter__(self):
        """Yield (parcel_row, DataFrame) for each selected parcel.

        The DataFrame holds that parcel's s2_observations rows (fieldID
        column dropped), indexed by the observation day as datetime.
        """
        r = self.perc_stmt.execute()
        rows = r.fetchmany(100)
        while rows:
            for row in rows:
                s = sa.select([self.s2_observations],
                              sa.and_(self.s2_observations.c.fieldID == row.fieldID),
                              # list instead of set: clause order is deterministic
                              order_by=[self.s2_observations.c.day])
                df = pd.read_sql(s, self.engine)
                df = df.drop(columns="fieldID")
                df.index = pd.to_datetime(df.day)
                yield row, df
            # BUG FIX: keep fetching in batches of 100; a no-argument
            # fetchmany() falls back to cursor.arraysize (often 1).
            rows = r.fetchmany(100)

    def __len__(self):
        return self.count
File added
-- Parcel ("perceel") metadata: one row per agricultural field (BRP parcel).
-- Populated by the shapefile loader; queried by DataAccessProvider.
CREATE TABLE perceels_info (
fieldID BIGINT,  -- BRP field identifier (primary key)
year INTEGER,  -- harvest year of the parcel record
pixcount INTEGER,  -- number of Sentinel-2 pixels covering the parcel
area_ha FLOAT,  -- parcel area in hectares
cat_gewasc TEXT,  -- crop category code
gws_gewasc INTEGER,  -- crop code (numeric)
gws_gewas TEXT,  -- crop name
provincie TEXT,  -- province
gemeente TEXT,  -- municipality
regio TEXT,  -- region
pc4 TEXT,  -- 4-digit postal code
woonplaats TEXT,  -- town/place name
waterschap TEXT,  -- water board
primary key (fieldID)
);
-- Indexes supporting the common filters: pixel count alone, and crop code
-- combined with area / pixel count / province.
create index ix_pixcount on perceels_info(pixcount);
create index ix_gewas_area on perceels_info(gws_gewasc, area_ha);
create index ix_gewas_pixcount on perceels_info(gws_gewasc, pixcount);
create index ix_gewas_provincie on perceels_info(gws_gewasc, provincie);
from csv import DictReader
from pathlib import Path
from shutil import copyfile
import time
import pandas as pd
import sqlalchemy as sa
# Paths and inputs for converting per-band zonal-statistics CSVs to SQLite.
cwd = Path(__file__).parent  # directory containing this script
csv_dir = cwd / "Optisch"  # folder with the per-band CSV exports
template_db = cwd / "sentinel2_observations_template.db3"  # empty DB with schema pre-created
db_s2_observations = cwd / "sentinel2_observations_2019.db3"  # output database
csv_s2_observations = cwd / "sentinel2_observations_2019.csv"  # NOTE(review): appears unused below
dummy_date = "19000101"  # placeholder day for bands whose row holds only zero/null values
# One CSV per band (plus NDVI); each file has one row per field with
# zonal mean values per observation date.
mean_files = {
"NDVI": csv_dir / "zonal_stats_mean_2019_ADC.csv",
"B02": csv_dir / "zonal_stats_mean_B02_2019_ADC.csv",
"B03": csv_dir / "zonal_stats_mean_B03_2019_ADC.csv",
"B04": csv_dir / "zonal_stats_mean_B04_2019_ADC.csv",
"B05": csv_dir / "zonal_stats_mean_B05_2019_ADC.csv",
"B06": csv_dir / "zonal_stats_mean_B06_2019_ADC.csv",
"B07": csv_dir / "zonal_stats_mean_B07_2019_ADC.csv",
"B08": csv_dir / "zonal_stats_mean_B08_2019_ADC.csv",
"B11": csv_dir / "zonal_stats_mean_B11_2019_ADC.csv",
"B12": csv_dir / "zonal_stats_mean_B12_2019_ADC.csv",
"B8A": csv_dir / "zonal_stats_mean_B8A_2019_ADC.csv",
}
# Print iterations progress
def printProgressBar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', printEnd="\r"):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        printEnd    - Optional  : end character (e.g. "\\r", "\\r\\n") (Str)
    """
    # Fix: the fill default was the empty string (likely a mis-encoded '█'),
    # which made the filled portion of the bar invisible.
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end=printEnd)
    # Print New Line on Complete
    if iteration == total:
        print()
def take_first(iterable):
    """Return the first item of *iterable*, or None if it is empty.

    Fix: the parameter was named ``iter``, shadowing the builtin; the
    implicit None return for an empty iterable is now explicit.
    """
    for item in iterable:
        return item
    return None
def count_lines(files):
    """Checks the number of lines in the input CSV files.

    They should all be the same else throw an error.

    :param files: mapping of band name -> CSV file path
    :return: the common line count (None if *files* is empty)
    :raises RuntimeError: if the files do not all have the same line count
    """
    print("Checking file row counts...")
    counts = {}
    # The band name key is not needed here; iterate values only.
    for fname in files.values():
        with open(fname) as my_file:
            c = sum(1 for _ in my_file)
        counts[fname] = c
        print(f" - {fname}: {c}")
    if len(set(counts.values())) > 1:
        msg = "CSV files do not have the same number of rows!"
        raise RuntimeError(msg)
    # Equivalent to the old take_first() helper: first value or None.
    return next(iter(counts.values()), None)
def process_rows(rows, dummy_date="19000101"):
    """Merge one CSV row per band into a single wide DataFrame.

    :param rows: mapping of band name -> DictReader row of the form
        ``{"field_ID": ..., "count": ..., <date string>: <value>, ...}``.
        Zero values are treated as missing and dropped.
    :param dummy_date: placeholder day used for a band whose row contains
        only zero values (generalized from the module-level constant; the
        default matches it). Placeholder rows are removed before returning.
    :return: DataFrame with columns day, <one column per band>, fieldID
    :raises RuntimeError: if the per-band rows carry different field IDs
    """
    frames = []
    fieldIDs = []
    for column_name, row in rows.items():
        fieldIDs.append(int(row.pop("field_ID")))
        row.pop("count")  # pixel count is not stored per observation
        recs = [{"day": sdate, "value": float(value), "band": column_name}
                for sdate, value in row.items() if float(value) != 0.]
        if not recs:  # only zero (null) values for the column
            # We add one dummy record to make sure we can create the dataframe properly
            recs.append({"day": dummy_date, "value": None, "band": column_name})
        df_tmp = pd.DataFrame(recs)
        try:
            df_tmp["day"] = pd.to_datetime(df_tmp.day)
        except (ValueError, TypeError):
            pass  # unparsable dates: keep them as strings
        frames.append(df_tmp)
    # Concatenate once instead of growing a DataFrame inside the loop
    # (repeated concat is quadratic in the number of bands).
    df = pd.concat(frames) if frames else pd.DataFrame()
    df = df.pivot(index="day", columns="band", values="value")
    df.reset_index(inplace=True)
    if len(set(fieldIDs)) > 1:
        msg = f"FieldIDs are not the same for this row: {fieldIDs}"
        raise RuntimeError(msg)
    df["fieldID"] = fieldIDs[0]
    # Drop the placeholder record(s) inserted for all-zero bands.
    ix = (df.day == pd.to_datetime(dummy_date))
    if any(ix):
        df = df[~ix]
    return df
def write_to_SQLite(mean_csv_readers, nlines):
    """Stream the per-band CSV readers into the SQLite observations table.

    Starts from a copy of the template database, then reads one row from
    every band reader in lockstep, merges them via process_rows() and
    appends the result to the "s2_observations" table, updating a progress
    bar every 1000 rows. Stops when any reader is exhausted.
    """
    copyfile(template_db, db_s2_observations)
    engine = sa.create_engine(f"sqlite:///{db_s2_observations}")
    printProgressBar(0, nlines-1, prefix='Progress:', suffix='Complete', length=50)
    lines_done = 0
    while True:
        try:
            # One row per band; next() raises StopIteration at end of input.
            current = {band: next(reader) for band, reader in mean_csv_readers.items()}
        except StopIteration:
            break
        frame = process_rows(current)
        frame.to_sql("s2_observations", engine, if_exists="append", index=False)
        lines_done += 1
        if lines_done % 1000 == 0:
            printProgressBar(lines_done, nlines, prefix='Progress:', suffix='Complete', length=50)
def main():
    """Convert the per-band zonal-statistics CSVs into the SQLite database.

    Verifies that all input CSVs have the same number of rows, then streams
    them into SQLite via write_to_SQLite() and reports the elapsed time.
    """
    from contextlib import ExitStack

    nlines = count_lines(mean_files)
    # Fix: the CSV file handles were opened inline and never closed;
    # ExitStack guarantees every handle is closed when the block exits.
    with ExitStack() as stack:
        mean_csv_readers = {}
        for column_name, csv_fname in mean_files.items():
            mean_csv_readers[column_name] = DictReader(stack.enter_context(open(csv_fname)))
        t1 = time.time()
        write_to_SQLite(mean_csv_readers, nlines)
    print(f"Processing {nlines} lines to SQLite took {time.time()-t1} seconds.")
if __name__ == "__main__":
main()
\ No newline at end of file
from pathlib import Path
import geopandas as gpd
import pandas as pd
import sqlalchemy as sa
import numpy as np
def main():
    """Load the BRP parcel shapefile into the perceels_info SQLite table.

    Reads the 2019 parcel polygons, derives the parcel area in hectares,
    joins the per-parcel Sentinel-2 pixel counts, normalizes the attribute
    columns and appends everything to "perceels_info" in the observations
    database.
    """
    shp_path = Path.cwd() / "BRP" / "gewaspercelen_2019.shp"
    percelen = gpd.read_file(shp_path)
    # Geometry area is in the CRS units; /1e4 converts m^2 to hectares
    # (assumes a projected CRS in metres — confirm against the shapefile).
    percelen["area_ha"] = percelen.geometry.area/1e4
    percelen = percelen.set_index("fieldid")
    counts_path = Path.cwd() / "Optisch" / "perceelscount.csv"
    counts = pd.read_csv(counts_path).set_index("field_ID")
    # Index-aligned join of the pixel counts onto the parcels.
    percelen["pixcount"] = counts.pixcount
    columns = {
        "fieldID": percelen.index,
        "year": percelen.year,
        "pixcount": percelen.pixcount,
        "area_ha": percelen.area_ha,
        "cat_gewasc": percelen.cat_gewasc.apply(str),
        "gws_gewasc": percelen.gws_gewasc.astype(np.int32),
        "gws_gewas": percelen.gws_gewas.apply(str),
        "provincie": percelen.provincie.apply(str),
        "gemeente": percelen.gemeente.apply(str),
        "regio": percelen.regio.apply(str),
        "pc4": percelen.PC4.apply(str),
        "woonplaats": percelen.woonplaats.apply(str),
        "waterschap": percelen.waterschap.apply(str),
    }
    df_out = pd.DataFrame(columns)
    db_path = Path.cwd() / "sentinel2_observations_2019.db3"
    engine = sa.create_engine(f"sqlite:///{db_path}")
    df_out.to_sql("perceels_info", engine, if_exists="append", index=False)
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment