Skip to content
Snippets Groups Projects
Commit 2a02e4e9 authored by Wit, Allard de's avatar Wit, Allard de
Browse files

Refactored the parallel loading and monitoring

parent 33001dec
Branches
No related tags found
No related merge requests found
grompy:
  version: 1.0
parcel_info:
  dsn: sqlite:///{input_path}/parcel_info.db3
  counts_file: {input_path}/Optisch/perceelscount.csv
  shape_file: {input_path}/BRP/gewaspercelen_2019.shp
  table_name: parcel_info
datasets:
  sentinel2_reflectance_values:
    dsn: sqlite:///{input_path}/sentinel2_reflectance_values.db3
    bands:
      NDVI: {input_path}/Optisch/zonal_stats_mean_2019_ADC.csv
      B02: {input_path}/Optisch/zonal_stats_mean_B02_2019_ADC.csv
      B03: {input_path}/Optisch/zonal_stats_mean_B03_2019_ADC.csv
      B04: {input_path}/Optisch/zonal_stats_mean_B04_2019_ADC.csv
      B05: {input_path}/Optisch/zonal_stats_mean_B05_2019_ADC.csv
      B06: {input_path}/Optisch/zonal_stats_mean_B06_2019_ADC.csv
      B07: {input_path}/Optisch/zonal_stats_mean_B07_2019_ADC.csv
      B08: {input_path}/Optisch/zonal_stats_mean_B08_2019_ADC.csv
      B11: {input_path}/Optisch/zonal_stats_mean_B11_2019_ADC.csv
      B12: {input_path}/Optisch/zonal_stats_mean_B12_2019_ADC.csv
      B8A: {input_path}/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
  sentinel2_reflectance_std:
    dsn: sqlite:///{input_path}/sentinel2_reflectance_std.db3
    bands:
      NDVI: {input_path}/Optisch/zonal_stats_std_2019_ADC.csv
      B02: {input_path}/Optisch/zonal_stats_std_B02_2019_ADC.csv
      B03: {input_path}/Optisch/zonal_stats_std_B03_2019_ADC.csv
      B04: {input_path}/Optisch/zonal_stats_std_B04_2019_ADC.csv
      B05: {input_path}/Optisch/zonal_stats_std_B05_2019_ADC.csv
      B06: {input_path}/Optisch/zonal_stats_std_B06_2019_ADC.csv
      B07: {input_path}/Optisch/zonal_stats_std_B07_2019_ADC.csv
      B08: {input_path}/Optisch/zonal_stats_std_B08_2019_ADC.csv
      B11: {input_path}/Optisch/zonal_stats_std_B11_2019_ADC.csv
      B12: {input_path}/Optisch/zonal_stats_std_B12_2019_ADC.csv
      B8A: {input_path}/Optisch/zonal_stats_std_B8A_2019_ADC.csv
  sentinel1_backscatter:
    dsn: sqlite:///{input_path}/sentinel1_backscatter.db3
    bands:
      VV: {input_path}/Radar/zonal_stats_mean_VV_2019_ADC.csv
      VH: {input_path}/Radar/zonal_stats_mean_VH_2019_ADC.csv
      VV_std: {input_path}/Radar/zonal_stats_std_VV_2019_ADC.csv
      VH_std: {input_path}/Radar/zonal_stats_std_VH_2019_ADC.csv
  sentinel1_coherence:
    dsn: sqlite:///{input_path}/sentinel1_coherence.db3
    bands:
      S1A_VV: {input_path}/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
      S1A_VV_std: {input_path}/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
      S1B_VV: {input_path}/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
      S1B_VV_std: {input_path}/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
...@@ -169,27 +169,14 @@ class Process(mp.Process): ...@@ -169,27 +169,14 @@ class Process(mp.Process):
def start_parallel_loading(datasets): def start_parallel_loading(datasets):
pass """Start loading CSV files in parallel
:param datasets:
def load_data(yaml_file):
"""Loads data point to by the YAML config file.
:param yaml_file:
:return: :return:
""" """
grompy_conf = yaml.safe_load(open(yaml_file))
# First load parcel info
parcel_info = grompy_conf.pop("parcel_info")
load_parcel_info(**parcel_info)
# Start loading CSV files in parallel
process_list = [] process_list = []
parent_conn, child_conn = mp.Pipe() parent_conn, child_conn = mp.Pipe()
lines_per_dataset = {} lines_per_dataset = {}
for dataset_name, description in grompy_conf["datasets"].items(): for dataset_name, description in datasets.items():
if "nlines" not in description: if "nlines" not in description:
print("You must run 'grompy check' before trying 'grompy load'! Aborting...") print("You must run 'grompy check' before trying 'grompy load'! Aborting...")
sys.exit() sys.exit()
...@@ -201,10 +188,17 @@ def load_data(yaml_file): ...@@ -201,10 +188,17 @@ def load_data(yaml_file):
for p in process_list: for p in process_list:
p.start() p.start()
return process_list, parent_conn, lines_per_dataset
def monitor_parallel_loading(process_list, parent_conn, lines_per_dataset):
"""Monitors the execution of parallel loading and updates the
progressbar.
"""
total_lines = sum(c for c in lines_per_dataset.values()) total_lines = sum(c for c in lines_per_dataset.values())
lines_per_dataset = {ds:0 for ds in lines_per_dataset} lines_per_dataset = {ds:0 for ds in lines_per_dataset}
printProgressBar(0, total_lines, decimals=2, length=50)
try: try:
printProgressBar(0, total_lines, decimals=2, length=50)
processes = [p for p in process_list if p.is_alive()] processes = [p for p in process_list if p.is_alive()]
while processes: while processes:
time.sleep(3) time.sleep(3)
...@@ -228,4 +222,21 @@ def load_data(yaml_file): ...@@ -228,4 +222,21 @@ def load_data(yaml_file):
print(e.traceback) print(e.traceback)
def load_data(yaml_file):
    """Loads the data pointed to by the YAML config file.

    First loads the parcel information, then starts loading the CSV
    datasets in parallel and monitors their progress until completion.

    :param yaml_file: path to the grompy YAML configuration file.
    :return: None
    """
    # Use a context manager so the config file handle is closed promptly
    # (the original passed open() directly to safe_load, leaking the handle).
    with open(yaml_file) as f:
        grompy_conf = yaml.safe_load(f)

    # First load parcel info; popping it leaves only the dataset sections.
    parcel_info = grompy_conf.pop("parcel_info")
    load_parcel_info(**parcel_info)

    # Start loading CSV files in parallel and track progress until done.
    process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_conf["datasets"])
    monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment