From 248bc5964e23174936b269414f422ee85c80f32f Mon Sep 17 00:00:00 2001
From: Ingrid Luijkx <ingrid.luijkx@wur.nl>
Date: Mon, 12 Jul 2021 16:32:14 +0200
Subject: [PATCH] Added preprocessing observations pipeline, which reads the
 obspack files and writes them as pickle files. Small fix in obs_gvp_co2.py
 for encoding of special characters in dataset_summary. Small bug fix in a
 logging message in observationoperator_tm5_cteco2.py. Small string
 conversion bug fix in initexit_cteco2 for keeping opts from the initial
 job. Added module load NCO to platform file Aether. Fix in
 statevectorNHgridded_cteco2.py for optimizing ocean fluxes, which should
 be true by default, but was false when the ocn.optimize key was not
 present.

---
 da/cyclecontrol/initexit_cteco2.py            |  4 +-
 da/observations/obs_gvp_co2.py                |  2 +-
 .../observationoperator_tm5_cteco2.py         |  2 +-
 da/pipelines/pipeline_cteco2.py               | 48 +++++++++++++++----
 da/platform/aether.py                         |  1 +
 da/rc/cteco2/carbontracker_gcp2021_insitu.rc  | 29 +++++++++++
 .../statevectorNHgridded_cteco2.py            |  8 ++--
 templates/template.jb                         | 22 +++------
 templates/template.py                         |  4 +-
 templates/template.rc                         | 18 +++----
 10 files changed, 95 insertions(+), 43 deletions(-)
 create mode 100644 da/rc/cteco2/carbontracker_gcp2021_insitu.rc

diff --git a/da/cyclecontrol/initexit_cteco2.py b/da/cyclecontrol/initexit_cteco2.py
index f1e0d099..19081fa4 100755
--- a/da/cyclecontrol/initexit_cteco2.py
+++ b/da/cyclecontrol/initexit_cteco2.py
@@ -566,9 +566,9 @@ class CycleControl(dict):
                 nextrestartfilename = self['da.restart.fname'].replace(jobid,nextjobid)
                 nextlogfilename = logfile.replace(jobid,nextjobid)
                 if self.daplatform.ID == 'WU capegrim':
-                    template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s &\n' % (cycle+1,execcommand, nextrestartfilename, str.join(str(self.opts), ''), nextlogfilename,)
+                    template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s &\n' % (cycle+1,execcommand, nextrestartfilename, ''.join(self.opts), nextlogfilename,)
                 else: 
-                    template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s\n' % (cycle+1,execcommand, nextrestartfilename, str.join(str(self.opts), ''), nextlogfilename,) 
+                    template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s\n' % (cycle+1,execcommand, nextrestartfilename, ''.join(self.opts), nextlogfilename,) 
 
             # write and submit 
             self.daplatform.write_job(jobfile, template, jobid)
diff --git a/da/observations/obs_gvp_co2.py b/da/observations/obs_gvp_co2.py
index 746b21c4..0701b728 100755
--- a/da/observations/obs_gvp_co2.py
+++ b/da/observations/obs_gvp_co2.py
@@ -95,7 +95,7 @@ class ObsPackObservations(Observations):
 
         # Step 1: Read list of available site files in package
         infile = os.path.join(self.obspack_dir, 'summary', '%s_dataset_summary.txt' % (self.obspack_id,))
-        f      = open(infile, 'r')
+        f      = open(infile, 'r+', encoding="utf-8")
         lines  = f.readlines()
         f.close()
 
diff --git a/da/obsoperators/observationoperator_tm5_cteco2.py b/da/obsoperators/observationoperator_tm5_cteco2.py
index a141a84f..dfb05d48 100755
--- a/da/obsoperators/observationoperator_tm5_cteco2.py
+++ b/da/obsoperators/observationoperator_tm5_cteco2.py
@@ -524,7 +524,7 @@ class TM5ObservationOperator(ObservationOperator):
 
 
         tm5submitdir = os.path.join(self.tm_settings[self.rundirkey])
-        logging.info('tm5submitdir', tm5submitdir)
+        logging.info('tm5submitdir: %s' %tm5submitdir)
 
         # Go to executable directory and start the subprocess, using a new logfile
 
diff --git a/da/pipelines/pipeline_cteco2.py b/da/pipelines/pipeline_cteco2.py
index 1610cb6e..c38d6d73 100755
--- a/da/pipelines/pipeline_cteco2.py
+++ b/da/pipelines/pipeline_cteco2.py
@@ -28,6 +28,7 @@ import os
 import sys
 import datetime
 import copy
+import pickle
 
 header = '\n\n    ***************************************   '
 footer = '    *************************************** \n  '
@@ -168,18 +169,18 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
     save_weekly_avg_ext_tc_data(dacycle)
     save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
     save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='country')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='country')
 
     logging.info(header + "Starting monthly and yearly averages" + footer)
 
     time_avg(dacycle,'flux1x1')
     time_avg(dacycle,'transcom')
     time_avg(dacycle,'transcom_extended')
-    time_avg(dacycle,'olson')
-    time_avg(dacycle,'olson_extended')
-    time_avg(dacycle,'country')
+    #time_avg(dacycle,'olson')
+    #time_avg(dacycle,'olson_extended')
+    #time_avg(dacycle,'country')
 
     logging.info(header + "Finished analysis" + footer)
 
@@ -221,6 +222,21 @@ def archive_pipeline(dacycle, platform, dasystem):
         jobid = platform.submit_job(jobfile, joblog=logfile)
 
 
+def preprocessobs_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
+    """ The main point of entry for the pipeline """
+    sys.path.append(os.getcwd())
+
+    samples = samples if isinstance(samples,list) else [samples]
+
+    logging.info(header + "Initializing current cycle" + footer)
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
+
+    prepare_state(dacycle, statevector)
+
+    sample_state(dacycle, samples, statevector, obsoperator)
+
+    save_and_submit(dacycle, statevector)
+    logging.info("Cycle finished...exiting pipeline")
 
 def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator):
     """ Set up the job specific directory structure and create an expanded rc-file """
@@ -355,12 +371,22 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
 
     statevector.write_members_to_file(lag, dacycle['dir.input'], obsoperator=obsoperator)
 
-    for sample in samples:
+    for i,sample in enumerate(samples):
 
         sample.setup(dacycle)
 
         # Read observations + perform observation selection
-        sample.add_observations()
+        obspickle_filename = os.path.join(dacycle['dir.restart'], sample.get_samples_type()+'samples_%s.pickle' % dacycle['time.sample.stamp'])
+        if os.path.isfile(obspickle_filename):
+            if dacycle['da.preprocessobs'] == True: continue
+            sample = pickle.load(open(obspickle_filename, 'rb'))
+            logging.info("Loaded the samples object from file: %s"%obspickle_filename) 
+        else:
+            sample.add_observations()
+            if dacycle['da.preprocessobs'] == True:
+                pickle.dump(sample, open(obspickle_filename, 'wb'), -1)
+                logging.info("Saved the samples object to file: %s"%obspickle_filename) 
+                continue
 
         # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
         sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
@@ -370,9 +396,13 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
 
         # Write filename to dacycle, and to output collection list
         dacycle['ObsOperator.inputfile.'+sample.get_samples_type()] = sampling_coords_file
+        
+        samples[i] = sample
 
     del sample
-
+    
+    if dacycle['da.preprocessobs'] == True: return
+    
     # Run the observation operator
     obsoperator.run_forecast_model(samples)
 
diff --git a/da/platform/aether.py b/da/platform/aether.py
index 623770f9..9e6691b1 100755
--- a/da/platform/aether.py
+++ b/da/platform/aether.py
@@ -101,6 +101,7 @@ class AetherPlatform(Platform):
                    """#SBATCH -t jobtime \n""" + \
                    """#SBATCH -o joblog \n""" + \
                    """module load TM5-MP \n""" + \
+                   """module load NCO/4.6.6-intel-2017a \n""" + \
                    """module load netcdf4-python/1.2.9-intel-2017a-Python-3.6.1 \n""" + \
            """\n"""
 
