diff --git a/da/baseclasses/optimizer.py b/da/baseclasses/optimizer.py index 82e67089f95071cb5a19111cfe4c2b34e0714799..267bbb4f49591cffeb314d14a5121180bbf12fc4 100755 --- a/da/baseclasses/optimizer.py +++ b/da/baseclasses/optimizer.py @@ -1,34 +1,36 @@ -"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. +"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. Users are recommended to contact the developers (wouter.peters@wur.nl) to receive -updates of the code. See also: http://www.carbontracker.eu. +updates of the code. See also: http://www.carbontracker.eu. This program is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free Software Foundation, -version 3. This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +terms of the GNU General Public License as published by the Free Software Foundation, +version 3. This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. -You should have received a copy of the GNU General Public License along with this +You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.""" #!/usr/bin/env python # optimizer.py """ .. module:: optimizer -.. moduleauthor:: Wouter Peters +.. moduleauthor:: Wouter Peters Revision History: File created on 28 Jul 2010. +File updated on 16 April 2019 by liesbeth: changes to serial_minimum_least_squares """ import logging +import time import numpy as np import numpy.linalg as la import da.tools.io4 as io identifier = 'Optimizer baseclass' -version = '0.0' +version = '2.0' ################### Begin Class Optimizer ################### @@ -46,13 +48,15 @@ class Optimizer(object): logging.info('Optimizer object initialized: %s' % self.ID) + def setup(self, dims): - self.nlag = dims[0] + self.nlag = dims[0] self.nmembers = dims[1] - self.nparams = dims[2] - self.nobs = dims[3] + self.nparams = dims[2] + self.nobs = dims[3] self.create_matrices() + def create_matrices(self): """ Create Matrix space needed in optimization routine """ @@ -86,6 +90,8 @@ class Optimizer(object): self.species = np.zeros(self.nobs, str) # species type self.sitecode = np.zeros(self.nobs, str) + # rejection_threshold + self.rejection_threshold = np.zeros(self.nobs, float) # species mask self.speciesmask = {} @@ -94,51 +100,61 @@ class Optimizer(object): #self.KG = np.zeros((self.nlag * self.nparams, self.nobs,), float) self.KG = np.zeros((self.nlag * self.nparams,), float) + def state_to_matrix(self, statevector): - allsites = [] # collect all obs for n=1,..,nlag - allobs = [] # collect all obs for n=1,..,nlag - allmdm = [] # collect all mdm for n=1,..,nlag - allids = [] # collect all model samples for n=1,..,nlag - allreject = [] # collect all model samples for n=1,..,nlag - alllocalize = [] # collect all model samples for n=1,..,nlag - allflags = [] # collect all model samples for n=1,..,nlag - allspecies = [] # collect all model samples for n=1,..,nlag + allsites = [] # collect all obs for n=1,..,nlag + allobs = [] # collect all obs for n=1,..,nlag + allmdm = [] # collect all mdm for n=1,..,nlag 
+ allids = [] # collect all model samples for n=1,..,nlag + allreject = [] # collect all model samples for n=1,..,nlag + alllocalize = [] # collect all model samples for n=1,..,nlag + allflags = [] # collect all model samples for n=1,..,nlag + allspecies = [] # collect all model samples for n=1,..,nlag allsimulated = [] # collect all members model samples for n=1,..,nlag + allrej_thres = [] # collect all rejection_thresholds, will be the same for all samples of same source for n in range(self.nlag): + samples = statevector.obs_to_assimilate[n] members = statevector.ensemble_members[n] - self.x[n * self.nparams:(n + 1) * self.nparams] = members[0].param_values + self.x[n * self.nparams:(n + 1) * self.nparams] = members[0].param_values self.X_prime[n * self.nparams:(n + 1) * self.nparams, :] = np.transpose(np.array([m.param_values for m in members])) - if samples != None: - self.rejection_threshold = samples.rejection_threshold - - allreject.extend(samples.getvalues('may_reject')) - alllocalize.extend(samples.getvalues('may_localize')) - allflags.extend(samples.getvalues('flag')) - allspecies.extend(samples.getvalues('species')) - allobs.extend(samples.getvalues('obs')) - allsites.extend(samples.getvalues('code')) - allmdm.extend(samples.getvalues('mdm')) - allids.extend(samples.getvalues('id')) - - simulatedensemble = samples.getvalues('simulated') - for s in range(simulatedensemble.shape[0]): + # Add observation data for all sample objects + if samples != None: + if type(samples) != list: samples = [samples] + for m in range(len(samples)): + sample = samples[m] + logging.debug('Lag %i, sample %i: rejection_threshold = %i, nobs = %i' %(n, m, sample.rejection_threshold, sample.getlength())) + + allrej_thres.extend([sample.rejection_threshold] * sample.getlength()) + allreject.extend(sample.getvalues('may_reject')) + alllocalize.extend(sample.getvalues('may_localize')) + allflags.extend(sample.getvalues('flag')) + allspecies.extend(sample.getvalues('species')) + allobs.extend(sample.getvalues('obs')) + allsites.extend(sample.getvalues('code')) + allmdm.extend(sample.getvalues('mdm')) + allids.extend(sample.getvalues('id')) + + simulatedensemble = sample.getvalues('simulated') + for s in range(simulatedensemble.shape[0]): allsimulated.append(simulatedensemble[s]) - self.obs[:] = np.array(allobs) - self.obs_ids[:] = np.array(allids) - self.HX_prime[:, :] = np.array(allsimulated) - self.Hx[:] = self.HX_prime[:, 0] + self.rejection_threshold[:] = np.array(allrej_thres) + + self.obs[:] = np.array(allobs) + self.obs_ids[:] = np.array(allids) + self.HX_prime[:, :] = np.array(allsimulated) + self.Hx[:] = self.HX_prime[:, 0] - self.may_reject[:] = np.array(allreject) + self.may_reject[:] = np.array(allreject) self.may_localize[:] = np.array(alllocalize) - self.flags[:] = np.array(allflags) - self.species[:] = np.array(allspecies) - self.sitecode = allsites + self.flags[:] = np.array(allflags) + self.species[:] = np.array(allspecies) + self.sitecode = allsites - self.X_prime = self.X_prime - self.x[:, np.newaxis] # make into a deviation matrix + self.X_prime = self.X_prime - self.x[:, np.newaxis] # make into a deviation matrix self.HX_prime = self.HX_prime - self.Hx[:, np.newaxis] # make a deviation matrix if self.algorithm == 'Serial': @@ -147,15 +163,17 @@ class Optimizer(object): else: for i, mdm in enumerate(allmdm): self.R[i, i] = mdm ** 2 - + + def matrix_to_state(self, statevector): for n in range(self.nlag): members = statevector.ensemble_members[n] for m, mem in enumerate(members): - 
members[m].param_values[:] = self.X_prime[n * self.nparams:(n + 1) * self.nparams, m] + self.x[n * self.nparams:(n + 1) * self.nparams] + members[m].param_values[:] = self.X_prime[n * self.nparams:(n + 1) * self.nparams, m] + self.x[n * self.nparams:(n + 1) * self.nparams] logging.debug('Returning optimized data to the StateVector, setting "StateVector.isOptimized = True" ') + def write_diagnostics(self, filename, type): """ Open a NetCDF file and write diagnostic output from optimization process: @@ -172,7 +190,6 @@ class Optimizer(object): """ # Open or create file - if type == 'prior': f = io.CT_CDF(filename, method='create') logging.debug('Creating new diagnostics file for optimizer (%s)' % filename) @@ -180,118 +197,116 @@ class Optimizer(object): f = io.CT_CDF(filename, method='write') logging.debug('Opening existing diagnostics file for optimizer (%s)' % filename) - # Add dimensions - - dimparams = f.add_params_dim(self.nparams) + # Add dimensions + dimparams = f.add_params_dim(self.nparams) dimmembers = f.add_members_dim(self.nmembers) - dimlag = f.add_lag_dim(self.nlag, unlimited=False) - dimobs = f.add_obs_dim(self.nobs) - dimstate = f.add_dim('nstate', self.nparams * self.nlag) + dimlag = f.add_lag_dim(self.nlag, unlimited=False) + dimobs = f.add_obs_dim(self.nobs) + dimstate = f.add_dim('nstate', self.nparams * self.nlag) dim200char = f.add_dim('string_of200chars', 200) # Add data, first the ones that are written both before and after the optimization - - savedict = io.std_savedict.copy() - savedict['name'] = "statevectormean_%s" % type + savedict = io.std_savedict.copy() + savedict['name'] = "statevectormean_%s" % type savedict['long_name'] = "full_statevector_mean_%s" % type - savedict['units'] = "unitless" - savedict['dims'] = dimstate - savedict['values'] = self.x.tolist() - savedict['comment'] = 'Full %s state vector mean ' % type + savedict['units'] = "unitless" + savedict['dims'] = dimstate + savedict['values'] = self.x.tolist() + savedict['comment'] = 'Full %s state vector mean ' % type f.add_data(savedict) - savedict = io.std_savedict.copy() - savedict['name'] = "statevectordeviations_%s" % type + savedict = io.std_savedict.copy() + savedict['name'] = "statevectordeviations_%s" % type savedict['long_name'] = "full_statevector_deviations_%s" % type - savedict['units'] = "unitless" - savedict['dims'] = dimstate + dimmembers - savedict['values'] = self.X_prime.tolist() - savedict['comment'] = 'Full state vector %s deviations as resulting from the optimizer' % type + savedict['units'] = "unitless" + savedict['dims'] = dimstate + dimmembers + savedict['values'] = self.X_prime.tolist() + savedict['comment'] = 'Full state vector %s deviations as resulting from the optimizer' % type f.add_data(savedict) - savedict = io.std_savedict.copy() - savedict['name'] = "modelsamplesmean_%s" % type + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesmean_%s" % type savedict['long_name'] = "modelsamplesforecastmean_%s" % type - savedict['units'] = "mol mol-1" - savedict['dims'] = dimobs - savedict['values'] = self.Hx.tolist() - savedict['comment'] = '%s mean mole fractions based on %s state vector' % (type, type) + savedict['units'] = "mol mol-1" + savedict['dims'] = dimobs + savedict['values'] = self.Hx.tolist() + savedict['comment'] = '%s mean mole fractions based on %s state vector' % (type, type) f.add_data(savedict) - savedict = io.std_savedict.copy() - savedict['name'] = "modelsamplesdeviations_%s" % type + savedict = io.std_savedict.copy() + savedict['name'] = 
"modelsamplesdeviations_%s" % type savedict['long_name'] = "modelsamplesforecastdeviations_%s" % type - savedict['units'] = "mol mol-1" - savedict['dims'] = dimobs + dimmembers - savedict['values'] = self.HX_prime.tolist() - savedict['comment'] = '%s mole fraction deviations based on %s state vector' % (type, type) + savedict['units'] = "mol mol-1" + savedict['dims'] = dimobs + dimmembers + savedict['values'] = self.HX_prime.tolist() + savedict['comment'] = '%s mole fraction deviations based on %s state vector' % (type, type) f.add_data(savedict) # Continue with prior only data if type == 'prior': - savedict = io.std_savedict.copy() - savedict['name'] = "sitecode" + savedict = io.std_savedict.copy() + savedict['name'] = "sitecode" savedict['long_name'] = "site code propagated from observation file" - savedict['dtype'] = "char" - savedict['dims'] = dimobs + dim200char - savedict['values'] = self.sitecode + savedict['dtype'] = "char" + savedict['dims'] = dimobs + dim200char + savedict['values'] = self.sitecode savedict['missing_value'] = '!' f.add_data(savedict) - savedict = io.std_savedict.copy() - savedict['name'] = "observed" + savedict = io.std_savedict.copy() + savedict['name'] = "observed" savedict['long_name'] = "observedvalues" - savedict['units'] = "mol mol-1" - savedict['dims'] = dimobs - savedict['values'] = self.obs.tolist() - savedict['comment'] = 'Observations used in optimization' + savedict['units'] = "mol mol-1" + savedict['dims'] = dimobs + savedict['values'] = self.obs.tolist() + savedict['comment'] = 'Observations used in optimization' f.add_data(savedict) - savedict = io.std_savedict.copy() - savedict['name'] = "obspack_num" - savedict['dtype'] = "int64" + savedict = io.std_savedict.copy() + savedict['name'] = "obspack_num" + savedict['dtype'] = "int64" savedict['long_name'] = "Unique_ObsPack_observation_number" - savedict['units'] = "" - savedict['dims'] = dimobs - savedict['values'] = self.obs_ids.tolist() - savedict['comment'] = 'Unique observation number across the entire ObsPack distribution' + savedict['units'] = "" + savedict['dims'] = dimobs + savedict['values'] = self.obs_ids.tolist() + savedict['comment'] = 'Unique observation number across the entire ObsPack distribution' f.add_data(savedict) - savedict = io.std_savedict.copy() - savedict['name'] = "modeldatamismatchvariance" + savedict = io.std_savedict.copy() + savedict['name'] = "modeldatamismatchvariance" savedict['long_name'] = "modeldatamismatch variance" - savedict['units'] = "[mol mol-1]^2" + savedict['units'] = "[mol mol-1]^2" if self.algorithm == 'Serial': savedict['dims'] = dimobs else: savedict['dims'] = dimobs + dimobs - savedict['values'] = self.R.tolist() - savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch' + savedict['values'] = self.R.tolist() + savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch' f.add_data(savedict) # Continue with posterior only data elif type == 'optimized': - - savedict = io.std_savedict.copy() - savedict['name'] = "totalmolefractionvariance" + + savedict = io.std_savedict.copy() + savedict['name'] = "totalmolefractionvariance" savedict['long_name'] = "totalmolefractionvariance" savedict['units'] = "[mol mol-1]^2" if self.algorithm == 'Serial': savedict['dims'] = dimobs else: savedict['dims'] = dimobs + dimobs - savedict['values'] = self.HPHR.tolist() - savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch' + savedict['values'] = self.HPHR.tolist() + 
savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch'
            f.add_data(savedict)

-            savedict = io.std_savedict.copy()
-            savedict['name'] = "flag"
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "flag"
             savedict['long_name'] = "flag_for_obs_model"
-            savedict['units'] = "None"
-            savedict['dims'] = dimobs
-            savedict['values'] = self.flags.tolist()
-            savedict['comment'] = 'Flag (0/1/2/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 99 means not sampled'
+            savedict['units'] = "None"
+            savedict['dims'] = dimobs
+            savedict['values'] = self.flags.tolist()
+            savedict['comment'] = 'Flag (0/1/2/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 99 means not sampled'
             f.add_data(savedict)

             #savedict = io.std_savedict.copy()
@@ -309,20 +324,25 @@ class Optimizer(object):

     def serial_minimum_least_squares(self):
         """ Make minimum least squares solution by looping over obs"""
+
+        # calculate prior value cost function
+        res_prior = np.abs(self.obs-self.Hx)
+        select = (res_prior < 1E10).nonzero()[0]
+        J_prior = res_prior.take(select,axis=0)**2/self.R.take(select,axis=0)
+        res_prior = np.mean(res_prior)
+
+        tic = time.clock()

         for n in range(self.nobs):

             # Screen for flagged observations (for instance site not found, or no sample written from model)
-
             if self.flags[n] != 0:
                 logging.debug('Skipping observation (%s,%i) because of flag value %d' % (self.sitecode[n], self.obs_ids[n], self.flags[n]))
                 continue

-            # Screen for outliers greather than 3x model-data mismatch, only apply if obs may be rejected
+            # Screen for outliers greater than rejection_threshold x model-data mismatch, only apply if obs may be rejected
             res = self.obs[n] - self.Hx[n]
-
             if self.may_reject[n]:
-                threshold = self.rejection_threshold * np.sqrt(self.R[n])
+                threshold = self.rejection_threshold[n] * np.sqrt(self.R[n])
                 if np.abs(res) > threshold:
                     logging.debug('Rejecting observation (%s,%i) because residual (%f) exceeds threshold (%f)' % (self.sitecode[n], self.obs_ids[n], res, threshold))
                     self.flags[n] = 2
@@ -330,10 +350,9 @@ class Optimizer(object):

             logging.debug('Proceeding to assimilate observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))

-            PHt = 1. / (self.nmembers - 1) * np.dot(self.X_prime, self.HX_prime[n, :])
-            self.HPHR[n] = 1. / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[n, :]).sum() + self.R[n]
-
-            self.KG[:] = PHt / self.HPHR[n]
+            PHt = 1.0 / (self.nmembers - 1) * np.dot(self.X_prime, self.HX_prime[n, :])
+            self.HPHR[n] = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[n, :]).sum() + self.R[n]
+            self.KG[:] = PHt / self.HPHR[n]

             if self.may_localize[n]:
                 logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))
@@ -341,32 +360,41 @@ class Optimizer(object):
             else:
                 logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))

-            alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n]) / self.HPHR[n]))
-
+            alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n]) / self.HPHR[n]))
             self.x[:] = self.x + self.KG[:] * res

             for r in range(self.nmembers):
                 self.X_prime[:, r] = self.X_prime[:, r] - alpha * self.KG[:] * (self.HX_prime[n, r])
+            del r
+
+            # update samples to account for update of statevector based on observation n
+            HXprime_n = self.HX_prime[n,:].copy()
+            res = self.obs[n] - self.Hx[n]
+            fac = 1.0 / (self.nmembers - 1) * np.sum(HXprime_n[np.newaxis,:] * self.HX_prime, axis=1) / self.HPHR[n]
+            self.Hx = self.Hx + fac*res
+            self.HX_prime = self.HX_prime - alpha* fac[:,np.newaxis]*HXprime_n
+
+
+        del n
+        if 'HXprime_n' in locals(): del HXprime_n
+
+        toc = time.clock()
+        logging.debug('Minimum least squares update finished in %s seconds' %(toc-tic))

-#WP !!!! Very important to first do all obervations from n=1 through the end, and only then update 1,...,n. The current observation
-#WP should always be updated last because it features in the loop of the adjustments !!!!
+        # calculate posterior value cost function
+        res_post = np.abs(self.obs-self.Hx)
+        select = (res_post < 1E10).nonzero()[0]
+        J_post = res_post.take(select,axis=0)**2/self.R.take(select,axis=0)
+        res_post = np.mean(res_post)
+
+        logging.info('Observation part cost function: prior = %s, posterior = %s' % (np.sum(J_prior), np.sum(J_post)))
+        logging.info('Mean residual: prior = %s, posterior = %s' % (res_prior, res_post))

-            for m in range(n + 1, self.nobs):
-                res = self.obs[n] - self.Hx[n]
-                fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n]
-                self.Hx[m] = self.Hx[m] + fac * res
-                self.HX_prime[m, :] = self.HX_prime[m, :] - alpha * fac * self.HX_prime[n, :]
-            for m in range(1, n + 1):
-                res = self.obs[n] - self.Hx[n]
-                fac = 1.0 / (self.nmembers - 1) * (self.HX_prime[n, :] * self.HX_prime[m, :]).sum() / self.HPHR[n]
-                self.Hx[m] = self.Hx[m] + fac * res
-                self.HX_prime[m, :] = self.HX_prime[m, :] - alpha * fac * self.HX_prime[n, :]
-
     def bulk_minimum_least_squares(self):
         """ Make minimum least squares solution by solving matrix equations"""
-
         # Create full solution, first calculate the mean of the posterior analysis
@@ -380,8 +408,8 @@ class Optimizer(object):

         self.x[:] = self.x + np.dot(self.KG, self.obs - self.Hx)   # xa = xp + K (y-Hx)

-        # And next make the updated ensemble deviations. Note that we calculate P by using the full equation (10) at once, and
-        # not in a serial update fashion as described in Whitaker and Hamill.
+        # And next make the updated ensemble deviations. Note that we calculate P by using the full equation (10) at once, and
+        # not in a serial update fashion as described in Whitaker and Hamill.
         # For the current problem with limited N_obs this is easier, or at least more straightforward to do.
I = np.identity(self.nlag * self.nparams) @@ -391,7 +419,6 @@ class Optimizer(object): Kw = np.dot(part1, part2) # K~ self.X_prime[:, :] = np.dot(I, self.X_prime) - np.dot(Kw, self.HX_prime) # HX' = I - K~ * HX' - # Now do the adjustments of the modeled mole fractions using the linearized ensemble. These are not strictly needed but can be used # for diagnosis. @@ -402,20 +429,24 @@ class Optimizer(object): logging.info('Minimum Least Squares solution was calculated, returning') + def set_localization(self): """ determine which localization to use """ self.localization = True self.localizetype = "None" logging.info("Current localization option is set to %s" % self.localizetype) + def localize(self, n): """ localize the Kalman Gain matrix """ logging.debug('Not localized observation %s, %i' % (self.sitecode[n], self.obs_ids[n])) - + + def set_algorithm(self): self.algorithm = 'Serial' logging.info("Current algorithm is set to %s in baseclass optimizer" % self.algorithm) + ################### End Class Optimizer ################### diff --git a/da/baseclasses/statevector.py b/da/baseclasses/statevector.py index 94cb6e97da9ced531eda5b50bcfdcb120cf8acff..7bb72f38c48bbda903a4338e897e967b5b4cc923 100755 --- a/da/baseclasses/statevector.py +++ b/da/baseclasses/statevector.py @@ -246,7 +246,7 @@ class StateVector(object): """ - if covariancematrix == None: + if covariancematrix is None: covariancematrix = np.identity(self.nparams) # Make a cholesky decomposition of the covariance matrix @@ -411,7 +411,7 @@ class StateVector(object): logging.info('Successfully read the State Vector from file (%s) ' % filename) - def write_members_to_file(self, lag, outdir,endswith='.nc'): + def write_members_to_file(self, lag, outdir,endswith='.nc', obsoperator=None): """ :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag] :param: outdir: Directory where to write files diff --git a/da/co2gridded/statevectorNHgridded.py b/da/co2gridded/statevectorNHgridded.py old mode 100755 new mode 100644 index 24027f9031c219cc8a5cb82ca92ec4410d101e0c..67341824b02711d6c6d45b92b84e3cdf20e3013a --- a/da/co2gridded/statevectorNHgridded.py +++ b/da/co2gridded/statevectorNHgridded.py @@ -1,20 +1,20 @@ -"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. +"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. Users are recommended to contact the developers (wouter.peters@wur.nl) to receive -updates of the code. See also: http://www.carbontracker.eu. +updates of the code. See also: http://www.carbontracker.eu. This program is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free Software Foundation, -version 3. This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +terms of the GNU General Public License as published by the Free Software Foundation, +version 3. This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. -You should have received a copy of the GNU General Public License along with this +You should have received a copy of the GNU General Public License along with this program. 
If not, see <http://www.gnu.org/licenses/>.""" #!/usr/bin/env python # ct_statevector_tools.py """ -Author : peters +Author : peters Revision History: File created on 28 Jul 2010. @@ -50,12 +50,12 @@ class CO2GriddedStateVector(StateVector): def get_covariance(self, date, dacycle): - """ Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector. + """ Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector. Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] - """ + """ + - try: import matplotlib.pyplot as plt except: @@ -63,7 +63,7 @@ class CO2GriddedStateVector(StateVector): # Get the needed matrices from the specified covariance files - file_ocn_cov = dacycle.dasystem['ocn.covariance'] + file_ocn_cov = dacycle.dasystem['ocn.covariance'] cov_files = os.listdir(dacycle.dasystem['bio.cov.dir']) cov_files = [os.path.join(dacycle.dasystem['bio.cov.dir'], f) for f in cov_files if dacycle.dasystem['bio.cov.prefix'] in f] @@ -79,7 +79,7 @@ class CO2GriddedStateVector(StateVector): covariancematrixlist = [] for file in cov_files: if not os.path.exists(file): - msg = "Cannot find the specified file %s" % file + msg = "Cannot find the specified file %s" % file logging.error(msg) raise IOError(msg) else: @@ -87,14 +87,19 @@ class CO2GriddedStateVector(StateVector): f = io.ct_read(file, 'read') - if 'pco2' in file or 'cov_ocean' in file: - cov_ocn = f.get_variable('CORMAT') - parnr = range(9805,9835) - cov = cov_ocn + if 'pco2' in file or 'cov_ocean' in file: + parnr = list(range(9805,9835)) + if 'ocn.optimize' in dacycle.dasystem and not dacycle.dasystem['ocn.optimize']: + cov = np.identity(30)*1E-20 + logging.debug('Prior ocean covariance matrix = Identity matrix') + else: + cov_ocn = f.get_variable('CORMAT') + cov = cov_ocn + elif 'tropic' in file: cov = f.get_variable('covariance') parnr = f.get_variable('parameternumber') - else: + else: cov = f.get_variable('covariance') parnr = f.get_variable('parameternumber') cov_sf = 90.0 / np.sqrt(cov.diagonal().sum()) # this scaling factor makes the total variance close to the value of a single ecoregion @@ -110,12 +115,12 @@ class CO2GriddedStateVector(StateVector): return covariancematrixlist def make_new_ensemble(self, lag, covariancematrixlist=[None]): - """ + """ :param lag: an integer indicating the time step in the lag order :param covariancematrix: a list of matrices specifying the covariance distribution to draw from :rtype: None - - Make a new ensemble, the attribute lag refers to the position in the state vector. + + Make a new ensemble, the attribute lag refers to the position in the state vector. Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] @@ -123,7 +128,7 @@ class CO2GriddedStateVector(StateVector): used to draw ensemblemembers from. Each draw is done on a matrix from the list, to make the computational burden smaller when the StateVector nparams becomes very large. 
- """ + """ try: import matplotlib.pyplot as plt except: @@ -137,7 +142,7 @@ class CO2GriddedStateVector(StateVector): dims = 1 # start from 1.0 to account for the last parameter that scales Ice+Non-optimized, we have no covariance matrix for this though - for matrix,parnr,file in covariancematrixlist: + for matrix,parnr,file in covariancematrixlist: dims += matrix.shape[0] if dims != self.nparams: @@ -157,7 +162,7 @@ class CO2GriddedStateVector(StateVector): dof += np.sum(s) ** 2 / sum(s ** 2) try: C = np.linalg.cholesky(matrix) - except np.linalg.linalg.LinAlgError, err: + except np.linalg.linalg.LinAlgError as err: logging.error('Cholesky decomposition has failed ') logging.error('For a matrix of dimensions: %d' % matrix.shape[0]) logging.debug(err) @@ -190,7 +195,7 @@ class CO2GriddedStateVector(StateVector): # Now fill the ensemble members with the deviations we have just created - # Create mean values + # Create mean values new_mean = np.ones(self.nparams, float) # standard value for a new time step is 1.0 @@ -198,7 +203,7 @@ class CO2GriddedStateVector(StateVector): if lag == self.nlag - 1 and self.nlag >= 3: new_mean += self.ensemble_members[lag - 1][0].param_values + \ - self.ensemble_members[lag - 2][0].param_values + self.ensemble_members[lag - 2][0].param_values new_mean = new_mean / 3.0 # Create the first ensemble member with a deviation of 0.0 and add to list diff --git a/da/platform/cartesius_short.py b/da/platform/cartesius_short.py new file mode 100755 index 0000000000000000000000000000000000000000..2ed3478e96faa052de083c8b6f0361b7e28d1911 --- /dev/null +++ b/da/platform/cartesius_short.py @@ -0,0 +1,102 @@ +"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. +Users are recommended to contact the developers (wouter.peters@wur.nl) to receive +updates of the code. See also: http://www.carbontracker.eu. + +This program is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free Software Foundation, +version 3. This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with this +program. If not, see <http://www.gnu.org/licenses/>.""" +#!/usr/bin/env python +# cartesius.py + +""" +Author : peters + +Revision History: +File created on 06 Sep 2010. + +""" + +import logging +import subprocess + +from da.baseclasses.platform import Platform +from da.platform.cartesius import CartesiusPlatform + +std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobtype':'serial', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'01:00:00', 'jobinput':'/dev/null', 'jobnodes':'1', 'jobtasks':'', 'modulenetcdf':'netcdf/4.1.2', 'networkMPI':'','jobqueue': 'normal'} + + +class CartesiusPlatformShort(Platform): + + def get_job_template(self, joboptions={}, block=False): + """ + Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument. 
+ The job template should return the preamble of a job that can be submitted to a queue on your platform, + examples of popular queuing systems are: + - SGE + - MOAB + - XGrid + - + + A list of job options can be passed through a dictionary, which are then filled in on the proper line, + an example is for instance passing the dictionary {'account':'co2'} which will be placed + after the ``-A`` flag in a ``qsub`` environment. + + An extra option ``block`` has been added that allows the job template to be configured to block the current + job until the submitted job in this template has been completed fully. + """ + + #template = """## \n"""+ \ + # """## This is a set of dummy names, to be replaced by values from the dictionary \n"""+ \ + # """## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """+ \ + # """## \n"""+ \ + # """ \n"""+ \ + # """#$ jobname \n"""+ \ + # """#$ jobaccount \n"""+ \ + # """#$ jobnodes \n"""+ \ + # """#$ jobtime \n"""+ \ + # """#$ jobshell \n"""+ \ + # """\n"""+ \ + # """source /usr/bin/sh\n"""+ \ + # """module load python\n"""+ \ + # """\n""" + + + template = """#!/bin/bash \n""" + \ + """## \n""" + \ + """## This is a set of dummy names, to be replaced by values from the dictionary \n""" + \ + """## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """ + \ + """## \n""" + \ + """#SBATCH -J jobname \n""" + \ + """#SBATCH -p short \n""" + \ + """#SBATCH -n jobnodes \n""" + \ + """#SBATCH -t 01:00:00 \n""" + \ + """#SBATCH -o joblog \n""" + \ + """module load pre2019\n""" + \ + """module load python\n""" + \ + """module load nco\n""" + \ + """\n""" + + if 'depends' in joboptions: + template += """#$ -hold_jid depends \n""" + + # First replace from passed dictionary + for k, v in joboptions.items(): + while k in template: + template = template.replace(k, v) + + # Fill remaining values with std_options + for k, v in std_joboptions.items(): + while k in template: + template = template.replace(k, v) + + return template + + + +if __name__ == "__main__": + pass diff --git a/da/tm5/observationoperator.py b/da/tm5/observationoperator.py index fa60a095a05a88a149601f8e9dc0d9072fccb92e..dd21dba0cc292bb920c7d7b037b144b20af2c000 100755 --- a/da/tm5/observationoperator.py +++ b/da/tm5/observationoperator.py @@ -1,32 +1,32 @@ -"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. +"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. Users are recommended to contact the developers (wouter.peters@wur.nl) to receive -updates of the code. See also: http://www.carbontracker.eu. +updates of the code. See also: http://www.carbontracker.eu. This program is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free Software Foundation, -version 3. This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +terms of the GNU General Public License as published by the Free Software Foundation, +version 3. This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 
-You should have received a copy of the GNU General Public License along with this +You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.""" #!/usr/bin/env python # tm5_tools.py """ -Author : peters +Author : peters Revision History: File created on 09 Feb 2009. Major modifications to go to a class-based approach, July 2010. -This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information -from the DA system in combination with the generic tm5.rc files. +This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information +from the DA system in combination with the generic tm5.rc files. The TM5 model is now controlled by a python subprocess. This subprocess consists of an MPI wrapper (written in C) that spawns a large number ( N= nmembers) of TM5 model instances under mpirun, and waits for them all to finish. -The design of the system assumes that the tm5.x (executable) was pre-compiled with the normal TM5 tools, and is residing in a +The design of the system assumes that the tm5.x (executable) was pre-compiled with the normal TM5 tools, and is residing in a directory specified by the ${RUNDIR} of a tm5 rc-file. This tm5 rc-file name is taken from the data assimilation rc-file. Thus, this python shell does *not* compile the TM5 model for you! @@ -59,11 +59,11 @@ mpi_shell_location = 'da/bin/' class TM5ObservationOperator(ObservationOperator): """ This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file - location. This rc-file will be used to figure out the settings for the run. - + location. This rc-file will be used to figure out the settings for the run. + *** This method of running TM5 assumes that a pre-compiled tm5.exe is present, and it will be run from time.start to time.final *** - - These settings can be modified later. To run a model version, simply compile the model using an existing TM5 rc-file, then + + These settings can be modified later. To run a model version, simply compile the model using an existing TM5 rc-file, then open python, and type: []> tm=TM5('/Users/peters/Modeling/TM5/tutorial.rc') @@ -74,30 +74,32 @@ class TM5ObservationOperator(ObservationOperator): To use this class inside a data assimilation cycle, a stand-alone method "setup()" is included which modifies the TM5 settings according to an external dictionary of values to overwrite, and then runs the TM5 model. 
- + """ def __init__(self, filename): """ The instance of an TMObservationOperator is application dependent """ - self.ID = identifier # the identifier gives the model name - self.version = version # the model version used + self.ID = identifier # the identifier gives the model name + self.version = version # the model version used self.restart_filelist = [] - self.output_filelist = [] - - self.outputdir = None # Needed for opening the samples.nc files created + self.output_filelist = [] + + self.outputdir = None # Needed for opening the samples.nc files created - self.load_rc(filename) # load the specified rc-file + self.load_rc(filename) # load the specified rc-file self.validate_rc() # validate the contents logging.info('Observation Operator initialized: %s (%s)' % (self.ID, self.version)) - + + + def setup(self, dacycle): - """ + """ Execute all steps needed to prepare the ObsOperator for use inside CTDAS, only done at the very first cycle normally - + """ self.dacycle = dacycle - + if self.dacycle['time.restart'] == False or self.dacycle['transition'] == True: newitemsmeteo = {} if self.dacycle['transition']: @@ -123,7 +125,7 @@ class TM5ObservationOperator(ObservationOperator): if k == 'my.meteo.nlev': newitemsmeteo[k] = '91' logging.debug('Resetting TM5 to perform transition of od meteo from 25 to 34 levels') - else: + else: logging.info('First time step, setting up and compiling the TM5 model before proceeding!') # Modify the rc-file to reflect directory structure defined by CTDAS newitems = {'my.basedir': self.dacycle['dir.exec']} @@ -132,7 +134,6 @@ class TM5ObservationOperator(ObservationOperator): self.modify_rc(newitemsmeteo) # Create the TM5 run directory to hold a copy of the modified rc-file - tm5compiledir = self.tm_settings[self.rundirkey] create_dirs(tm5compiledir) @@ -142,10 +143,10 @@ class TM5ObservationOperator(ObservationOperator): # Compile TM5 in the new directory, but only if this is a fresh start logging.debug('Original rc file: %s '%(self.dacycle['da.obsoperator.rc'])) self.compile_tm5(rcfilename) - + newrcfilename = os.path.join(self.tm_settings['rundir'], self.tm_settings['install.rc']) - #Use a TM5 restart file in the first cycle (instead of init file). Used now for the CO project. + # Use a TM5 restart file in the first cycle (instead of init file). Used now for the CO project. 
if 'da.obsoperator.restartfileinfirstcycle' in self.dacycle:
             restartfilename = self.dacycle['da.obsoperator.restartfileinfirstcycle']
             targetdir = self.tm_settings[self.savedirkey]
@@ -156,12 +157,9 @@ class TM5ObservationOperator(ObservationOperator):
                 shutil.copy(file,os.path.join(targetdir,fname))

         # Replace the rc filename for TM5 with the newly created one in the new run directory
-
-
         logging.debug('Working copy of the tm5.rc file is in place (%s) ' % newrcfilename)

         # And also replace the path to the ObservationOperator in the dacycle object so we can work from the TM5 copy from here on
-
         self.dacycle['da.obsoperator.rc'] = newrcfilename

         logging.debug('...and set as the da.obsoperator.rc value in this dacycle ')
@@ -170,7 +168,8 @@ class TM5ObservationOperator(ObservationOperator):
             logging.debug('Reloading the da.obsoperator.rc file for this dacycle')
             self.load_rc(self.dacycle['da.obsoperator.rc'])
             logging.debug('Note that the obsoperator is not recompiled if this is a recovery from a crash!!!')
-
+
+
     def compile_tm5(self, rcfilename):
         """
@@ -186,7 +185,7 @@ class TM5ObservationOperator(ObservationOperator):
             logging.warning('Proceeding from guessed TM5 root dir (%s) ' % tm5_dir)

         os.chdir(tm5_dir)
-
+
         if self.dacycle['transition']: cmd = ['python', 'setup_tm5', '-n', '--%s' % self.dacycle.daplatform.give_queue_type(), rcfilename]
         else: cmd = ['python', 'setup_tm5', '--%s' % self.dacycle.daplatform.give_queue_type(), rcfilename]
@@ -204,19 +203,20 @@ class TM5ObservationOperator(ObservationOperator):
         else:
             logging.info('Compilation successful, continuing')

-    def prepare_run(self):
-        """
+
+
+    def prepare_run(self, samples):
+        """
         Prepare a forward model TM5 run, this consists of:

-          - reading the working copy TM5 rc-file, 
-          - validating it, 
+          - reading the working copy TM5 rc-file,
+          - validating it,
           - modifying the values,
           - Removing the existing tm5.ok file if present

-
-        """
-# Write a modified TM5 model rc-file in which run/break times are defined by our da system
+        """
+        # Write a modified TM5 model rc-file in which run/break times are defined by our da system
         new_items = {
                       'submit.options': self.dacycle.daplatform.give_blocking_flag(),
                       self.timestartkey: self.dacycle['time.sample.start'],
@@ -226,13 +226,17 @@ class TM5ObservationOperator(ObservationOperator):
                       'jobstep.length': 'inf',
                       'ct.params.input.dir': self.dacycle['dir.input'],
-                      'ct.params.input.file': os.path.join(self.dacycle['dir.input'], 'parameters'),
-                      'output.flask.infile': self.dacycle['ObsOperator.inputfile'] ,
-                      'output.flask': 'True' }
+                      'ct.params.input.file': os.path.join(self.dacycle['dir.input'], 'parameters') }
+
+        # add sample type specific settings to rc-file
+        for sample in samples:
+            new_items['output.'+sample.get_samples_type()] = 'True'
+            new_items['output.'+sample.get_samples_type()+'.infile'] = self.dacycle['ObsOperator.inputfile.'+sample.get_samples_type()]
+        del sample
+
         if self.dacycle['transition']:
             new_items[self.istartkey] = self.transitionvalue
-            logging.debug('Resetting TM5 to perform transition of od meteo from 25 to 34 levels')
+            logging.debug('Resetting TM5 to perform transition of od meteo from 25 to 34 levels')
         elif self.dacycle['time.restart']:  # If this is a restart from a previous cycle, the TM5 model should do a restart
             new_items[self.istartkey] = self.restartvalue
             logging.debug('Resetting TM5 to perform restart')
@@ -247,30 +251,35 @@ class TM5ObservationOperator(ObservationOperator):
             if self.dacycle['time.sample.window'] != 0:  # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
                 new_items[self.istartkey] = 
self.restartvalue logging.debug('Resetting TM5 to perform restart') - - # If neither one is true, simply take the istart value from the tm5.rc file that was read + # If neither one is true, simply take the istart value from the tm5.rc file that was read self.modify_rc(new_items) self.write_rc(self.rc_filename) - # Define the name of the file that will contain the modeled output of each observation + # For each sample type, define the name of the file that will contain the modeled output of each observation + self.simulated_file = [None] * len(samples) + for i in range(len(samples)): + self.simulated_file[i] = os.path.join(self.outputdir, '%s_output.%s.nc' % (samples[i].get_samples_type(),self.dacycle['time.sample.stamp'])) + del i + - self.simulated_file = os.path.join(self.outputdir, 'flask_output.%s.nc' % self.dacycle['time.sample.stamp']) def load_rc(self, name): - """ - This method loads a TM5 rc-file with settings for this simulation + """ + This method loads a TM5 rc-file with settings for this simulation """ self.rcfile = rc.RcFile(name) self.tm_settings = self.rcfile.values self.rc_filename = name - if 'my.source.dirs' in self.tm_settings.keys(): + if 'my.source.dirs' in list(self.tm_settings.keys()): self.rcfiletype = 'pycasso' else: self.rcfiletype = 'pre-pycasso' logging.debug('TM5 rc-file loaded successfully') + + def validate_rc(self): """ Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file @@ -319,13 +328,13 @@ class TM5ObservationOperator(ObservationOperator): if v == 'True' : self.tm_settings[k] = True if v == 'False': self.tm_settings[k] = False if 'date' in k : self.tm_settings[k] = to_datetime(v) - if 'time.start' in k : + if 'time.start' in k : self.tm_settings[k] = to_datetime(v, fmt='TM5') - if 'time.final' in k : + if 'time.final' in k : self.tm_settings[k] = to_datetime(v, fmt='TM5') - if 'timerange.start' in k : + if 'timerange.start' in k : self.tm_settings[k] = to_datetime(v) - if 'timerange.end' in k : + if 'timerange.end' in k : self.tm_settings[k] = to_datetime(v) for key in needed_rc_items: @@ -336,27 +345,28 @@ class TM5ObservationOperator(ObservationOperator): logging.debug('rc-file has been validated succesfully') + def modify_rc(self, newvalues): - """ + """ Modify parts of the tm5 settings, for instance to give control of file locations to the DA shell - instead of to the tm5.rc script. + instead of to the tm5.rc script. Note that we replace these values in all {key,value} pairs of the tm5.rc file! """ - + for k, v in newvalues.items(): - if key in self.tm_settings: + if k in self.tm_settings: # keep previous value v_orig = self.tm_settings[k] #replace with new self.tm_settings[k] = v #replace all instances of old with new, but only if it concerns a name of a path!!! 
- if os.path.exists(str(v)): + if os.path.exists(str(v)): for k_old, v_old in self.tm_settings.items(): - if not isinstance(v_old, str): + if not isinstance(v_old, str): continue - if str(v_orig) in str(v_old): + if str(v_orig) in str(v_old): v_new = str(v_old).replace(str(v_orig), str(v)) self.tm_settings[k_old] = v_new @@ -367,43 +377,47 @@ class TM5ObservationOperator(ObservationOperator): logging.debug('Added new tm5 rc-item %s : %s' % (k,v)) + def write_rc(self, tm5rcfilename): - """ + """ Write the rc-file settings to a tm5.rc file in the rundir """ rc.write(tm5rcfilename, self.tm_settings) logging.debug("Modified rc file for TM5 written (%s)" % tm5rcfilename) - def validate_input(self): + + + def validate_input(self, samples): """ Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present """ datadir = self.tm_settings['ct.params.input.dir'] if not os.path.exists(datadir): - msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..." % datadir + msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..." % datadir logging.error(msg) raise IOError(msg) datafiles = os.listdir(datadir) - obsfile = self.dacycle['ObsOperator.inputfile'] - - if not os.path.exists(obsfile): - msg = "The specified obs input file for the TM5 model to read from does not exist (%s), exiting..." % obsfile - logging.error(msg) - if 'forward.savestate.dir' not in self.dacycle: - raise IOError(msg) + # For every sample type, check if input file is present + for sample in samples: + obsfile = self.dacycle['ObsOperator.inputfile.'+sample.get_samples_type()] + if not os.path.exists(obsfile) and len(sample.datalist) > 0: + msg = "The specified obs input file for the TM5 model to read from does not exist (%s), exiting..." % obsfile + logging.error(msg) + if 'forward.savestate.dir' not in self.dacycle: + raise IOError(msg) + del sample for n in range(int(self.dacycle['da.optimizer.nmembers'])): paramfile = 'parameters.%03d.nc' % n if paramfile not in datafiles: - msg = "The specified parameter input file for the TM5 model to read from does not exist (%s), exiting..." % paramfile + msg = "The specified parameter input file for the TM5 model to read from does not exist (%s), exiting..." % paramfile logging.error(msg) raise IOError(msg) # Next, make sure there is an actual model version compiled and ready to execute - targetdir = os.path.join(self.tm_settings[self.rundirkey]) if self.rcfiletype == 'pycasso': @@ -417,21 +431,20 @@ class TM5ObservationOperator(ObservationOperator): raise IOError + def get_initial_data(self): """ This method places all initial data needed by an ObservationOperator in the proper folder for the model. For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial - concentrations for all tracers. + concentrations for all tracers. 
We get the input data from the restart.current directory at 2 times: (1) When the model starts the forecast over nlag cycles (2) When the model starts the advance step over 1 cycle - """ logging.debug("Moving TM5 model restart data from the restart directory to the TM5 save dir") # First get the restart data for TM5 from the current restart dir of the filter - sourcedir = self.dacycle['dir.restart'] targetdir = self.tm_settings[self.savedirkey] self.outputdir = self.tm_settings[self.outputdirkey] # Needed further downstream to collect output data from TM5 @@ -442,33 +455,36 @@ class TM5ObservationOperator(ObservationOperator): fpath = os.path.join(sourcedir, f) if os.path.isdir(fpath): # skip dirs logging.debug(" [skip] .... %s " % fpath) - continue + continue #if not f.startswith('save_'): if not f.startswith('TM5_restart'): logging.debug(" [skip] .... %s " % fpath) - continue + continue if not filterlist in f: logging.debug(" [skip] .... %s " % fpath) - continue + continue # all okay, copy file - logging.debug(" [copy] .... %s " % fpath) shutil.copy(fpath, fpath.replace(sourcedir, targetdir)) logging.debug("All restart data have been copied from the restart/current directory to the TM5 save dir") - def run_forecast_model(self): - self.prepare_run() - self.validate_input() + + + def run_forecast_model(self, samples): + self.prepare_run(samples) + self.validate_input(samples) self.run() - self.save_data() + self.save_data(samples) + + def run(self): - """ + """ Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and - only if successfull on all processors will execution of the shell continue. - + only if successfull on all processors will execution of the shell continue. + """ cwd = os.getcwd() @@ -500,27 +516,25 @@ class TM5ObservationOperator(ObservationOperator): raise OSError # Return to working directory - os.chdir(cwd) return code + + def tm5_with_n_tracers(self): """ Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members""" - tm5submitdir = os.path.join(self.tm_settings[self.rundirkey]) logging.info('tm5submitdir', tm5submitdir) # Go to executable directory and start the subprocess, using a new logfile - os.chdir(tm5submitdir) logging.debug('Changing directory to %s ' % tm5submitdir) # Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed - okfile = 'tm5.ok' - if os.path.exists(okfile): + if os.path.exists(okfile): os.remove(okfile) # Prepare a job for the current platform, this job needs to account for all job parameters such as @@ -533,19 +547,21 @@ class TM5ObservationOperator(ObservationOperator): # An option would need to be added to force a re-compile of the TM5 code, for debugging purposes. 
# file ID and names
-        submitcommand = self.tm_settings['submit.command']
+        submitcommand = self.tm_settings['submit.command']
         logging.info('Submitting job at %s' % datetime.datetime.now())
-        code = subprocess.call(submitcommand.split())
+        code = subprocess.call(submitcommand.split())
         logging.info('Resuming job at %s' % datetime.datetime.now())
-
-        if not os.path.exists(okfile):
+
+        if not os.path.exists(okfile):
             code = -1
         else:
             code = 0

         return code

-    def save_data(self):
+
+
+    def save_data(self, samples):
         """ Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names that is used by the dacycle object to collect restart data for the filter.
@@ -572,24 +588,29 @@ class TM5ObservationOperator(ObservationOperator):
             if os.path.isdir(fil):  # skip dirs
                 skip = True
             elif filterlist == []:  # copy all
-                skip = False
+                skip = False
             else:  # check filter
                 skip = True  # default skip
                 for f in filterlist:
-                    if f in fil:
+                    if f in fil:
                         skip = False  # unless in filterlist
                         break
-
-            if skip:
+
+            if skip:
                 logging.debug(" [skip] .... %s " % fil)
-                continue
+                continue

             self.restart_filelist.append(fil)
             logging.debug(" [added to restart list] .... %s " % fil)

         sourcedir = os.path.join(self.tm_settings[self.outputdirkey])
         sd_ed = self.dacycle['time.sample.stamp']
-        filterlist = ['flask_output.%s' % sd_ed, 'flux1x1_%s' % sd_ed]
+
+        filterlist = []
+        for sample in samples:
+            filterlist.append('%s_output.%s' % (sample.get_samples_type(),sd_ed))
+        del sample
+        filterlist.append('flux1x1_%s' % sd_ed)

         logging.debug("Creating a new list of TM5 output data to collect")
         logging.debug(" from directory: %s " % sourcedir)
@@ -598,7 +619,6 @@ class TM5ObservationOperator(ObservationOperator):

         # Start from empty lists for each TM5 run. Note that these "private" lists from the obs operator are later on appended to the system
         # lists
-
         self.output_filelist = []

         for fil in os.listdir(sourcedir):
@@ -607,28 +627,24 @@ class TM5ObservationOperator(ObservationOperator):
             if os.path.isdir(fil):  # skip dirs
                 skip = True
             elif filterlist == []:  # copy all
-                skip = False
+                skip = False
             else:  # check filterlist
                 skip = True  # default skip
                 for f in filterlist:
-                    if f in fil:
+                    if f in fil:
                         skip = False  # unless in filterlist
                         break
-
-            if skip:
+
+            if skip:
                 logging.debug(" [skip] .... %s " % fil)
-                continue
+                continue

             self.output_filelist.append(fil)
             logging.debug(" [added to output list] .... %s " % fil)

 ################### End Class TM5 ###################

-
+
 if __name__ == "__main__":
     pass
-
-
-
-
diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py
old mode 100755
new mode 100644
index 91a763bde96b3a05864f0b4940c516cedff01cfc..a74bfe5839efeae00808f3a9249ca1d3cb5c1af2
--- a/da/tools/pipeline.py
+++ b/da/tools/pipeline.py
@@ -1,26 +1,26 @@
-"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. 
+"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters.
 Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
-updates of the code. See also: http://www.carbontracker.eu. 
+updates of the code. See also: http://www.carbontracker.eu.

 This program is free software: you can redistribute it and/or modify it under the
-terms of the GNU General Public License as published by the Free Software Foundation, 
-version 3. This program is distributed in the hope that it will be useful, but 
-WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
-FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details. +terms of the GNU General Public License as published by the Free Software Foundation, +version 3. This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. -You should have received a copy of the GNU General Public License along with this +You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.""" #!/usr/bin/env python # pipeline.py """ .. module:: pipeline -.. moduleauthor:: Wouter Peters +.. moduleauthor:: Wouter Peters Revision History: File created on 06 Sep 2010. -The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system. +The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system. """ import logging @@ -32,29 +32,36 @@ import copy header = '\n\n *************************************** ' footer = ' *************************************** \n ' + def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer): """ The main point of entry for the pipeline """ sys.path.append(os.getcwd()) + samples = samples if isinstance(samples,list) else [samples] + logging.info(header + "Initializing current cycle" + footer) start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) prepare_state(dacycle, statevector) sample_state(dacycle, samples, statevector, obsoperator) - + invert(dacycle, statevector, optimizer) - + advance(dacycle, samples, statevector, obsoperator) - - save_and_submit(dacycle, statevector) + + save_and_submit(dacycle, statevector) logging.info("Cycle finished...exiting pipeline") + + def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator): """ The main point of entry for the pipeline """ sys.path.append(os.getcwd()) + samples = samples if isinstance(samples,list) else [samples] + logging.info(header + "Initializing current cycle" + footer) - start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) + start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) if 'forward.savestate.exceptsam' in dacycle: sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"]) @@ -72,32 +79,29 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera else: legacy = False logging.debug("No forward.savestate.legacy key found in rc-file") - + if not fwddir: # Simply make a prior statevector using the normal method - - prepare_state(dacycle, statevector)#LU tutaj zamiast tego raczej to stworzenie nowej kowariancji i ensembli bo pozostale rzeczy sa na gorze i na doel. - - else: + else: # Read prior information from another simulation into the statevector. 
         # This loads the results from another assimilation experiment into the current statevector

         if sam:
-            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) 
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
             #filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
-            statevector.read_from_file_exceptsam(filename, 'prior') 
+            statevector.read_from_file_exceptsam(filename, 'prior')
         elif not legacy:
-            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) 
-            statevector.read_from_file(filename, 'prior') 
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
+            statevector.read_from_file(filename, 'prior')
         else:
             filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
-            statevector.read_from_legacy_file(filename, 'prior') 
+            statevector.read_from_legacy_file(filename, 'prior')

     # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector
     # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current
-    # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit. 
+    # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit.
     # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into
     # the current format through the "write_to_file" method.
@@ -105,38 +109,151 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
     statevector.write_to_file(savefilename, 'prior')

     # Now read optimized fluxes which we will actually use to propagate through the system
-
-    if not fwddir: 
+    if not fwddir:
         # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector
         logging.info("Running forward with prior savestate from: %s"%savefilename)

-    else: 
-
+    else:
         # Read posterior information from another simulation into the statevector.
         # This loads the results from another assimilation experiment into the current statevector

         if sam:
-            statevector.read_from_file_exceptsam(filename, 'opt') 
+            statevector.read_from_file_exceptsam(filename, 'opt')
         elif not legacy:
-            statevector.read_from_file(filename, 'opt') 
+            statevector.read_from_file(filename, 'opt')
         else:
-            statevector.read_from_legacy_file(filename, 'opt') 
+            statevector.read_from_legacy_file(filename, 'opt')

         logging.info("Running forward with optimized savestate from: %s"%filename)

     # Finally, we run forward with these parameters
-
     advance(dacycle, samples, statevector, obsoperator)

     # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list.
     # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis.
-    save_and_submit(dacycle, statevector) 
+    save_and_submit(dacycle, statevector)
     logging.info("Cycle finished...exiting pipeline")
+
+
+####################################################################################################
+
+def analysis_pipeline_v2(dacycle, platform, dasystem, samples, statevector):
+    """ Main entry point for analysis of ctdas results """
+
+    # copy da/analysis folder to exec folder and copy copied_region files
+    if not os.path.exists(os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions.nc')):
+        if dacycle['dir.da_source'] != dacycle['dir.da_run']:
+            import shutil
+            from da.tools.general import create_dirs
+            create_dirs(os.path.join(dacycle['dir.exec'],'da'))
+            try:
+                shutil.copytree(os.path.join(dacycle['dir.da_source'],'da','analysis'),os.path.join(dacycle['dir.exec'],'da','analysis'))
+            except:
+                try:
+                    create_dirs(os.path.join(dacycle['dir.exec'],'da','analysis'))
+                    shutil.copy(os.path.join(dacycle['dir.da_source'],'da','analysis/*'),os.path.join(dacycle['dir.exec'],'da','analysis/'))
+                except:
+                    pass
+        shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions.nc'))
+        shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions_extended.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions_extended.nc'))
+    # ----------
+
+    from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
+    from da.analysis.expand_molefractions import write_mole_fractions
+    from da.analysis.summarize_obs import summarize_obs
+    from da.analysis.time_avg_fluxes import time_avg
+
+    logging.info(header + "Starting analysis" + footer)
+
+    dasystem.validate()
+    dacycle.dasystem = dasystem
+    dacycle.daplatform = platform
+    dacycle.setup()
+    statevector.setup(dacycle)
+
+    # write molefractions for flask observations
+    for sample in samples:
+        if sample.get_samples_type() == 'flask':
+            logging.info(header + "Starting mole fractions" + footer)
+            write_mole_fractions(dacycle)
+            summarize_obs(dacycle['dir.analysis'])
+    del sample
+
+    logging.info(header + "Starting weekly averages" + footer)
+
+    save_weekly_avg_1x1_data(dacycle, statevector)
+    save_weekly_avg_state_data(dacycle, statevector)
+    save_weekly_avg_tc_data(dacycle, statevector)
+    save_weekly_avg_ext_tc_data(dacycle)
+    save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
+    save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
+    save_weekly_avg_agg_data(dacycle,region_aggregate='country')
+
+    logging.info(header + "Starting monthly and yearly averages" + footer)
+
+    time_avg(dacycle,'flux1x1')
+    time_avg(dacycle,'transcom')
+    time_avg(dacycle,'transcom_extended')
+    #time_avg(dacycle,'olson')
+    #time_avg(dacycle,'olson_extended')
+    time_avg(dacycle,'country')
+
+    logging.info(header + "Finished analysis" + footer)
+
+
+
+def analysis_pipeline_v3(dacycle, platform, dasystem, samples, statevector):
+
+    from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
+    from da.analysis.expand_molefractions import write_mole_fractions
+    from da.analysis.summarize_obs import summarize_obs
+    from da.analysis.time_avg_fluxes import time_avg
+
+    logging.info(header + "Starting analysis" + footer)
+
+    dasystem.validate()
+    dacycle.dasystem = dasystem
+    dacycle.daplatform = platform
+    dacycle.setup()
+    statevector.setup(dacycle)
+
+    # write molefractions for flask observations
+    for sample in samples:
+        if sample.get_samples_type() == 'flask':
+            logging.info(header + "Starting mole fractions" + footer)
+            write_mole_fractions(dacycle)
+            summarize_obs(dacycle['dir.analysis'])
+    del sample
+
+    logging.info(header + "Starting weekly averages" + footer)
+
+    save_weekly_avg_1x1_data(dacycle, statevector)
+    save_weekly_avg_state_data(dacycle, statevector)
+    save_weekly_avg_tc_data(dacycle, statevector)
+    save_weekly_avg_ext_tc_data(dacycle)
+    save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
+    save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
+    save_weekly_avg_agg_data(dacycle,region_aggregate='country')
+
+    logging.info(header + "Starting monthly and yearly averages" + footer)
+    time_avg(dacycle,'flux1x1')
+    time_avg(dacycle,'transcom')
+    time_avg(dacycle,'transcom_extended')
+    #time_avg(dacycle,'olson')
+    #time_avg(dacycle,'olson_extended')
+    time_avg(dacycle,'country')
+
+    logging.info(header + "Finished analysis" + footer)
+
+
 def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
     """ Main entry point for analysis of ctdas results """
@@ -145,20 +262,20 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
     from da.analysis.summarize_obs import summarize_obs
     from da.analysis.time_avg_fluxes import time_avg

-    logging.info(header + "Starting analysis" + footer) 
+    logging.info(header + "Starting analysis" + footer)

-    dasystem.validate() 
-    dacycle.dasystem = dasystem 
-    dacycle.daplatform = platform 
-    dacycle.setup() 
-    statevector.setup(dacycle) 
+    dasystem.validate()
+    dacycle.dasystem = dasystem
+    dacycle.daplatform = platform
+    dacycle.setup()
+    statevector.setup(dacycle)

-    logging.info(header + "Starting mole fractions" + footer) 
+    logging.info(header + "Starting mole fractions" + footer)

     write_mole_fractions(dacycle)
     summarize_obs(dacycle['dir.analysis'])

-    logging.info(header + "Starting weekly averages" + footer) 
+    logging.info(header + "Starting weekly averages" + footer)

     save_weekly_avg_1x1_data(dacycle, statevector)
     save_weekly_avg_state_data(dacycle, statevector)
@@ -166,20 +283,22 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
     save_weekly_avg_ext_tc_data(dacycle)
     save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
     save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
     save_weekly_avg_agg_data(dacycle,region_aggregate='country')

-    logging.info(header + "Starting monthly and yearly averages" + footer) 
+    logging.info(header + "Starting monthly and yearly averages" + footer)

     time_avg(dacycle,'flux1x1')
     time_avg(dacycle,'transcom')
     time_avg(dacycle,'transcom_extended')
-    time_avg(dacycle,'olson')
-    time_avg(dacycle,'olson_extended')
+    #time_avg(dacycle,'olson')
+    #time_avg(dacycle,'olson_extended')
     time_avg(dacycle,'country')

-    logging.info(header + "Finished analysis" + footer) 
+    logging.info(header + "Finished analysis" + footer)
+
+
 def archive_pipeline(dacycle, platform, dasystem):
     """ Main entry point for archiving of output from one disk/system to another """
@@ -197,7 +316,7 @@ def archive_pipeline(dacycle, platform, dasystem):
         rsyncflags = dacycle['task.rsync.%s.flags'%task]

         # file ID and names
-        jobid = dacycle['time.end'].strftime('%Y%m%d') 
+        jobid = dacycle['time.end'].strftime('%Y%m%d')
         targetdir = os.path.join(dacycle['dir.exec'])
         jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) )
         logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) )
@@ -212,23 +331,26 @@ def archive_pipeline(dacycle, platform, dasystem):
             execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,)
             template += execcommand

-        # write and submit 
+        # write and submit
         platform.write_job(jobfile, template, jobid)
         jobid = platform.submit_job(jobfile, joblog=logfile)

+
 def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator):
     """ Set up the job specific directory structure and create an expanded rc-file """

-    dasystem.validate() 
-    dacycle.dasystem = dasystem 
-    dacycle.daplatform = platform 
-    dacycle.setup() 
-    #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc
-    #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc
-    #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc
-    obsoperator.setup(dacycle)  # Setup Observation Operator
-    statevector.setup(dacycle) 
+    dasystem.validate()
+    dacycle.dasystem = dasystem
+    dacycle.daplatform = platform
+    dacycle.setup()
+    #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc
+    #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc
+    #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc
+    obsoperator.setup(dacycle)  # Setup Observation Operator
+    statevector.setup(dacycle)
+
+
 def prepare_state(dacycle, statevector):
     """ Set up the input data for the forward model: obs and parameters/fluxes"""
@@ -240,22 +362,17 @@ def prepare_state(dacycle, statevector):

     logging.info(header + "starting prepare_state" + footer)

-    if not dacycle['time.restart']: 
+    if not dacycle['time.restart']:

         # Fill each week from n=1 to n=nlag with a new ensemble
-
         for n in range(statevector.nlag):
-            date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle'])) 
+            date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle']))
             cov = statevector.get_covariance(date, dacycle)
             statevector.make_new_ensemble(n, cov)

     else:
-        # Read the statevector data from file
-
-        #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc')
-
-        
+        #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc')
         saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d'))
         statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate
@@ -265,12 +382,13 @@ def prepare_state(dacycle, statevector):

     # Finally, also write the statevector to a file so that we can always access the a-priori information

     current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
-    statevector.write_to_file(current_sv, 'prior') # write prior info 
+    statevector.write_to_file(current_sv, 'prior') # write prior info
+
+
 def sample_state(dacycle, samples, statevector, obsoperator):
     """ Sample the filter state for the inversion """

-
     # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step.
     # This is especially important for:
     # (i) The transport model restart data which holds the background mole fractions. This is needed to run the model one cycle forward
@@ -278,14 +396,14 @@ def sample_state(dacycle, samples, statevector, obsoperator):
     #status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
     #msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)

-    logging.info(header + "starting sample_state" + footer) 
+    logging.info(header + "starting sample_state" + footer)
     nlag = int(dacycle['time.nlag'])

-    logging.info("Sampling model will be run over %d cycles" % nlag) 
+    logging.info("Sampling model will be run over %d cycles" % nlag)

     obsoperator.get_initial_data()

-    for lag in range(nlag): 
-        logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1)) 
+    for lag in range(nlag):
+        logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))

         ############# Perform the actual sampling loop #####################

@@ -294,15 +412,15 @@ def sample_state(dacycle, samples, statevector, obsoperator):
     logging.debug("statevector now carries %d samples" % statevector.nobs)

+
 def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
     """ Perform all actions needed to sample one cycle """

-
     # First set up the information for time start and time end of this sample
     dacycle.set_sample_times(lag)

-    startdate = dacycle['time.sample.start'] 
-    enddate = dacycle['time.sample.end'] 
+    startdate = dacycle['time.sample.start']
+    enddate = dacycle['time.sample.end']
     dacycle['time.sample.window'] = lag
     dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
@@ -311,71 +429,81 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
     logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
     logging.info(" file stamp: %s " % dacycle['time.sample.stamp'])

-
-    # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the 
+    # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
     # type of info needed in your transport model

-    statevector.write_members_to_file(lag, dacycle['dir.input'])
+    statevector.write_members_to_file(lag, dacycle['dir.input'], obsoperator=obsoperator)

-    samples.setup(dacycle)
-    samples.add_observations()
+    for sample in samples:

-    # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
+        sample.setup(dacycle)

-    samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
-
-    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
-    samples.write_sample_coords(sampling_coords_file)
-    # Write filename to dacycle, and to output collection list
-    dacycle['ObsOperator.inputfile'] = sampling_coords_file
+        # Read observations (include option to read from different satellites) + perform observation selection
+        sample.add_observations(advance)

-    # Run the observation operator
+        # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
+        sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'], advance)
+
+        sampling_coords_file = os.path.join(dacycle['dir.input'], sample.get_samples_type()+'_coordinates_%s.nc' % dacycle['time.sample.stamp'])
+        logging.debug('sampling_coords_file = %s' % sampling_coords_file)
+        sample.write_sample_coords(sampling_coords_file)
+
+        # Write filename to dacycle, and to output collection list
+        dacycle['ObsOperator.inputfile.'+sample.get_samples_type()] = sampling_coords_file

-    obsoperator.run_forecast_model()
+        del sample

+    # Run the observation operator
+    obsoperator.run_forecast_model(samples)

     # Read forecast model samples that were written to NetCDF files by each member. Add them to the existing
-    # Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle. 
-
+    # Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle.

     # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
     # one file per member, some logic needs to be included to merge all files!!!

-    if os.path.exists(sampling_coords_file):
-        samples.add_simulations(obsoperator.simulated_file)
-    else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
-
-    # Now write a small file that holds for each observation a number of values that we need in postprocessing
+    for i in range(len(samples)):
+        if os.path.exists(dacycle['ObsOperator.inputfile.'+samples[i].get_samples_type()]):
+            samples[i].add_simulations(obsoperator.simulated_file[i])
+        else: logging.warning("No simulations added, because sample input file does not exist")
+    del i

-    #filename = samples.write_sample_coords()
-
-    # Now add the observations that need to be assimilated to the statevector. 
+    # Now add the observations that need to be assimilated to the statevector.
     # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
     # This is to make sure that the first step of the system uses all observations available, while the subsequent
-    # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only 
+    # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
     # (and at least) once
     # in the assimilation

-    if not advance:
+    logging.debug('time.restart = %s' %dacycle['time.restart'])
     if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1:
         statevector.obs_to_assimilate += (copy.deepcopy(samples),)
-        statevector.nobs += samples.getlength()
+        for sample in samples:
+            statevector.nobs += sample.getlength()
+        del sample
+        logging.info('nobs = %i' %statevector.nobs)
         logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")
     else:
         statevector.obs_to_assimilate += (None,)

+
 def invert(dacycle, statevector, optimizer):
     """ Perform the inverse calculation """

     logging.info(header + "starting invert" + footer)
+
+    if statevector.nobs == 0:
+        logging.info('List with observations to assimilate is empty, skipping invert step and continuing without statevector update...')
+        return
+
     dims = (int(dacycle['time.nlag']), int(dacycle['da.optimizer.nmembers']), int(dacycle.dasystem['nparameters']), statevector.nobs)

     if 'opt.algorithm' not in dacycle.dasystem:
-        logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)") 
+        logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
         logging.info("...using serial algorithm as default...")
         optimizer.set_algorithm('Serial')
     elif dacycle.dasystem['opt.algorithm'] == 'serial':
@@ -384,24 +512,24 @@ def invert(dacycle, statevector, optimizer):
     elif dacycle.dasystem['opt.algorithm'] == 'bulk':
         logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
         optimizer.set_algorithm('Bulk')
-    
+
     optimizer.setup(dims)
-    optimizer.state_to_matrix(statevector) 
-    
+    optimizer.state_to_matrix(statevector)
+
     diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
-    
+
     optimizer.write_diagnostics(diagnostics_file, 'prior')
     optimizer.set_localization(dacycle['da.system.localization'])
-    
+
     if optimizer.algorithm == 'Serial':
         optimizer.serial_minimum_least_squares()
-    else: 
+    else:
         optimizer.bulk_minimum_least_squares()
-    
+
     optimizer.matrix_to_state(statevector)
     optimizer.write_diagnostics(diagnostics_file, 'optimized')
-    
-    
+
+
 def advance(dacycle, samples, statevector, obsoperator):
     """ Advance the filter state to the next step """
@@ -414,31 +542,33 @@ def advance(dacycle, samples, statevector, obsoperator):

     obsoperator.get_initial_data()

-    sample_step(dacycle, samples, statevector, obsoperator, 0, True) 
+    sample_step(dacycle, samples, statevector, obsoperator, 0, True)

     dacycle.restart_filelist.extend(obsoperator.restart_filelist)
     dacycle.output_filelist.extend(obsoperator.output_filelist)
     logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
-
-    dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
-    logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile']))

-    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
-    if os.path.exists(sampling_coords_file):
-        outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
-        samples.write_sample_auxiliary(outfile)
-    else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
+    # write sample output file
+    for sample in samples:
+        dacycle.output_filelist.append(dacycle['ObsOperator.inputfile.'+sample.get_samples_type()])
+        logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile.'+sample.get_samples_type()]))
+
+        sampling_coords_file = os.path.join(dacycle['dir.input'], sample.get_samples_type()+'_coordinates_%s.nc' % dacycle['time.sample.stamp'])
+        if os.path.exists(sampling_coords_file):
+            if sample.get_samples_type() == 'flask':
+                outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
+                sample.write_sample_auxiliary(outfile)
+        else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
+    del sample
+
+
 def save_and_submit(dacycle, statevector):
     """ Save the model state and submit the next job """

     logging.info(header + "starting save_and_submit" + footer)
-    
+
     filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))

     statevector.write_to_file(filename, 'opt')
-    
+
     dacycle.output_filelist.append(filename)
     dacycle.finalize()
-
-
-
-
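
The patch above threads a list of sample objects through the whole pipeline and keys the per-observation bookkeeping on sample.get_samples_type(). The following standalone sketch only illustrates that pattern and is not part of the patch: the MockSamples class and the plain-dictionary dacycle are hypothetical stand-ins for the real CTDAS Observations and CycleControl objects, mirroring just the get_samples_type()/getlength() calls used in sample_step() and advance().

# Illustrative sketch (not part of the patch): per-sample-type bookkeeping as
# introduced in the updated pipeline. "MockSamples" and the plain dict "dacycle"
# are hypothetical stand-ins for the CTDAS objects.
import os


class MockSamples(object):
    """Minimal stand-in exposing the two accessors the pipeline relies on."""

    def __init__(self, samples_type, nobs):
        self.samples_type = samples_type
        self.nobs = nobs

    def get_samples_type(self):
        return self.samples_type

    def getlength(self):
        return self.nobs


dacycle = {'dir.input': 'exec/input', 'time.sample.stamp': '2019010100_2019010800'}

samples = MockSamples('flask', 120)
# a single object is normalized to a list, as at the top of the pipeline functions
samples = samples if isinstance(samples, list) else [samples]

nobs = 0
for sample in samples:
    # one coordinates file and one dacycle key per sample type, as in sample_step()
    sampling_coords_file = os.path.join(dacycle['dir.input'],
        sample.get_samples_type() + '_coordinates_%s.nc' % dacycle['time.sample.stamp'])
    dacycle['ObsOperator.inputfile.' + sample.get_samples_type()] = sampling_coords_file
    nobs += sample.getlength()

print(nobs, dacycle['ObsOperator.inputfile.flask'])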