From ad607933b8b51ca79afa0e766a331f4977a2ae67 Mon Sep 17 00:00:00 2001
From: Ingrid Luijx <ingrid.vanderlaan@wur.nl>
Date: Mon, 8 Apr 2013 09:53:34 +0000
Subject: [PATCH] Changed R and HPHR matrices to vectors for the serial minimum
 least squares algorithm in the optimizer to save memory (those routines need
 only the diagonal elements, i.e. vectors). For the bulk algorithm this is not
 possible, so that option is left unchanged.

---
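
Note (this text sits below the "---" cut and will not become part of the
commit): the serial minimum least squares solver assimilates one observation
at a time, so it only ever reads and writes the diagonal elements R[n] and
HPHR[n]; storing them as length-nobs vectors reduces their memory cost from
O(nobs^2) to O(nobs) floats. The bulk solver works on all observations at
once and still needs the full (nobs x nobs) matrices. The standalone Python
sketch below is an illustration only, not the CarbonTracker code itself: it
uses synthetic data, shows only the mean-state update (rejection,
localization and the ensemble deviation updates are omitted), and its
variable names merely mirror those in the patch.

import numpy as np

rng = np.random.default_rng(0)
nmembers, nparams, nobs = 20, 5, 100

X_prime = rng.standard_normal((nparams, nmembers))    # state deviations
HX_prime = rng.standard_normal((nobs, nmembers))      # obs-space deviations
x = np.zeros(nparams)                                 # mean state
Hx = HX_prime.mean(axis=1)                            # mean simulated obs
obs = Hx + rng.standard_normal(nobs)                  # synthetic observations
mdm = np.ones(nobs)                                   # model-data mismatch (std dev)

# Serial: one observation at a time, only the scalars R[n] and HPHR[n]
# are ever touched, so vectors are sufficient.
R = mdm ** 2                       # shape (nobs,)
HPHR = np.zeros(nobs)              # shape (nobs,)
for n in range(nobs):
    res = obs[n] - Hx[n]
    PHt = np.dot(X_prime, HX_prime[n, :]) / (nmembers - 1)
    HPHR[n] = (HX_prime[n, :] ** 2).sum() / (nmembers - 1) + R[n]
    KG = PHt / HPHR[n]             # Kalman gain column for this observation
    x = x + KG * res

# Bulk: the ENKF equations are solved for all observations at once, so the
# full covariance matrices cannot be avoided.
R_full = np.diag(mdm ** 2)                                          # (nobs, nobs)
HPHR_full = np.dot(HX_prime, HX_prime.T) / (nmembers - 1) + R_full  # (nobs, nobs)

print(HPHR.shape, HPHR_full.shape)   # (100,) versus (100, 100)

For example, with nobs = 10^5 the bulk matrices would hold 10^10 elements
each, while the serial vectors hold only 10^5, which is the memory saving
this patch targets.
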
 da/baseclasses/optimizer.py | 51 ++++++++++++++++++++++++-------------
 da/ct/optimizer.py          |  9 +++++++
 da/tools/pipeline.py        | 38 ++++++++++++++-------------
 3 files changed, 63 insertions(+), 35 deletions(-)

diff --git a/da/baseclasses/optimizer.py b/da/baseclasses/optimizer.py
index 9c55734..725116d 100755
--- a/da/baseclasses/optimizer.py
+++ b/da/baseclasses/optimizer.py
@@ -34,7 +34,6 @@ class Optimizer(object):
 
         logging.info('Optimizer object initialized: %s' % self.Identifier)
 
-
     def Initialize(self, dims):
         self.nlag = dims[0]
         self.nmembers = dims[1]
@@ -59,7 +58,13 @@ class Optimizer(object):
         # observation ids
         self.obs_ids = np.zeros((self.nobs,), float)
         # covariance of observations
-        self.R = np.zeros((self.nobs, self.nobs,), float)
+        # Total covariance of fluxes and obs in units of obs [H P H^t + R]
+        if self.algorithm == 'Serial':
+            self.R = np.zeros((self.nobs,), float)
+            self.HPHR = np.zeros((self.nobs,), float)
+        else:
+            self.R = np.zeros((self.nobs,self.nobs,), float)
+            self.HPHR = np.zeros((self.nobs, self.nobs,), float)
         # localization of obs
         self.may_localize = np.zeros(self.nobs, bool)
         # rejection of obs
@@ -74,8 +79,6 @@ class Optimizer(object):
         # species mask
         self.speciesmask = {}
 
-        # Total covariance of fluxes and obs in units of obs [H P H^t + R]
-        self.HPHR = np.zeros((self.nobs, self.nobs,), float)
         # Kalman Gain matrix
         self.KG = np.zeros((self.nlag * self.nparams, self.nobs,), float)
 
@@ -131,9 +134,13 @@ class Optimizer(object):
         self.X_prime = self.X_prime - self.x[:, np.newaxis] # make into a deviation matrix
         self.HX_prime = self.HX_prime - self.Hx[:, np.newaxis] # make a deviation matrix
 
-        for i, mdm in enumerate(allmdm):
-            self.R[i, i] = mdm ** 2
-
+        if self.algorithm == 'Serial':
+            for i, mdm in enumerate(allmdm):
+                self.R[i] = mdm ** 2
+        else:
+            for i, mdm in enumerate(allmdm):
+                self.R[i,i] = mdm ** 2
+
     def matrix_to_state(self, StateVector):
         for n in range(self.nlag):
             members = StateVector.EnsembleMembers[n]
@@ -256,7 +263,9 @@ class Optimizer(object):
             savedict['name'] = "modeldatamismatchvariance"
             savedict['long_name'] = "modeldatamismatch variance"
             savedict['units'] = "[mol mol-1]^2"
-            savedict['dims'] = dimobs + dimobs
+            if self.algorithm == 'Serial':
+                savedict['dims'] = dimobs
+            else: savedict['dims'] = dimobs + dimobs
             savedict['values'] = self.R.tolist()
             savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
             f.AddData(savedict)
@@ -269,7 +278,9 @@ class Optimizer(object):
             savedict['name'] = "totalmolefractionvariance"
             savedict['long_name'] = "totalmolefractionvariance"
             savedict['units'] = "[mol mol-1]^2"
-            savedict['dims'] = dimobs + dimobs
+            if self.algorithm == 'Serial':
+                savedict['dims'] = dimobs
+            else: savedict['dims'] = dimobs + dimobs
             savedict['values'] = self.HPHR.tolist()
             savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch'
             f.AddData(savedict)
@@ -313,24 +324,24 @@ class Optimizer(object):
             res = self.obs[n] - self.Hx[n]
 
             if self.may_reject[n]:
-                threshold = self.rejection_threshold * np.sqrt(self.R[n, n])
+                threshold = self.rejection_threshold * np.sqrt(self.R[n])
                 if np.abs(res) > threshold:
                     logging.debug('Rejecting observation (%s,%i) because residual (%f) exceeds threshold (%f)' % (self.sitecode[n], self.obs_ids[n], res, threshold))
                     self.flags[n] = 2
                     continue
 
             PHt = 1. / (self.nmembers - 1) * np.dot(self.X_prime, self.HX_prime[n, :])
-            self.HPHR[n, n] = 1. / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[n, :]).sum() + self.R[n, n]
+            self.HPHR[n] = 1. / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[n, :]).sum() + self.R[n]
 
-            self.KG[:, n] = PHt / self.HPHR[n, n]
+            self.KG[:, n] = PHt / self.HPHR[n]
 
             if self.may_localize[n]:
-                logging.debug('Trying to localize observation %i' % self.obs_ids[n])
+                logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
                 self.localize(n)
             else:
-                logging.debug('Not allowed to localize observation %i' % self.obs_ids[n])
+                logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
 
-            alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n, n]) / self.HPHR[n, n]))
+            alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n]) / self.HPHR[n]))
 
             self.x[:] = self.x + self.KG[:, n] * res
 
@@ -342,13 +353,13 @@ class Optimizer(object):
 
             for m in range(n + 1, self.nobs):
                 res = self.obs[n] - self.Hx[n]
-                fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n, n]
+                fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n]
                 self.Hx[m] = self.Hx[m] + fac * res
                 self.HX_prime[m, :] = self.HX_prime[m, :] - alpha * fac * self.HX_prime[n, :]
 
             for m in range(1, n + 1):
                 res = self.obs[n] - self.Hx[n]
-                fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n, n]
+                fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n]
                 self.Hx[m] = self.Hx[m] + fac * res
                 self.HX_prime[m, :] = self.HX_prime[m, :] - alpha * fac * self.HX_prime[n, :]
 
@@ -400,8 +411,12 @@ class Optimizer(object):
 
     def localize(self, n):
         """ localize the Kalman Gain matrix """
-        logging.debug('Not localized observation %i' % self.obs_ids[n])
+        logging.debug('Not localized observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
         pass
+
+    def set_algorithm(self):
+        self.algorithm = 'Serial'
+        logging.info("Current algorithm is set to %s in baseclass optimizer" % self.algorithm)
 
 ################### End Class Optimizer ###################
 
diff --git a/da/ct/optimizer.py b/da/ct/optimizer.py
index 82e8cf3..3bc584b 100755
--- a/da/ct/optimizer.py
+++ b/da/ct/optimizer.py
@@ -73,6 +73,15 @@ class CtOptimizer(Optimizer):
                     count_localized = count_localized + 1
             logging.debug('Localized observation %i, %i%% of values set to 0' % (self.obs_ids[n],count_localized*100/(self.nlag * self.nparams)))
 
+    def set_algorithm(self, algorithm='Serial'):
+        """ determine which minimum least squares algorithm to use """
+
+        if algorithm == 'Serial':
+            self.algorithm = 'Serial'
+        else:
+            self.algorithm = 'Bulk'
+
+        logging.info("Current minimum least squares algorithm is set to %s" % self.algorithm)
 
 ################### End Class CtOptimizer ###################
 
diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py
index d711aaf..1e4ea14 100755
--- a/da/tools/pipeline.py
+++ b/da/tools/pipeline.py
@@ -48,9 +48,11 @@ def ForwardPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperat
     StateVector.Initialize()                            #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
 
     # Read from other simulation and write priors, then read posteriors and propagate
-
-    filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')                #LU teraz czytamy savestate.nc
-    StateVector.ReadFromLegacyFile(filename,'prior') 
+
+    if DaCycle['dir.forward.savestate'] != 'None':
+        filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')                #LU teraz czytamy savestate.nc
+        StateVector.ReadFromLegacyFile(filename,'prior') 
+    else: prepare_state(DaCycle, StateVector)
 
     StateVector.isOptimized = False
 
@@ -61,8 +63,10 @@ def ForwardPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperat
     StateVector.WriteToFile(savefilename)
 
     # Now read optimized fluxes to propagate
-
-    StateVector.ReadFromLegacyFile(filename) # by default will read "opt"(imized) variables, and then propagate
+
+    if DaCycle['dir.forward.savestate'] != 'None':
+        StateVector.ReadFromLegacyFile(filename) # by default will read "opt"(imized) variables, and then propagate
+    else: StateVector.ReadFromFile(savefilename,'prior')
 
     StateVector.isOptimized = True
 
@@ -115,7 +119,6 @@ def prepare_state(DaCycle, StateVector):
 
         # Read the StateVector data from file
 
-        
         filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')                #LU teraz czytamy savestate.nc
 
         StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
@@ -126,9 +129,6 @@ def prepare_state(DaCycle, StateVector):
 
     # Finally, also write the StateVector to a file so that we can always access the a-priori information
 
-    filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')                #LU teraz czytamy savestate.nc
-    StateVector.ReadFromLegacyFile(filename,'prior') 
-    
     filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
 
     StateVector.WriteToFile(filename)  # write prior info because StateVector.Isoptimized == False for now
@@ -243,22 +243,26 @@ def invert(DaCycle, StateVector, Optimizer):
                   int(DaCycle.DaSystem['nparameters']),
                   StateVector.nobs)
 
-    Optimizer.Initialize(dims)
-    Optimizer.state_to_matrix(StateVector)
-    Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
-    Optimizer.set_localization('CT2007')
-
     if not DaCycle.DaSystem.has_key('opt.algorithm'):
         logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)") 
         logging.info("...using serial algorithm as default...")
-        Optimizer.serial_minimum_least_squares()
+        Optimizer.set_algorithm('Serial')
     elif DaCycle.DaSystem['opt.algorithm'] == 'serial':
         logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
-        Optimizer.serial_minimum_least_squares()
+        Optimizer.set_algorithm('Serial')
     elif DaCycle.DaSystem['opt.algorithm'] == 'bulk':
         logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
+        Optimizer.set_algorithm('Bulk')
+
+    Optimizer.Initialize(dims)
+    Optimizer.state_to_matrix(StateVector)
+    Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
+    Optimizer.set_localization('None')
+
+    if Optimizer.algorithm == 'Serial':
+        Optimizer.serial_minimum_least_squares()
+    else: 
         Optimizer.bulk_minimum_least_squares()
-
     
     Optimizer.matrix_to_state(StateVector)
     Optimizer.write_diagnostics(DaCycle, StateVector, type='optimized')
-- 
GitLab