diff --git a/da/rc/cteco2/carbontracker_gcp2021_insitu.rc b/da/rc/cteco2/carbontracker_gcp2021_insitu.rc
new file mode 100644
index 00000000..fb023cb7
--- /dev/null
+++ b/da/rc/cteco2/carbontracker_gcp2021_insitu.rc
@@ -0,0 +1,29 @@
+!!! Info for the CarbonTracker data assimilation system
+
+datadir                       : /mnt/beegfs/user/gkoren/ctdas_2012
+
+! For ObsPack
+obspack.input.id   : obspack_co2_111_MERGE_GVP6.1_NRT6.1.1_2021-07-05
+obspack.input.dir  : ${datadir}/obspacks/${obspack.input.id}
+obs.sites.rc       : ${obspack.input.dir}/summary/sites_weights_GCP2021.rc
+obs.obspack.localization             : False
+obs.obspack.localization.length.tower: 500
+obs.obspack.localization.length.land : 300
+
+ocn.covariance  : ${datadir}/oceans/oif/cov_ocean.2000.01.nc
+deltaco2.prefix : oif_p3_era40.dpco2
+
+bio.cov.dir     : ${datadir}/covariances/gridded_NH/
+bio.cov.prefix  : cov_ecoregion
+
+regtype         : gridded_oif30
+nparameters     : 9835
+random.seed     : 4385
+regionsfile     : ${datadir}/covariances/gridded_NH/griddedNHparameters.nc
+random.seed.init: ${datadir}/randomseedinit.pickle
+!random.seed.init: /projects/0/ctdas/ingrid/seed_py2.pickle
+
+! Include a naming scheme for the variables
+
+#include NamingScheme.wp_Mar2011.rc
+
diff --git a/da/statevectors/statevectorNHgridded_cteco2.py b/da/statevectors/statevectorNHgridded_cteco2.py
index f2b4844a..07becde2 100755
--- a/da/statevectors/statevectorNHgridded_cteco2.py
+++ b/da/statevectors/statevectorNHgridded_cteco2.py
@@ -89,11 +89,11 @@ class CO2GriddedStateVector(StateVector):
 
             if 'pco2' in file or 'cov_ocean' in file: 
                 parnr = list(range(9805,9835))
-                if 'ocn.optimize' in dacycle.dasystem and dacycle.dasystem['ocn.optimize']:
-                    cov = f.get_variable('CORMAT')
-                else:
-                    cov = np.identity(30)*1E-20
+                if 'ocn.optimize' in dacycle.dasystem and dacycle.dasystem['ocn.optimize'] == False:
+                    cov = np.identity(30)*1E-20
                     logging.debug('Prior ocean covariance matrix = Identity matrix')
+                else:
+                    cov = f.get_variable('CORMAT')
             elif 'tropic' in file:
                 cov = f.get_variable('covariance')
                 parnr = f.get_variable('parameternumber')
diff --git a/templates/template.jb b/templates/template.jb
index c3d47789..2638fc18 100755
--- a/templates/template.jb
+++ b/templates/template.jb
@@ -1,22 +1,14 @@
 #! /bin/env bash
-#SBATCH -p short
+#SBATCH -p all
 #SBATCH -t 01:00:00 
-#SBATCH -n 48
+#SBATCH -n 56
 
 echo "All output piped to file template.log"
-export HOST='cartesius'
+export HOST='aether'
 export icycle_in_job=999
 
-.  /projects/0/tm5meteo/admin/bash_profile__2019
+module load TM5-MP
+module load NCO/4.6.6-intel-2017a
+module load netcdf4-python/1.2.9-intel-2017a-Python-3.6.1
 
-module load Python/3.6.6-foss-2018b
-module load pre2019
-module load hdf5/impi/intel/1.8.9
-module load hdf4/intel/4.2.9
-module load netcdf/impi/intel/4.1.3
-module load szip/intel/2.1
-module load fortran
-module load mpi
-module load nco
-
-python3 template.py rc=template.rc -v $1 >& template.log 
+python template.py rc=template.rc -v $1 >& template.log 
diff --git a/templates/template.py b/templates/template.py
index 0a88b2a7..9abfcd91 100755
--- a/templates/template.py
+++ b/templates/template.py
@@ -28,7 +28,7 @@ sys.path.append(os.getcwd())
 from da.cyclecontrol.initexit_cteco2 import start_logger, validate_opts_args, parse_options, CycleControl
 from da.pipelines.pipeline_cteco2 import ensemble_smoother_pipeline, header, footer, analysis_pipeline, archive_pipeline
 from da.dasystems.dasystem_cteco2 import CO2DaSystem
-from da.platform.cartesius import CartesiusPlatform
+from da.platform.aether import AetherPlatform
 from da.statevectors.statevectorNHgridded_cteco2 import CO2GriddedStateVector 
 from da.observations.obs_gvp_co2 import ObsPackObservations
 from da.obsoperators.observationoperator_tm5_cteco2 import TM5ObservationOperator 
@@ -49,7 +49,7 @@ opts, args = validate_opts_args(opts, args)
 
 dacycle = CycleControl(opts, args)
 
-platform    = CartesiusPlatform()
+platform    = AetherPlatform()
 dasystem    = CO2DaSystem(dacycle['da.system.rc'])
 obsoperator = TM5ObservationOperator(dacycle['da.obsoperator.rc'])
 samples     = [ObsPackObservations()]
diff --git a/templates/template.rc b/templates/template.rc
index 54ac71e7..ab70b570 100644
--- a/templates/template.rc
+++ b/templates/template.rc
@@ -28,8 +28,8 @@
 !
 ! The time for which to start and end the data assimilation experiment in format YYYY-MM-DD HH:MM:SS
 
-time.start          : 2014-12-27 00:00:00
-time.finish         : 2016-02-01 00:00:00
+time.start          : 2000-01-01 00:00:00
+time.finish         : 2021-02-01 00:00:00
 
 ! Whether to restart the CTDAS system from a previous cycle, or to start the sequence fresh. Valid entries are T/F/True/False/TRUE/FALSE
 
@@ -37,11 +37,11 @@ time.restart        : False
 
 ! The length of a cycle is given in days, such that the integer 7 denotes the typically used weekly cycle. Valid entries are integers > 1
 
-time.cycle          : 2
+time.cycle          : 7
 
 ! The number of cycles of lag to use for a smoother version of CTDAS. CarbonTracker CO2 typically uses 5 weeks of lag. Valid entries are integers > 0
 
-time.nlag           : 2
+time.nlag           : 5
 
 ! The directory under which the code, input, and output will be stored. This is the base directory for a run. The word
 ! '/' will be replaced through the start_ctdas.sh script by a user-specified folder name. DO NOT REPLACE
@@ -53,19 +53,19 @@ dir.da_run          : template
 ! allows you to complete many cycles before resubmitting a job to the queue and having to wait again for resources.
 ! Valid entries are integers > 0
 
-da.resources.ncycles_per_job : 1
+da.resources.ncycles_per_job : 90
 
 ! The ntasks specifies the number of threads to use for the MPI part of the code, if relevant. Note that the CTDAS code
 ! itself is not parallelized and the python code underlying CTDAS does not use multiple processors. The chosen observation
 ! operator though might use many processors, like TM5. Valid entries are integers > 0
 
-da.resources.ntasks : 48
+da.resources.ntasks : 56 
 
 ! This specifies the amount of wall-clock time to request for each job. Its value depends on your computing platform and might take
 ! any form appropriate for your system. Typically, HPC queueing systems allow you a certain number of hours of usage before 
 ! your job is killed, and you are expected to finalize and submit a next job before that time. Valid entries are strings.
 
-da.resources.ntime  : 5:00:00
+da.resources.ntime  : 168:00:00
 
 ! The resource settings above will cause the creation of a job file in which 2 cycles will be run, and 30 threads 
 ! are asked for a duration of 4 hours
@@ -77,7 +77,7 @@ da.system           : CarbonTracker
 
 ! The specific settings for your system are read from a separate rc-file, which points to the data directories, observations, etc
 
-da.system.rc        : da/rc/carbontracker_gcp2020_insitu_OCO2.rc
+da.system.rc        : da/rc/cteco2/carbontracker_gcp2021_insitu.rc
 
 ! This flag should probably be moved to the da.system.rc file. It denotes which type of filtering to use in the optimizer
 
@@ -93,7 +93,7 @@ da.obsoperator         : TM5
 !
 
 da.obsoperator.home    : ${HOME}/TM5
-da.obsoperator.rc      : ${da.obsoperator.home}/tm5-test_py3.rc
+da.obsoperator.rc      : ${da.obsoperator.home}/tm5-cte2021-era5-sib4.rc
 
 !
 ! The number of ensemble members used in the experiment. Valid entries are integers > 2
-- 
GitLab