From 8f88ead7b08c2b019228c26e8477febd8bd0a6eb Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Mon, 26 Jul 2010 15:40:59 +0000 Subject: [PATCH] Slowly getting closer to the new design. Observations and StateVector are now class based, but objects are getting more elaborate each time. One remaining problem is the relation between the StateVector, the ensemble members, and the observations. These are all linked but no object of one type can easily contain the other. For instance, does a model mixing ratio sample belong to an observation object because that is what it represents and is derived from? Or is it part of an ensemble member object because each member creates one sample set? And does a member object belong to the state vector and if so, how does it play together with the ohter time steps in the filter? Are these all separate StateVectoir objects or is there only one StateVector with nlag sets of data?? The issue with nlag is not nicely resolved now. Next task is to build the optimizer such that it takes all info from the different objects and performs a LSQM. --- ct_tools.py | 546 ++++++++++++++++++++++++++++++++++++------------- da_initexit.py | 27 +-- das.py | 39 ++-- tm5_tools.py | 14 +- tools_da.py | 70 +++++-- 5 files changed, 504 insertions(+), 192 deletions(-) diff --git a/ct_tools.py b/ct_tools.py index 8d66d20..f0ddebc 100755 --- a/ct_tools.py +++ b/ct_tools.py @@ -12,6 +12,7 @@ import os import sys import rc import logging +import datetime identifier = 'CarbonTracker CO2' @@ -23,247 +24,502 @@ needed_rc_items = ['obs.input.dir', 'deltaco2.prefix', 'regtype'] -def MakeResiduals(rc_da_shell): - """ Method makes residuals of CO2 mixing ratios from the observed and sampled values """ - import pyhdf.SD as HDF +################### Begin Class DaInfo ################### - # Read observations NetCDF file +class DaInfo(): + """ Information on the data assimilation system used. For CarbonTracker, this is an rc-file with settings. + The settings list includes at least the values above, in needed_rc_items. + """ - ObsFileName = os.path.join(rc_da_shell['dir.output'],rc_da_shell['obs.full.filename']) - ncf = HDF.SD(ObsFileName) - obsdates = ncf.select('decimal_date').get() - dummy = ncf.end() - msg = "Successfully read data from obs file (%s)"%ObsFileName ; logging.debug(msg) + def __init__(self,rcfilename): + """ + Initialization occurs from passed rc-file name + """ - # Read samples NetCDF file + self.LoadRc(rcfilename) + self.ValidateRC() - SamplesFileName = ObsFileName.replace('observations','samples').replace('input','output') - ncf = HDF.SD(SamplesFileName) - sampdates = ncf.select('decimal_date').get() - dummy = ncf.end() - msg = "Successfully read data from model sample file (%s)"%SamplesFileName ; logging.debug(msg) + def __str__(self): + """ + String representation of a DaInfo object + """ - if (obsdates==sampdates).all(): - pass + msg = "===============================================================" ; print msg + msg = "DA System Info rc-file is %s" % self.RcFileName ; print msg + msg = "===============================================================" ; print msg - else: - msg = "Dates in the observation file and samples file are not equal, exiting..." ; logging.error(msg) - raise IOError,msg + return "" - msg = "Residuals created, continuing" ; logging.info(msg) - return None + def LoadRc(self,RcFileName): + """ + This method loads a DA System Info rc-file with settings for this simulation + """ + self.da_settings = rc.read(RcFileName) + self.RcFileName = RcFileName + self.DaRcLoaded = True -def PrepareObs(rc_da_shell,rc_da_system,type='forecast'): - """ PrepareObs """ + msg = 'DA System Info rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg) - import pycdf as CDF - import shutil + return True - saveas = os.path.join(rc_da_shell['dir.input'],'observations.nc') - # ncf = CDF.CDF(saveas,CDF.NC.WRITE|CDF.NC.TRUNC|CDF.NC.CREATE) - # ncf.CreationDate = datetime.datetime.now().strftime('%Y %B %d') - # ncf.CreationUser = os.environ['USER'] - # ncf.CreationDir = rc_da_shell['dir.da_submit'] - # ncf.CreationScript = sys.argv[0] - # ncf.close() - # msg = 'Created a new observation input file (%s) ' % saveas ; logging.info(msg) + def ValidateRC(self): + """ + Validate the contents of the rc-file given a dictionary of required keys + """ - # For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply merge - # the observations.nc file with the pre-cooked obs files + for k,v in self.da_settings.iteritems(): + if v == 'True' : self.da_settings[k] = True + if v == 'False': self.da_settings[k] = False + for key in needed_rc_items: - # Select observation input file for this time step - #sfname = rc_da_system['obs.input.fname']+'.%s'%(rc_da_shell['startdate'].strftime('%Y%m%d') )+'.nc' - sfname = rc_da_system['obs.input.fname']+'.%s'%('20050305')+'.nc' - msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg) - filename = os.path.join(rc_da_system['obs.input.dir'],sfname) + if not self.da_settings.has_key(key): + status,msg = ( False,'Missing a required value in rc-file : %s' % key) + logging.error(msg) + raise IOError,msg - if not os.path.exists(filename): + status,msg = ( True,'DA System Info settings have been validated succesfully' ) ; logging.debug(msg) - msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg) - raise IOError,msg + return None +################### End Class DaInfo ################### - else: - dummy = shutil.copy2(filename,saveas) +################### Begin Class MixingRatioSample ################### - msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg) +class MixingRatioSample(): + """ + Holds the data that defines a Mixing Ratio Sample in the data assimilation framework. Sor far, this includes all + attributes listed below in the __init__ method. One can additionally make more types of data, or make new + objects for specific projects. - rc_da_shell['obs.full.filename'] = saveas + """ - return rc_da_shell + def __init__(self,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',sdev=0.0): + + self.code = code.strip() # Site code + self.xdate = xdate # Date of obs + self.obs = obs # Value observed + self.simulated = simulated # Value simulated by model + self.resid = resid # Mixing ratio residuals + self.hphr = hphr # Mixing ratio prior uncertainty from fluxes and (HPH) and model data mismatch (R) + self.mdm = mdm # Model data mismatch + self.flag = flag # Flag + self.height = height # Sample height + self.lat = lat # Sample lat + self.lon = lon # Sample lon + self.id = evn # Event number + self.sdev = sdev # standard deviation of ensemble + self.masl = True # Sample is in Meters Above Sea Level + self.mag = not self.masl # Sample is in Meters Above Ground + + def __str__(self): + day=self.xdate.strftime('%Y-%m-%d %H:%M:%S') + return ' '.join(map(str,[self.code,day,self.obs,self.flag,self.id])) + +################### End Class MixingRatioSample ################### + +################### Begin Class MixingRatioList ################### + +class MixingRatioList(list): + """ This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """ + from numpy import array + + def getvalues(self,name,constructor=array): + return constructor([getattr(o,name) for o in self]) + def unflagged(self): + return MixingRatioSample([o for o in self if o.flag == 0]) + def flagged(self): + return MixingRatioSample([o for o in self if o.flag != 0]) + def selectsite(self,site='mlo'): + l=[o for o in self if o.code == site] + return MixingRatioSample(l) + def selecthours(self,hours=range(1,24)): + l=[o for o in self if o.xdate.hour in hours] + return MixingRatioSample(l) + +################### End Class MixingRatioList ################### + + +################### Begin Class CtObservations ################### + +class CtObservations(): + """ an object that holds data + methods and attributes needed to manipulate mixing ratio values """ + + def __init__(self,DaInfo): + + sfname = DaInfo.da_settings['obs.input.fname']+'.%s'%('20050305')+'.nc' + msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg) + filename = os.path.join(DaInfo.da_settings['obs.input.dir'],sfname) + + if not os.path.exists(filename): + msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg) + raise IOError,msg + else: + self.ObsFilename = filename -def PrepareEnsemble(rc_da_shell ): - """ - - Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways: + self.MrData = None - (A) Construct a brand new ensemble from a covariance matrix + def __str__(self): + """ Prints a list of MixingRatioSample objects""" + return self.ObsFilename - (B) Get an existing ensemble and propagate it + def AddObs(self): + """ Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file""" + import pyhdf.SD as HDF + import datetime as dtm + from string import strip, join + ncf = HDF.SD(self.ObsFilename) + idates = ncf.select('date_components').get() + ids = ncf.select('id').get() + sites = ncf.select('site').get() + sites = [join(s,'').lower() for s in sites] + sites = map(strip,sites) + lats = ncf.select('lat').get() + lons = ncf.select('lon').get() + alts = ncf.select('alt').get() + obs = ncf.select('obs').get() + species = ncf.select('species').get() + date = ncf.select('date').get() + window = ncf.select('sampling_strategy').get() + flags = ncf.select('NOAA_QC_flags').get() + flags = [join(s,'') for s in flags] + flags = map(strip,flags) + dummy = ncf.end() + msg = "Successfully read data from obs file (%s)"%self.ObsFilename ; logging.debug(msg) - The steps for procedure A are: + dates = [dtm.datetime(*d) for d in idates] + + if self.MrData == None: + self.MrData = MixingRatioList([]) - (A1) Construct the covariance matrix - (A2) Make a Cholesky decomposition - (A3) Prepare a set of [nmembers] input parameters for each forecast member + for n in range(len(dates)): + self.MrData.append( MixingRatioSample(dates[n],sites[n],obs[n],0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],ids[n],0.0) ) - The steps for procedure B are: + msg = "Added %d observations to the MrData list" % len(dates) ; logging.info(msg) - (B1) Get existing parameter values for each member - (B2) Run them through a forecast model + def AddSimulations(self, filename): + """ Adds model simulated values to the mixing ratio objects """ + import pyhdf.SD as HDF - Both procedures finish with writing the parameter values to a NetCDF file + ncf = HDF.SD(filename) + ids = ncf.select('id').get() + simulated = ncf.select('sampled_mole_frac').get()*1e6 + dummy = ncf.end() + msg = "Successfully read data from model sample file (%s)"%filename ; logging.debug(msg) + + obs_ids = self.MrData.getvalues('id') + for n in range(len(ids)): + + try: + index = obs_ids.index(ids[n]) + except: + msg = 'A model sample was found that did not match any ID in the existing observation list. Skipping...' ; logging.warning(msg) + continue + dummy = self.MrData[index].simulated = simulated[n] + + msg = "Added %d simulated values to the MrData list" % len(ids) ; logging.info(msg) + + +################### End Class CtObservations ################### + +################### Begin Class CtMember ################### + +class CtMember(): + """ + An ensemble member for CarbonTracker. This consists of + * parameter values + * a CtObservations object with sampled mixing ratios + * file names to write/read the data for this member """ - # Get the da system specific rc-items, note that they've been validated already by FillObs + def __init__(self, membernumber): + self.membernumber = membernumber # the member number + self.randomstate = (None,) # the state of the random number generator needed to reproduce the random sequence + self.ModelSamples = None # CtObservations object that holds model samples for this member + self.ParameterValues = None # Parameter values of this member + self.ParameterOutputFile = None # File to which the member info should be written + self.SampleInputFile = None # File from which the sample info can be read - rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True) + def __str__(self): + return "%03d"%self.membernumber - nlag = int(rc_da_shell['time.nlag']) - for n in range(nlag): +################### End Class CtMember ################### - if n == nlag: +################### Begin Class CtStateVector ################### - # Route A: make a new ensemble +class CtStateVector(): + """ an object that holds data + methods and attributes needed to manipulate state vector values """ + def __init__(self,CycleInfo,DaInfo): - Xprime,Qprior = MakeNewEnsemble(rc_da_shell, rc_da_system) + self.CycleInfo = CycleInfo + self.DaInfo = DaInfo + self.nlag = int(CycleInfo.da_settings['time.nlag']) + self.nmembers = int(CycleInfo.da_settings['forecast.nmembers']) + self.nparams = int(DaInfo.da_settings['nparameters']) + self.isOptimized = False - else: + self.EnsembleMembers = range(self.nlag) + for n in range(self.nlag): + self.EnsembleMembers[n] = [] - # Route B: Propagate existing ensemble + def __str__(self): + return "This is a CarbonTracker state vector object" - Xprime,Qprior = [0,0] + def MakeNewEnsemble(self,nlag): + """ Make a new ensemble from specified matrices """ + import pyhdf.SD as hdf + import numpy as np + import matplotlib.pyplot as plt - return Xprime, Qprior + # Get the needed matrices from the specified covariance files + file_ocn_cov = self.DaInfo.da_settings['ocn.covariance'] + file_bio_cov = self.DaInfo.da_settings['bio.covariance'] -def MakeNewEnsemble(rc_da_shell, rc_da_system): - """ Make a new ensemble from specified matrices """ + for file in [file_ocn_cov,file_bio_cov]: - import pyhdf.SD as hdf - import numpy as np - import matplotlib.pyplot as plt + if not os.path.exists(file): - # Get the needed matrices from the specified covariance files + msg = "Cannot find the specified file %s" % file ; logging.error(msg) + raise IOError,msg + else: - file_ocn_cov = rc_da_system['ocn.covariance'] - file_bio_cov = rc_da_system['bio.covariance'] + msg = "Using covariance file: %s" % file ; logging.info(msg) - for file in [file_ocn_cov,file_bio_cov]: + f_ocn = hdf.SD(file_ocn_cov) + f_bio = hdf.SD(file_bio_cov) - if not os.path.exists(file): + cov_ocn = f_ocn.select('qprior').get() + cov_bio = f_bio.select('qprior').get() - msg = "Cannot find the specified file %s" % file ; logging.error(msg) - raise IOError,msg - else: + dummy = f_ocn.end() + dummy = f_bio.end() + + dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices") + + # Once we have the matrices, we can start to make the full covariance matrix, and then decompose it + + fullcov = np.zeros((self.nparams,self.nparams),float) + + nocn = cov_ocn.shape[0] + nbio = cov_bio.shape[0] + + fullcov[0:nbio,0:nbio] = cov_bio + fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn + + # Visually inspect full covariance matrix if verbose + + if self.CycleInfo.da_settings['verbose']: + + plt.imshow(fullcov) + plt.colorbar() + plt.savefig('fullcovariancematrix.png') + plt.close('all') + + # Make a cholesky decomposition of the covariance matrix + + U,s,Vh = np.linalg.svd(fullcov) + dof = np.sum(s)**2/sum(s**2) + + C = np.linalg.cholesky(fullcov) + + msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg) + msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg) + + if self.CycleInfo.da_settings['verbose']: + + plt.plot(s) + plt.savefig('eigenvaluespectrum.png') + plt.close('all') + + # Now create a set of random instances of the given covariance structure + + dummy = np.random.seed() + + # mean does not hold data yet + + NewMember = CtMember(0) + NewMember.ParameterValues = np.ones((self.nparams,self.nmembers),float) + dummy = self.EnsembleMembers[nlag].append(NewMember) - msg = "Using covariance file: %s" % file ; logging.info(msg) + # Create members 1:nmembers and add to EnsembleMembers list - f_ocn = hdf.SD(file_ocn_cov) - f_bio = hdf.SD(file_bio_cov) + for member in range(1,self.nmembers): + randstate = np.random.get_state() + rands = np.random.randn(self.nparams) - cov_ocn = f_ocn.select('qprior').get() - cov_bio = f_bio.select('qprior').get() + NewMember = CtMember(member) + NewMember.ParameterValues = np.dot(C,rands) + NewMember.randomstate = randstate + dummy = self.EnsembleMembers[nlag].append(NewMember) - dummy = f_ocn.end() - dummy = f_bio.end() + msg = '%d new ensemble members were added to the state vector'%(self.nmembers) ; logging.info(msg) - dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices") - # Once we have the matrices, we can start to make the full covariance matrix, and then decompose it +################### End Class CtStateVector ################### - nparams = int(rc_da_system['nparameters']) - nmembers = int(rc_da_shell['forecast.nmembers']) +################### Begin Class CtOptimizer ################### - fullcov = np.zeros((nparams,nparams),float) +class CtOptimizer(): + """ + This creates an instance of a CarbonTracker optimization object. It handles the minimum least squares optimization + of the CT state vector given a set of CT sample objects. Two routines will be implemented: one where the optimization + is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed + and efficiency. + """ - nocn = cov_ocn.shape[0] - nbio = cov_bio.shape[0] + def __init__(self, CycleInfo, DaInfo): - fullcov[0:nbio,0:nbio] = cov_bio - fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn + self.nlag = int(CycleInfo.da_settings['time.nlag']) + self.nmembers = int(CycleInfo.da_settings['forecast.nmembers']) + self.nparams = int(DaInfo.da_settings['nparameters']) - # Visually inspect full covariance matrix if verbose + self.CreateMatrices() - if rc_da_shell['verbose']: + return None - plt.imshow(fullcov) - plt.colorbar() - plt.savefig('fullcovariancematrix.png') - plt.close('all') + def CreateMatrices(self): + """ Create Matrix space needed in optimization routine """ + import numpy as np - # Make a cholesky decomposition of the covariance matrix + self.X_prior = np.array( (self.nlag*self.nparams,), 'float') + self.X_prior_prime = np.array( (self.nmembers,self.nlag*self.nparams,), 'float') - U,s,Vh = np.linalg.svd(fullcov) - dof = np.sum(s)**2/sum(s**2) + def FillMatrices(self,StateVector): - C = np.linalg.cholesky(fullcov) + + return None - msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg) - msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg) - if rc_da_shell['verbose']: +################### End Class CtOptimizer ################### - plt.plot(s) - plt.savefig('eigenvaluespectrum.png') - plt.close('all') - # Now create a set of random instances of the given covariance structure +################# Stand alone methods that manipulate the objects above ############## - # First see the random number generator with a fixed value from the rc-file, this is to make reproducible results +def PrepareObs(CycleInfo,type='forecast'): + """ PrepareObs """ - dummy = np.random.seed(int(rc_da_system['random.seed'])) + import shutil - # Structure to hold the data + # First, we make a CarbonTracker Observation object which holds all the information related to the mixing ratios going in + # and coming out of the model. This object also holds the observations themselves, and the simulated value + statistics - parameter_deviations = np.zeros((nparams,nmembers),float) # This is array X' + DaSystem = DaInfo(CycleInfo.da_settings['da.system.rc']) - for member in range(nmembers): - rands = np.random.randn(nparams) - parameter_deviations[:,member] = np.dot(C,rands) + samples = CtObservations(DaSystem) - if rc_da_shell['verbose']: + dummy = samples.AddObs() - plt.hist(parameter_deviations[20,:],20) - plt.savefig('par20histogram.png') - plt.close('all') + # + # Next should follow a section where the observations that are read are further processes, and written to a NetCDF file for + # the transport model to use for x,y,z,t smapling. + # + # For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply copy + # the observations.nc file to the pre-cooked obs files + # - return parameter_deviations, fullcov + filename= samples.ObsFilename + saveas = os.path.join(CycleInfo.da_settings['dir.input'],'observations.nc') + dummy = shutil.copy2(filename,saveas) + msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg) + return samples + +def PrepareState(CycleInfo): + """ + + Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways: + (A) Construct a brand new ensemble from a covariance matrix + (B) Get an existing ensemble and propagate it + + The steps for procedure A are: + + (A1) Construct the covariance matrix + (A2) Make a Cholesky decomposition + (A3) Prepare a set of [nmembers] input parameters for each forecast member + + The steps for procedure B are: + + (B1) Get existing parameter values for each member + (B2) Run them through a forecast model + + + Both procedures finish with writing the parameter values to a NetCDF file + + + """ + + # Get the da system specific rc-items, note that they've been validated already by FillObs + + DaSystem = DaInfo(CycleInfo.da_settings['da.system.rc']) + + StateVector = CtStateVector(CycleInfo,DaSystem) + + nlag = int(CycleInfo.da_settings['time.nlag']) + + for n in range(1,nlag+1): + + if n == nlag: + + # Route A: make a new ensemble + + dummy = StateVector.MakeNewEnsemble(n-1) + + else: + + # Route B: Propagate existing ensemble + + Xprime,Qprior = [0,0] + + return StateVector if __name__ == "__main__": - import rc import os - import logging + import sys + from tools_da import StartLogger + from da_initexit import CycleControl + + opts = ['-v'] + args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'} + + StartLogger() + DaCycle = CycleControl(opts,args) + Da_Info = DaInfo('carbontracker.rc') + + dummy = CtMember(0) + print dummy + + DaCycle.Initialize() + print DaCycle + + samples = CtObservations(Da_Info) + + dummy = samples.AddObs() + dummy = samples.AddSimulations() + + print samples.MrData.getvalues('code') + print samples.MrData.getvalues('obs') + + mlo=samples.MrData.selectsite('mlo_01d0') - logging.basicConfig(level = logging.DEBUG, - format = ' [%(levelname)-7s] (%(asctime)s) %(module)-20s : %(message)s', - datefmt = '%Y-%m-%d %H:%M:%S') + dummy = PrepareObs(DaCycle,'forecast') - rc_da_system = rc.read('carbontracker.rc') - rc_da_shell = rc.read('da.rc') - rc_da_shell['dir.input'] = os.path.join(rc_da_shell['dir.da_run'],'input') - rc_da_shell['dir.output'] = os.path.join(rc_da_shell['dir.da_run'],'output') + dummy = PrepareState(DaCycle) - rc_da_system['verbose'] = True - X,C = PrepareEnsemble(rc_da_shell) - rc_da_shell = PrepareObs(rc_da_shell,rc_da_system) - resids = MakeResiduals(rc_da_shell) diff --git a/da_initexit.py b/da_initexit.py index f71829e..b10a72e 100755 --- a/da_initexit.py +++ b/da_initexit.py @@ -31,6 +31,7 @@ from tools_da import CreateDirs from tools_da import AdvanceTime from tools_da import ParseOptions + class CycleControl(): """ This object controls a data assimilation cycle. It is created using information from a da rc-file with settings, and @@ -62,11 +63,11 @@ class CycleControl(): """ msg = "===============================================================" ; print msg - msg = "DAS rc-file is %s" % self.RcFileName ; print msg - msg = "DAS log file is %s" % self.da_settings['log'] ; print msg - msg = "DAS run directory is %s" % self.da_settings['dir.da_run'] ; print msg - msg = "DAS inverse system is %s" % self.da_settings['da.system'] ; print msg - msg = "DAS forecast model is %s" % self.da_settings['forecast.model'] ; print msg + msg = "DA Cycle rc-file is %s" % self.RcFileName ; print msg + msg = "DA Cycle log file is %s" % self.da_settings['log'] ; print msg + msg = "DA Cycle run directory is %s" % self.da_settings['dir.da_run'] ; print msg + msg = "DA Cycle inverse system is %s" % self.da_settings['da.system'] ; print msg + msg = "DA Cycle forecast model is %s" % self.da_settings['forecast.model'] ; print msg msg = "===============================================================" ; print msg return "" @@ -74,14 +75,14 @@ class CycleControl(): def LoadRc(self,RcFileName): """ - This method loads a DAS rc-file with settings for this simulation + This method loads a DA Cycle rc-file with settings for this simulation """ self.da_settings = rc.read(RcFileName) self.RcFileName = RcFileName self.DaRcLoaded = True - msg = 'DAS rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg) + msg = 'DA Cycle rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg) return True @@ -107,7 +108,7 @@ class CycleControl(): logging.error(msg) raise IOError,msg - status,msg = ( True,'DAS settings have been validated succesfully' ) ; logging.debug(msg) + status,msg = ( True,'DA Cycle settings have been validated succesfully' ) ; logging.debug(msg) return None @@ -144,11 +145,11 @@ class CycleControl(): self.da_settings['cyclelength'] = cyclelength msg = "===============================================================" ; logging.info(msg) - msg = "DAS start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) - msg = "DAS end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) - msg = "DAS final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) - msg = "DAS cycle length is %s" % cyclelength ; logging.info(msg) - msg = "DAS restart is %s" % str(self.da_settings['time.restart']) ; logging.info(msg) + msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "DA Cycle final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "DA Cycle cycle length is %s" % cyclelength ; logging.info(msg) + msg = "DA Cycle restart is %s" % str(self.da_settings['time.restart']) ; logging.info(msg) msg = "===============================================================" ; logging.info(msg) return None diff --git a/das.py b/das.py index a3a82ab..8907551 100755 --- a/das.py +++ b/das.py @@ -28,30 +28,43 @@ def JobStart(opts,args): def JobInput(CycleInfo): """ Set up the input data for the forward model: obs and parameters/fluxes""" from tools_da import PrepareObs - from tools_da import PrepareEnsemble + from tools_da import PrepareState + from tools_da import AddObsToState - dummy = PrepareEnsemble(CycleInfo) + Samples = PrepareObs(CycleInfo,'forecast') - dummy = PrepareObs(CycleInfo,'forecast') + StateVector = PrepareState(CycleInfo ) - return None + dummy = AddObsToState(CycleInfo, StateVector, Samples) + + + return StateVector -def Sample(CycleInfo): +def Sample(CycleInfo, StateVector): """ Sample the filter state for the inversion """ from tools_da import RunForecastModel + from tools_da import ReadModelSamples + + # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the + # type of info needed in TM5/WRF/SIBCASA + # Run the forecast model dummy = RunForecastModel(CycleInfo,'forecast') - # Optionally, post-processing of the model ouptu can be added that deals for instance with + # Read forecast model samples that were written to NetCDF files + + dummy = ReadModelSamples(StateVector) + + # Optionally, post-processing of the model output can be added that deals for instance with # sub-sampling of time series, vertical averaging, etc. return None -def Invert(CycleInfo): +def Invert(CycleInfo, StateVector ): """ Perform the inverse calculation """ import tools_da - dummy = tools_da.Invert(CycleInfo) + dummy = tools_da.Optimize(CycleInfo,StateVector) return None @@ -104,14 +117,14 @@ if __name__ == "__main__": msg = header+"starting JobStart"+footer ; logging.info(msg) CycleInfo = JobStart(opts,args) - #msg = header+"starting JobInput"+footer ; logging.info(msg) - #dummy = JobInput(CycleInfo) + msg = header+"starting JobInput"+footer ; logging.info(msg) + StateVector = JobInput(CycleInfo) msg = header+"starting Sample Taking"+footer ; logging.info(msg) - dummy = Sample(CycleInfo) + dummy = Sample(CycleInfo, StateVector) - #msg = header+"starting Invert"+footer ; logging.info(msg) - #dummy = Invert(CycleInfo) + msg = header+"starting Invert"+footer ; logging.info(msg) + dummy = Invert(CycleInfo, StateVector ) msg = header+"starting Propagate"+footer ; logging.info(msg) dummy = Propagate(CycleInfo) diff --git a/tm5_tools.py b/tm5_tools.py index b82b61a..662749f 100755 --- a/tm5_tools.py +++ b/tm5_tools.py @@ -199,7 +199,7 @@ class TM5(): - def Run(self): + def Run(self,nprocesses): """ Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and @@ -234,8 +234,8 @@ class TM5(): # Open logfile and spawn model, wait for finish and return code logging.info('Starting model executable as subprocess ') - cmd = ['openmpirun','-np', '10', mpi_shell_file,'./tm5.x'] - #cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x'] + #cmd = ['openmpirun','-np', '10', mpi_shell_file,'./tm5.x'] + cmd = ['mpirun','-np',str(nprocesses),mpi_shell_file,'./tm5.x'] #cmd = ['./tm5.x'] code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile) @@ -332,8 +332,8 @@ def DaInitialize(rc_da_shell): 'time.final' : rc_da_shell['time.sample.end'] , 'rundir' : rc_da_shell['dir.exec.tm5'] , 'outputdir' : rc_da_shell['dir.output'] , - 'savedir' : rc_da_shell['dir.save'] #, - #'das.input.dir' : '/data/CO2/carbontracker/ct08//obsnc/' + 'savedir' : rc_da_shell['dir.save'] , + 'das.input.dir' : rc_da_shell['dir.input'] } if rc_da_shell['time.restart'] == True: NewItems['istart'] = 3 @@ -370,7 +370,7 @@ if __name__ == "__main__": #tm=TM5('/Users/peters/Modeling/TM5/ct_new.rc') #tm.WriteRc() #tm.WriteRunRc() - #tm.Run() + #tm.Run(1) #tm.SaveData() #sys.exit(0) @@ -385,7 +385,7 @@ if __name__ == "__main__": dasrc['dir.input'] = os.path.join(dasrc['dir.da_run'],'input') tm = DaInitialize(dasrc) - tm.Run() + tm.Run(dasrc['forecast.nmembers']) #tm.SaveData() diff --git a/tools_da.py b/tools_da.py index 4ce2a6c..5407bbd 100755 --- a/tools_da.py +++ b/tools_da.py @@ -110,6 +110,24 @@ def ValidateOptsArgs(opts,args): return opts,args +def ValidateRC(rcfile,needed_items): + """ validate the contents of an rc-file given a dictionary of required keys """ + + for k,v in rcfile.iteritems(): + if v == 'True' : rcfile[k] = True + if v == 'False': rcfile[k] = False + if 'date' in k : rcfile[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S') + + for key in needed_items: + + if not rcfile.has_key(key): + status,msg = ( False,'Missing a required value in rc-file : %s' % key) + logging.error(msg) + raise IOError,msg + + status,msg = ( True,'rc-file has been validated succesfully' ) ; logging.debug(msg) + + def CreateDirs(dirname,forceclean=False): """ Create a directory and report success, only if non-existent """ @@ -199,28 +217,28 @@ def AdvanceTime(time_in,interval): return time_out -def PrepareObs(rc_da_shell,type='forecast'): +def PrepareObs(CycleInfo,type='forecast'): """ Prepare a set of observations to be co-sampled by the model. Although the collecting and parsing of the observations will depend on the specific application, the final result of this step is an output file called "observations.nc" which carries the x,y,z,t, dt information of each observation """ - if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system + if CycleInfo.da_settings['da.system'] == 'CarbonTracker': import ct_tools as da_system msg = "Using %s as DA system" % da_system.identifier ; logging.debug(msg) - rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True) + rc_da_system = rc.read(CycleInfo.da_settings['da.system.rc'],silent = True) dummy = ValidateRC(rc_da_system,da_system.needed_rc_items) # Add da_system specific rc-file to the rc_da_shell dictionary - rc_da_shell['da.system.info'] = rc_da_system + CycleInfo.da_settings['da.system.info'] = rc_da_system - dummy = da_system.PrepareObs(rc_da_shell,rc_da_system,type=type) + Samples = da_system.PrepareObs(CycleInfo,type=type) - return None + return Samples -def PrepareEnsemble(rc_da_shell ): +def PrepareState(CycleInfo): """ Prepare the ensemble of parameters needed by the forecast model. This step will depend on the assimilation system type, @@ -229,11 +247,20 @@ def PrepareEnsemble(rc_da_shell ): """ - if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system + if CycleInfo.da_settings['da.system'] == 'CarbonTracker': import ct_tools as da_system - dummy = da_system.PrepareEnsemble(rc_da_shell ) + StateVector = da_system.PrepareState(CycleInfo ) - return None + return StateVector + +def AddObsToState(CycleInfo, StateVector, Samples): + """ Add the observation objects to the ensemble members in the StateVector """ + + if CycleInfo.da_settings['da.system'] != 'CarbonTracker': raise Exception,"CarbonTracker specific code in this routine!" + + for Member in StateVector.EnsembleMembers[-1]: + Member.ModelSample = Samples + Member.SampleInputFile = os.path.join(CycleInfo.da_settings['dir.output'],'samples.%03d.nc'%Member.membernumber) def RunForecastModel(CycleInfo,step='forecast'): @@ -285,18 +312,33 @@ def RunForecastModel(CycleInfo,step='forecast'): # Run the forward model - status = executable.Run() + #status = executable.Run(CycleInfo.da_settings['forecast.nmembers']) + status = 0 ########################################### RETURN CONTROL TO DA SHELL ######################################### return status -def Invert(rc_da_shell): +def ReadModelSamples(StateVector): + """ Read forecast model samples for each ensemble member into the StateVector's ensemble members. """ + + # Add model written samples to the ensemble members CtObservations objects + + for Member in StateVector.EnsembleMembers[-1]: + Member.ModelSample.AddSimulations(Member.SampleInputFile) + + msg = "Added samples from the forecast model to the StateVector" ; logging.info(msg) + + return None + +def Optimize(CycleInfo, StateVector ): """ Perform least-squares minimization""" - if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system + if CycleInfo.da_settings['da.system'] == 'CarbonTracker': import ct_tools as da_system + + DaSystem = da_system.DaInfo(CycleInfo.da_settings['da.system.rc']) - dummy = da_system.MakeResiduals(rc_da_shell) + optimizer = da_system.CtOptimizer(CycleInfo, DaSystem) def CleanUpCycle(CycleInfo): """ -- GitLab