Commit ad607933 authored by Ingrid Luijkx's avatar Ingrid Luijkx
Browse files

Changed R and HPHR matrices to vectors for serial minimum least squares...

Changed R and HPHR matrices to vectors for serial minimum least squares algorithm in optimizer to save memory (the routines needed only vectors). For the bulk algorithm this is not possible, so nothing was changed for that option.
parent 02129e61
......@@ -34,7 +34,6 @@ class Optimizer(object):
logging.info('Optimizer object initialized: %s' % self.Identifier)
def Initialize(self, dims):
self.nlag = dims[0]
self.nmembers = dims[1]
......@@ -59,7 +58,13 @@ class Optimizer(object):
# observation ids
self.obs_ids = np.zeros((self.nobs,), float)
# covariance of observations
self.R = np.zeros((self.nobs, self.nobs,), float)
# Total covariance of fluxes and obs in units of obs [H P H^t + R]
if self.algorithm == 'Serial':
self.R = np.zeros((self.nobs,), float)
self.HPHR = np.zeros((self.nobs,), float)
else:
self.R = np.zeros((self.nobs,self.nobs,), float)
self.HPHR = np.zeros((self.nobs, self.nobs,), float)
# localization of obs
self.may_localize = np.zeros(self.nobs, bool)
# rejection of obs
......@@ -74,8 +79,6 @@ class Optimizer(object):
# species mask
self.speciesmask = {}
# Total covariance of fluxes and obs in units of obs [H P H^t + R]
self.HPHR = np.zeros((self.nobs, self.nobs,), float)
# Kalman Gain matrix
self.KG = np.zeros((self.nlag * self.nparams, self.nobs,), float)
......@@ -131,9 +134,13 @@ class Optimizer(object):
self.X_prime = self.X_prime - self.x[:, np.newaxis] # make into a deviation matrix
self.HX_prime = self.HX_prime - self.Hx[:, np.newaxis] # make a deviation matrix
for i, mdm in enumerate(allmdm):
self.R[i, i] = mdm ** 2
if self.algorithm == 'Serial':
for i, mdm in enumerate(allmdm):
self.R[i] = mdm ** 2
else:
for i, mdm in enumerate(allmdm):
self.R[i,i] = mdm ** 2
def matrix_to_state(self, StateVector):
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
......@@ -256,7 +263,9 @@ class Optimizer(object):
savedict['name'] = "modeldatamismatchvariance"
savedict['long_name'] = "modeldatamismatch variance"
savedict['units'] = "[mol mol-1]^2"
savedict['dims'] = dimobs + dimobs
if self.algorithm == 'Serial':
savedict['dims'] = dimobs
else: savedict['dims'] = dimobs + dimobs
savedict['values'] = self.R.tolist()
savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
f.AddData(savedict)
......@@ -269,7 +278,9 @@ class Optimizer(object):
savedict['name'] = "totalmolefractionvariance"
savedict['long_name'] = "totalmolefractionvariance"
savedict['units'] = "[mol mol-1]^2"
savedict['dims'] = dimobs + dimobs
if self.algorithm == 'Serial':
savedict['dims'] = dimobs
else: savedict['dims'] = dimobs + dimobs
savedict['values'] = self.HPHR.tolist()
savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch'
f.AddData(savedict)
......@@ -313,24 +324,24 @@ class Optimizer(object):
res = self.obs[n] - self.Hx[n]
if self.may_reject[n]:
threshold = self.rejection_threshold * np.sqrt(self.R[n, n])
threshold = self.rejection_threshold * np.sqrt(self.R[n])
if np.abs(res) > threshold:
logging.debug('Rejecting observation (%s,%i) because residual (%f) exceeds threshold (%f)' % (self.sitecode[n], self.obs_ids[n], res, threshold))
self.flags[n] = 2
continue
PHt = 1. / (self.nmembers - 1) * np.dot(self.X_prime, self.HX_prime[n, :])
self.HPHR[n, n] = 1. / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[n, :]).sum() + self.R[n, n]
self.HPHR[n] = 1. / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[n, :]).sum() + self.R[n]
self.KG[:, n] = PHt / self.HPHR[n, n]
self.KG[:, n] = PHt / self.HPHR[n]
if self.may_localize[n]:
logging.debug('Trying to localize observation %i' % self.obs_ids[n])
logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
self.localize(n)
else:
logging.debug('Not allowed to localize observation %i' % self.obs_ids[n])
logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n, n]) / self.HPHR[n, n]))
alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n]) / self.HPHR[n]))
self.x[:] = self.x + self.KG[:, n] * res
......@@ -342,13 +353,13 @@ class Optimizer(object):
for m in range(n + 1, self.nobs):
res = self.obs[n] - self.Hx[n]
fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n, n]
fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m, :] = self.HX_prime[m, :] - alpha * fac * self.HX_prime[n, :]
for m in range(1, n + 1):
res = self.obs[n] - self.Hx[n]
fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n, n]
fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m, :] = self.HX_prime[m, :] - alpha * fac * self.HX_prime[n, :]
......@@ -400,8 +411,12 @@ class Optimizer(object):
def localize(self, n):
""" localize the Kalman Gain matrix """
logging.debug('Not localized observation %i' % self.obs_ids[n])
logging.debug('Not localized observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
pass
def set_algorithm(self):
self.algorithm = 'Serial'
logging.info("Current algorithm is set to %s in baseclass optimizer" % self.algorithm)
################### End Class Optimizer ###################
......
......@@ -73,6 +73,15 @@ class CtOptimizer(Optimizer):
count_localized = count_localized + 1
logging.debug('Localized observation %i, %i%% of values set to 0' % (self.obs_ids[n],count_localized*100/(self.nlag * self.nparams)))
def set_algorithm(self, algorithm='Serial'):
""" determine which minimum least squares algorithm to use """
if algorithm == 'Serial':
self.algorithm = 'Serial'
else:
self.algorithm = 'Bulk'
logging.info("Current minimum least squares algorithm is set to %s" % self.algorithm)
################### End Class CtOptimizer ###################
......
......@@ -48,9 +48,11 @@ def ForwardPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperat
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
# Read from other simulation and write priors, then read posteriors and propagate
filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromLegacyFile(filename,'prior')
if DaCycle['dir.forward.savestate'] != 'None':
filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromLegacyFile(filename,'prior')
else: prepare_state(DaCycle, StateVector)
StateVector.isOptimized = False
......@@ -61,8 +63,10 @@ def ForwardPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperat
StateVector.WriteToFile(savefilename)
# Now read optimized fluxes to propagate
StateVector.ReadFromLegacyFile(filename) # by default will read "opt"(imized) variables, and then propagate
if DaCycle['dir.forward.savestate'] != 'None':
StateVector.ReadFromLegacyFile(filename) # by default will read "opt"(imized) variables, and then propagate
else: StateVector.ReadFromFile(savefilename,'prior')
StateVector.isOptimized = True
......@@ -115,7 +119,6 @@ def prepare_state(DaCycle, StateVector):
# Read the StateVector data from file
filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
......@@ -126,9 +129,6 @@ def prepare_state(DaCycle, StateVector):
# Finally, also write the StateVector to a file so that we can always access the a-priori information
filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromLegacyFile(filename,'prior')
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now
......@@ -243,22 +243,26 @@ def invert(DaCycle, StateVector, Optimizer):
int(DaCycle.DaSystem['nparameters']),
StateVector.nobs)
Optimizer.Initialize(dims)
Optimizer.state_to_matrix(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
Optimizer.set_localization('CT2007')
if not DaCycle.DaSystem.has_key('opt.algorithm'):
logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
logging.info("...using serial algorithm as default...")
Optimizer.serial_minimum_least_squares()
Optimizer.set_algorithm('Serial')
elif DaCycle.DaSystem['opt.algorithm'] == 'serial':
logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
Optimizer.serial_minimum_least_squares()
Optimizer.set_algorithm('Serial')
elif DaCycle.DaSystem['opt.algorithm'] == 'bulk':
logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
Optimizer.set_algorithm('Bulk')
Optimizer.Initialize(dims)
Optimizer.state_to_matrix(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
Optimizer.set_localization('None')
if Optimizer.algorithm == 'Serial':
Optimizer.serial_minimum_least_squares()
else:
Optimizer.bulk_minimum_least_squares()
Optimizer.matrix_to_state(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='optimized')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment