From c62b1397f7749420eeea682825d0b31a2d9bdba2 Mon Sep 17 00:00:00 2001
From: Wouter Peters <wouter.peters@wur.nl>
Date: Wed, 9 May 2012 11:20:16 +0000
Subject: [PATCH] changes to the structure of sampling, and how Obs are
 accummulated in the system

---
 da/tools/pipeline.py | 81 +++++++++++++++++++++++++-------------------
 1 file changed, 46 insertions(+), 35 deletions(-)

diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py
index c7611f1c..11dc1ff7 100755
--- a/da/tools/pipeline.py
+++ b/da/tools/pipeline.py
@@ -144,7 +144,29 @@ def SampleState(DaCycle,Samples,StateVector, ObservationOperator):
 
         msg     = header+".....Ensemble Kalman Filter at lag %d"% (int(lag)+1,)           ; logging.info(msg) 
 
-        status   = SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag)
+    # If we are going to sample the lag = 0 time step, place the needed files to run the transport model in the right folder first
+
+        if lag == 0:
+            status  = ObservationOperator.GetInitialData()
+
+        ############# Perform the actual sampling loop #####################
+
+        status      = SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag)
+
+        ############# Finished the actual sampling loop #####################
+
+        msg = "StateVector now carries %d samples"%StateVector.nobs ; logging.debug(msg)
+
+        # Add the observation operator restart+output files to the general Output+RestartFileList, only after 'Advance'
+        # Same logic for observation files
+
+        if lag == 0:
+            DaCycle.RestartFileList.extend( ObservationOperator.RestartFileList )
+            DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList )
+            msg = "Appended ObsOperator restart and output file lists to DaCycle for collection " ; logging.debug(msg)
+        
+            DaCycle.OutputFileList.extend([DaCycle['ObsOperator.inputfile']])
+            msg = "Appended Observation filename to DaCycle for collection " ; logging.debug(msg)
 
 
     # Optionally, post-processing of the model output can be added that deals for instance with
@@ -159,10 +181,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
     # First set up the information for time start and time end of this sample
 
 
-    if lag == 0:
-        status       = ObservationOperator.GetInitialData()
-
-    status           = DaCycle.SetSampleTimes(lag)
+    status                          = DaCycle.SetSampleTimes(lag)
 
     startdate                       = DaCycle['time.sample.start'] 
     enddate                         = DaCycle['time.sample.end'] 
@@ -196,52 +215,40 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
 
     status   = RunForecastModel(DaCycle,ObservationOperator)
 
-    # Add the observation operator restart+output files to the general Output+RestartFileList, only after 'Advance'
-    # Same logic for observation files
-
-    if lag == 0:
-        DaCycle.RestartFileList.extend( ObservationOperator.RestartFileList )
-        DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList )
-    	msg = "Appended ObsOperator restart and output file lists to DaCycle for collection " ; logging.debug(msg)
-    
-        DaCycle.OutputFileList.extend([DaCycle['ObsOperator.inputfile']])
-        msg = "Appended Observation filename to DaCycle for collection " ; logging.debug(msg)
-
     # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
 
     status   = Samples.AddModelDataMismatch()
 
     msg = "Added Model Data Mismatch to all samples " ; logging.debug(msg)
 
-    # Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
-    # Note that obs will only be added to the statevector is either this is the first step (restart=False), or lag==nlag
-
-    if lag == int(DaCycle['time.nlag'])-1 or DaCycle['time.restart']==False :
+    # Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting
+    # Observation object for each sample loop. This data fill be written to file in the output folder for each sample cycle. 
+    
 
-        # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
-        # one file per member, some logic needs to be included to merge all files!!!
+    # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
+    # one file per member, some logic needs to be included to merge all files!!!
          
-        filename               = os.path.join(ObservationOperator.outputdir,'flask_output.%s.nc'%DaCycle['time.sample.stamp'])
-        status                  = Samples.AddSimulations(filename)
+    filename        = os.path.join(ObservationOperator.outputdir,'flask_output.%s.nc'%DaCycle['time.sample.stamp'])
+    status          = Samples.AddSimulations(filename)
 
-        # Give each member a model sample by first copying all samples, and then selecting the data for Member #n
+    # Now write a small file that holds for each observation a number of values that we need in postprocessing
 
-        members = StateVector.EnsembleMembers[lag]
-        for n,Member in enumerate(members):
-            
-            Member.ModelSample     = copy.deepcopy(Samples)
+    filename        = Samples.WriteSampleInfo() 
 
-            # Now subselect the data needed for member #n
+    # Now add the observations that need to be assimilated to the StateVector. 
+    # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
+    # This is to make sure that the first step of the system uses all observations available, while the subsequent
+    # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only 
+    # (and at least) once # in the assimilation
 
-            for Sim in Member.ModelSample.Data:
-                Sim.simulated = Sim.simulated[n]
+    if lag == int(DaCycle['time.nlag'])-1 or DaCycle['time.restart']==False :
 
+        StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
 
-        StateVector.nobs       += Samples.getlength()
+        StateVector.nobs += Samples.getlength()
 
-        msg = "Added samples from the observation operator to each member of the StateVector" ; logging.debug(msg)
+        msg = "Added samples from the observation operator to the assimilted obs list in the StateVector" ; logging.debug(msg)
 
-    msg = "StateVector now carries %d samples"%StateVector.nobs ; logging.debug(msg)
 
     return None
 
@@ -292,6 +299,10 @@ def Advance( DaCycle, Samples, StateVector, ObservationOperator):
 
     SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,0)
 
+    # Write info from optimized flux cycle to a file
+
+    filename = Samples.WriteObsToFile()
+
     return None
 
 def SaveAndSubmit( DaCycle, StateVector):
-- 
GitLab