diff --git a/ct_tools.py b/ct_tools.py new file mode 100755 index 0000000000000000000000000000000000000000..8d66d2033e7be874ff1bea23484069605068bfb0 --- /dev/null +++ b/ct_tools.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python +# ct_tools.py + +""" +Author : peters + +Revision History: +File created on 12 Feb 2009. + +""" +import os +import sys +import rc +import logging + +identifier = 'CarbonTracker CO2' + +needed_rc_items = ['obs.input.dir', + 'obs.input.fname', + 'ocn.covariance', + 'nparameters', + 'bio.covariance', + 'deltaco2.prefix', + 'regtype'] + +def MakeResiduals(rc_da_shell): + """ Method makes residuals of CO2 mixing ratios from the observed and sampled values """ + import pyhdf.SD as HDF + + # Read observations NetCDF file + + ObsFileName = os.path.join(rc_da_shell['dir.output'],rc_da_shell['obs.full.filename']) + ncf = HDF.SD(ObsFileName) + obsdates = ncf.select('decimal_date').get() + dummy = ncf.end() + msg = "Successfully read data from obs file (%s)"%ObsFileName ; logging.debug(msg) + + # Read samples NetCDF file + + SamplesFileName = ObsFileName.replace('observations','samples').replace('input','output') + ncf = HDF.SD(SamplesFileName) + sampdates = ncf.select('decimal_date').get() + dummy = ncf.end() + msg = "Successfully read data from model sample file (%s)"%SamplesFileName ; logging.debug(msg) + + if (obsdates==sampdates).all(): + pass + + else: + msg = "Dates in the observation file and samples file are not equal, exiting..." ; logging.error(msg) + raise IOError,msg + + msg = "Residuals created, continuing" ; logging.info(msg) + + return None + + +def PrepareObs(rc_da_shell,rc_da_system,type='forecast'): + """ PrepareObs """ + + import pycdf as CDF + import shutil + + saveas = os.path.join(rc_da_shell['dir.input'],'observations.nc') + # ncf = CDF.CDF(saveas,CDF.NC.WRITE|CDF.NC.TRUNC|CDF.NC.CREATE) + # ncf.CreationDate = datetime.datetime.now().strftime('%Y %B %d') + # ncf.CreationUser = os.environ['USER'] + # ncf.CreationDir = rc_da_shell['dir.da_submit'] + # ncf.CreationScript = sys.argv[0] + # ncf.close() + + # msg = 'Created a new observation input file (%s) ' % saveas ; logging.info(msg) + + # For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply merge + # the observations.nc file with the pre-cooked obs files + + + # Select observation input file for this time step + #sfname = rc_da_system['obs.input.fname']+'.%s'%(rc_da_shell['startdate'].strftime('%Y%m%d') )+'.nc' + sfname = rc_da_system['obs.input.fname']+'.%s'%('20050305')+'.nc' + msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg) + filename = os.path.join(rc_da_system['obs.input.dir'],sfname) + + if not os.path.exists(filename): + + msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg) + raise IOError,msg + + else: + + dummy = shutil.copy2(filename,saveas) + + msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg) + + rc_da_shell['obs.full.filename'] = saveas + + return rc_da_shell + +def PrepareEnsemble(rc_da_shell ): + """ + + Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways: + + (A) Construct a brand new ensemble from a covariance matrix + + (B) Get an existing ensemble and propagate it + + + The steps for procedure A are: + + (A1) Construct the covariance matrix + (A2) Make a Cholesky decomposition + (A3) Prepare a set of [nmembers] input parameters for each forecast member + + The steps for procedure B are: + + (B1) Get existing parameter values for each member + (B2) Run them through a forecast model + + + Both procedures finish with writing the parameter values to a NetCDF file + + + """ + + # Get the da system specific rc-items, note that they've been validated already by FillObs + + rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True) + + nlag = int(rc_da_shell['time.nlag']) + + for n in range(nlag): + + if n == nlag: + + # Route A: make a new ensemble + + Xprime,Qprior = MakeNewEnsemble(rc_da_shell, rc_da_system) + + else: + + # Route B: Propagate existing ensemble + + Xprime,Qprior = [0,0] + + + return Xprime, Qprior + + +def MakeNewEnsemble(rc_da_shell, rc_da_system): + """ Make a new ensemble from specified matrices """ + + import pyhdf.SD as hdf + import numpy as np + import matplotlib.pyplot as plt + + # Get the needed matrices from the specified covariance files + + file_ocn_cov = rc_da_system['ocn.covariance'] + file_bio_cov = rc_da_system['bio.covariance'] + + for file in [file_ocn_cov,file_bio_cov]: + + if not os.path.exists(file): + + msg = "Cannot find the specified file %s" % file ; logging.error(msg) + raise IOError,msg + else: + + msg = "Using covariance file: %s" % file ; logging.info(msg) + + f_ocn = hdf.SD(file_ocn_cov) + f_bio = hdf.SD(file_bio_cov) + + cov_ocn = f_ocn.select('qprior').get() + cov_bio = f_bio.select('qprior').get() + + dummy = f_ocn.end() + dummy = f_bio.end() + + dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices") + + # Once we have the matrices, we can start to make the full covariance matrix, and then decompose it + + nparams = int(rc_da_system['nparameters']) + nmembers = int(rc_da_shell['forecast.nmembers']) + + fullcov = np.zeros((nparams,nparams),float) + + nocn = cov_ocn.shape[0] + nbio = cov_bio.shape[0] + + fullcov[0:nbio,0:nbio] = cov_bio + fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn + + # Visually inspect full covariance matrix if verbose + + if rc_da_shell['verbose']: + + plt.imshow(fullcov) + plt.colorbar() + plt.savefig('fullcovariancematrix.png') + plt.close('all') + + # Make a cholesky decomposition of the covariance matrix + + U,s,Vh = np.linalg.svd(fullcov) + dof = np.sum(s)**2/sum(s**2) + + C = np.linalg.cholesky(fullcov) + + msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg) + msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg) + + if rc_da_shell['verbose']: + + plt.plot(s) + plt.savefig('eigenvaluespectrum.png') + plt.close('all') + + # Now create a set of random instances of the given covariance structure + + # First see the random number generator with a fixed value from the rc-file, this is to make reproducible results + + dummy = np.random.seed(int(rc_da_system['random.seed'])) + + # Structure to hold the data + + parameter_deviations = np.zeros((nparams,nmembers),float) # This is array X' + + for member in range(nmembers): + rands = np.random.randn(nparams) + parameter_deviations[:,member] = np.dot(C,rands) + + if rc_da_shell['verbose']: + + plt.hist(parameter_deviations[20,:],20) + plt.savefig('par20histogram.png') + plt.close('all') + + return parameter_deviations, fullcov + + + + + +if __name__ == "__main__": + + import rc + import os + import logging + + logging.basicConfig(level = logging.DEBUG, + format = ' [%(levelname)-7s] (%(asctime)s) %(module)-20s : %(message)s', + datefmt = '%Y-%m-%d %H:%M:%S') + + rc_da_system = rc.read('carbontracker.rc') + rc_da_shell = rc.read('da.rc') + rc_da_shell['dir.input'] = os.path.join(rc_da_shell['dir.da_run'],'input') + rc_da_shell['dir.output'] = os.path.join(rc_da_shell['dir.da_run'],'output') + + rc_da_system['verbose'] = True + + X,C = PrepareEnsemble(rc_da_shell) + + rc_da_shell = PrepareObs(rc_da_shell,rc_da_system) + + resids = MakeResiduals(rc_da_shell) + diff --git a/da.rc b/da.rc new file mode 100644 index 0000000000000000000000000000000000000000..7d3de376201e2fcd78a77ce5d6368daf27d580c6 --- /dev/null +++ b/da.rc @@ -0,0 +1,19 @@ +! Info on the data assimilation cycle + +time.restart : False +time.start : 2005030500 +time.finish : 2005030700 +time.cycle : 1 +time.nlag : 2 +dir.da_run : ${HOME}/tmp/test_da + +! Info on the DA system used + +da.system : CarbonTracker +da.system.rc : carbontracker.rc + +! Info on the forward model to be used + +forecast.model : TM5 +forecast.model.rc : ${HOME}/Modeling/TM5/ct_new.rc +forecast.nmembers : 2 diff --git a/tm5_tools.py b/tm5_tools.py index 64f1bb6155684c2c65e60e237b655b9c839950ec..a9ea5a0f4423f8cc94f347270a6da9b607a4b505 100755 --- a/tm5_tools.py +++ b/tm5_tools.py @@ -6,6 +6,7 @@ Author : peters Revision History: File created on 09 Feb 2009. +Major modifications to go to a class-based approach, July 2010. This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information from the DA system in combination with the generic tm5.rc files. @@ -26,7 +27,7 @@ import rc import shutil identifier = 'TM5' -version = '0.1' +version = 'release 3.0' mpi_shell_file = 'tm5_mpi_wrapper' needed_rc_items = [ @@ -36,173 +37,271 @@ needed_rc_items = [ 'time.break.nday' ] -def ModifyRC(rc_da_shell,rc_tm5): - """ modify parts of the tm5 rc-file to give control of file locations to the DA shell - instead of to the tm5.rc script. Note that most parameters remain unchanged because they do - not pertain to the actual run in progress, but to the TM5 model as a whole. +################### Begin Class TM5 ################### - Currently, only the savedir and outputdir variables are replaced so that TM5 writes and reads from - the DA system directories. +class TM5(): + """ This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file + location. This rc-file will be used to figure out the settings for the run. This method of running TM5 assumes that a pre-compiled + tm5.exe is present, and it will be run from time.start to time.final. These settings can be modified later. To run a model version, + simply compile the model using an existing TM5 rc-file, then open python, and type: - Also, we add a new rc-variable which describes the inputdir for the da system + []> tm=TM5('/Users/peters/Modeling/TM5/tutorial.rc') + []> tm.WriteRc() + []> tm.WriteRunRc() + []> tm.Run() - Note that we replace these values in all {key,value} pairs of the tm5.rc file! - + """ - orig_savedir = rc_tm5['savedir'] - orig_outputdir = rc_tm5['outputdir'] + def __init__(self, RcFileName='dummy.rc',log=True): + """In instance of the TM5 class requires information on the model version only""" + import tools_da as tools - for k,v in rc_tm5.iteritems(): - if not isinstance(v,str): continue - if orig_savedir in v: - rc_tm5[k] = v.replace(orig_savedir,rc_da_shell['dir.save']) - logging.debug('Replaced savedir in tm5 rc-item %s ' % k) - if orig_outputdir in v: - rc_tm5[k] = v.replace(orig_outputdir,rc_da_shell['dir.output']) - logging.debug('Replaced outputdir in tm5 rc-item %s ' % k) + self.Identifier = identifier # the identifier gives the model name + self.Version = version # the TM5 model version used - rc_tm5['das.input.dir'] = rc_da_shell['dir.input'] - msg = 'Added das.input.dir to tm5 rc-file' ;logging.debug(msg) + if log: + self.Log = tools.StartLogger() + else: + self.Log = False - tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5.rc') - dummy = rc.write(tm5rcfilename,rc_tm5) - msg = "Modified tm5.rc written (%s)" % tm5rcfilename ;logging.debug(msg) + dummy = self.LoadRc(RcFileName) + dummy = self.ValidateRc() + msg = '%s object initialized'%self.Identifier ; logging.info(msg) + msg = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg) -def CreateRunRC(rc_da_shell,rc_tm5): - """ - - Create the tm5-runtime.rc file which is read by initexit.F90 to control time loop and restart from save files - - We replace values for start and end date, as well as save and output folders, with values from the da shell. - - """ - rc_runtm5={} + def __str__(self): + """ String representation of the TM5 object """ - rc_runtm5['year1'] = rc_da_shell['startdate'].year - rc_runtm5['month1'] = rc_da_shell['startdate'].month - rc_runtm5['day1'] = rc_da_shell['startdate'].day - rc_runtm5['hour1'] = rc_da_shell['startdate'].hour - rc_runtm5['minu1'] = 0 - rc_runtm5['sec1'] = 0 + return "This is a %s object, version %s"%(self.Identifier,self.Version) - rc_runtm5['year2'] = rc_da_shell['enddate'].year - rc_runtm5['month2'] = rc_da_shell['enddate'].month - rc_runtm5['day2'] = rc_da_shell['enddate'].day - rc_runtm5['hour2'] = rc_da_shell['enddate'].hour - rc_runtm5['minu2'] = 0 - rc_runtm5['sec2'] = 0 + def LoadRc(self,RcFileName): + """ This method loads a TM5 rc-file with settings for this simulation """ - if rc_da_shell['time.restart'] == True: - rc_runtm5['istart'] = 3 - else: - rc_runtm5['istart'] = rc_tm5['istart'] + self.tm_settings = rc.read(RcFileName) + self.RcFileName = RcFileName + self.Tm5RcLoaded = True - rc_runtm5['savedir'] = rc_da_shell['dir.save'] - rc_runtm5['outputdir'] = rc_da_shell['dir.output'] + msg = 'TM5 rc-file loaded successfully' ; logging.info(msg) - tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5_runtime.rc') - rc.write(tm5rcfilename,rc_runtm5) + return True -def PrepareExe(rc_da_shell): - """ - Prepare a forward model TM5 run, this consists of: + def ValidateRc(self): + """ + Validate the contents of the tm_settings dictionary and add extra values + """ + import datetime + + for k,v in self.tm_settings.iteritems(): + if v == 'True' : self.tm_settings[k] = True + if v == 'False': self.tm_settings[k] = False + if 'date' in k : self.tm_settings[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S') + if 'time.start' in k : + self.tm_settings[k] = datetime.datetime.strptime(v,'%Y%m%d%H') + if 'time.final' in k : + self.tm_settings[k] = datetime.datetime.strptime(v,'%Y%m%d%H') + + for key in needed_rc_items: + + if not self.tm_settings.has_key(key): + status,msg = ( False,'Missing a required value in rc-file : %s' % key) + logging.error(msg) + raise IOError,msg + + status,msg = ( True,'rc-file has been validated succesfully' ) ; logging.debug(msg) + + + + + def ModifyRC(self,NewValues): + """ modify parts of the tm5 settings to give control of file locations to the DA shell + instead of to the tm5.rc script. Note that most parameters remain unchanged because they do + not pertain to the actual run in progress, but to the TM5 model as a whole. + + Currently, only the savedir and outputdir variables are replaced so that TM5 writes and reads from + the DA system directories. + + Also, we add a new rc-variable which describes the inputdir for the da system + + Note that we replace these values in all {key,value} pairs of the tm5.rc file! + + """ - - reading the TM5 rc-file, - - validating it, - - modifying the values, - - Creating a tm5_runtime.rc file - - Removing the existing tm5.ok file if present - """ + for k,v in NewValues.iteritems(): + if self.tm_settings.has_key(k): + self.tm_settings[k] = v + logging.debug('Replaced tm5 rc-item %s ' % k) + else: + logging.debug('Skipped new rc-item %s ' % k) - from tools_da import ValidateRC -# Get tm5.rc parameters defined for this forecast model run, only needed for location, etc. + def WriteRc(self): + """ Write the rc-file settings to a tm5.rc file in the rundir""" - rc_tm5 = rc.read(rc_da_shell['forecast.model.rc'],silent = True) + tm5rcfilename = os.path.join(self.tm_settings['rundir'],'tm5.rc') - ValidateRC(rc_tm5,needed_rc_items) + dummy = rc.write(tm5rcfilename,self.tm_settings) -# Write a modified TM5 model rc-file in which run/break times are defined by our da system + msg = "Modified tm5.rc written (%s)" % tm5rcfilename ;logging.debug(msg) - ModifyRC(rc_da_shell,rc_tm5) + def WriteRunRc(self): + """ + + Create the tm5-runtime.rc file which is read by initexit.F90 to control time loop and restart from save files + + """ -# Create a TM5 runtime rc-file in which run/break times are defined by our da system + tm5rcfilename = os.path.join(self.tm_settings['rundir'],'tm5_runtime.rc') - CreateRunRC(rc_da_shell,rc_tm5) + dummy = rc.write(tm5rcfilename,self.tm_settings) -# Copy the pre-compiled MPI wrapper to the execution directory + rc_runtm5={} - targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5') + rc_runtm5['year1'] = self.tm_settings['time.start'].year + rc_runtm5['month1'] = self.tm_settings['time.start'].month + rc_runtm5['day1'] = self.tm_settings['time.start'].day + rc_runtm5['hour1'] = self.tm_settings['time.start'].hour + rc_runtm5['minu1'] = 0 + rc_runtm5['sec1'] = 0 - if not os.path.exists(mpi_shell_file): - msg = "Cannot find the mpi_shell wrapper needed for completion (%s)"% mpi_shell_file ; logging.error(msg) - msg = "Please see the 'readme_wrapper.txt' file for instructions to compile" ; logging.error(msg) - raise IOError + rc_runtm5['year2'] = self.tm_settings['time.final'].year + rc_runtm5['month2'] = self.tm_settings['time.final'].month + rc_runtm5['day2'] = self.tm_settings['time.final'].day + rc_runtm5['hour2'] = self.tm_settings['time.final'].hour + rc_runtm5['minu2'] = 0 + rc_runtm5['sec2'] = 0 - shutil.copy(mpi_shell_file,targetdir) + rc_runtm5['istart'] = self.tm_settings['istart'] + rc_runtm5['savedir'] = self.tm_settings['savedir'] + rc_runtm5['outputdir'] = self.tm_settings['outputdir'] -# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed + rc.write(tm5rcfilename,rc_runtm5) - okfile = os.path.join(targetdir,'tm5.ok') - if os.path.exists(okfile): os.remove(okfile) + msg = "Modified tm5_runtime.rc written (%s)" % tm5rcfilename ;logging.debug(msg) - return rc_tm5 -def StartExe(rc_da_shell,rc_tm5): - """ + + def Run(self): + """ Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and only if successfull on all processors will execution of the shell continue. + """ + + cwd = os.getcwd() + targetdir = os.path.join(self.tm_settings['rundir']) + + # Go to executable directory and start the subprocess, using a new logfile + + os.chdir(targetdir) + logging.debug('Changing directory to %s ' % targetdir ) + self.ModelLogFilename = 'tm5.log' + modellogfile = open(self.ModelLogFilename,'w') + logging.info('Logging model output to %s ' % self.ModelLogFilename) + + # Next, make sure there is an actual model version compiled and ready to execute + + self.Tm5Executable = os.path.join(targetdir,'tm5.x') + if not os.path.exists(self.Tm5Executable): + msg = "Required TM5 executable was not found %s"%self.Tm5Executable ; logging.error(msg) + msg = "Please compile the model with the specified rc-file and the regular TM5 scripts first" ; logging.error(msg) + raise IOError + + # Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed + + okfile = 'tm5.ok' + if os.path.exists(okfile): os.remove(okfile) + + # Open logfile and spawn model, wait for finish and return code + + logging.info('Starting model executable as subprocess ') + #cmd = ['openmpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x'] + #cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x'] + cmd = ['./tm5.x'] + code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile) + + modellogfile.close() + + # Interpret/Handle exit code + + if not os.path.exists(okfile): code = -1 + + if code == 0: + logging.info('Finished model executable succesfully (%s)'%code) + self.Status = 'Success' + else: + logging.error('Error in model executable return code: %s ' % code) + logging.info('Inspect [%s] to find output from model executable ' % modellogfilename) + self.Status = 'Failed' + raise OSError + +# Return to working directory + + os.chdir(cwd) + +################### End Class TM5 ################### + + + +def PrepareExe(rc_da_shell): + """ + Prepare a forward model TM5 run, this consists of: + + - reading the TM5 rc-file, + - validating it, + - modifying the values, + - Creating a tm5_runtime.rc file + - Removing the existing tm5.ok file if present + """ - from tools_da import CreateLinks + from tools_da import CreateLinks, CreateDirs, ParseTimes + +# Get tm5.rc parameters defined for this forecast model run, only needed for location, etc. + + Tm5Model = TM5( rc_da_shell['forecast.model.rc'] ) # Create a link to the rundirectory of the TM5 model - sourcedir = rc_tm5['rundir'] - targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5') + sourcedir = Tm5Model.tm_settings['rundir'] + targetdir = os.path.join(rc_da_shell['dir.exec']) CreateLinks(sourcedir,targetdir) -# Go to executable directory and start the subprocess, using a new logfile + ParseTimes(rc_da_shell) - os.chdir(targetdir) - logging.debug('Changing directory to %s ' % targetdir ) - modellogfilename = os.path.join(rc_da_shell['dir.jobs'],'tm5.%s'%rc_da_shell['log']) - modellogfile = open(modellogfilename,'w') - logging.info('Logging model output to %s ' % modellogfilename) +# Write a modified TM5 model rc-file in which run/break times are defined by our da system -# Open logfile and spawn model, wait for finish and return code + NewItems = { + 'time.start': rc_da_shell['startdate'] , + 'time.final' : rc_da_shell['enddate'] , + 'rundir' : rc_da_shell['dir.exec'] , + 'savedir' : rc_da_shell['dir.save'] , + 'outputdir': rc_da_shell['dir.output'] + } - logging.info('Starting model executable as subprocess ') - #cmd = ['openmpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x'] - #cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x'] - cmd = ['./tm5.x'] - code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile) + Tm5Model.ModifyRC(NewItems) + Tm5Model.WriteRc() - modellogfile.close() + Tm5Model.WriteRunRc() -# Interpret/Handle exit code +# Copy the pre-compiled MPI wrapper to the execution directory - if not os.path.exists(os.path.join(targetdir,'tm5.ok')): code = -1 + targetdir = os.path.join(rc_da_shell['dir.exec']) - if code == 0: - logging.info('Finished model executable succesfully (%s)'%code) - else: - logging.error('Error in model executable return code: %s ' % code) - logging.info('Inspect [%s] to find output from model executable ' % modellogfilename) - raise OSError + if not os.path.exists(mpi_shell_file): + msg = "Cannot find the mpi_shell wrapper needed for completion (%s)"% mpi_shell_file ; logging.error(msg) + msg = "Please see the 'readme_wrapper.txt' file for instructions to compile" ; logging.error(msg) + raise IOError -# Return to working directory + shutil.copy(mpi_shell_file,targetdir) - os.chdir(rc_da_shell['dir.da_run']) + return Tm5Model - return code def WriteSaveData(rc_da_shell): """ Write the TM5 recovery data for the next DA cycle """ @@ -241,4 +340,31 @@ def WriteSaveData(rc_da_shell): if __name__ == "__main__": - pass + + import sys + import logging + + #tm=TM5('/Users/peters/Modeling/TM5/tutorial.rc') + #tm.WriteRc() + #tm.WriteRunRc() + #tm.Run() + + #sys.exit(0) + + dasrc=rc.read('da.rc') + + dasrc['dir.save']=os.path.join(dasrc['dir.da_run'],'save') + dasrc['dir.output']=os.path.join(dasrc['dir.da_run'],'output') + dasrc['dir.exec']=os.path.join(dasrc['dir.da_run'],'exec') + + tm = PrepareExe(dasrc) + tm.Run() + + + + + + + + + diff --git a/tools_da.py b/tools_da.py new file mode 100755 index 0000000000000000000000000000000000000000..d56820d2f371d4dda4880619c53d1e74e17a8e01 --- /dev/null +++ b/tools_da.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python +# tools_da.py + +""" +Author : peters + +Revision History: +File created on 03 Oct 2008. + +Temporary module to hold classes and methods that are in development + +""" + +import logging +import os +import sys +import shutil +import rc +import datetime + +needed_rc_da_items=[ + 'time.start', + 'time.finish', + 'time.nlag', + 'time.cycle', + 'dir.da_run', + 'forecast.model', + 'forecast.model.rc', + 'da.system'] + +helptext=\ +""" +HELP!!! +""" + +def StartLogger(logfile='logfile.log'): + """ start the logging of messages to screen and to file""" + +# start the logging basic configuration by setting up a log file + + logging.basicConfig(level = logging.DEBUG, + format = ' [%(levelname)-7s] (%(asctime)s) py-%(module)-20s : %(message)s', + datefmt = '%Y-%m-%d %H:%M:%S', + filename = logfile, + filemode = 'a') + +# define a Handler which writes INFO messages or higher to the sys.stdout + + console = logging.StreamHandler(sys.stdout) + dummy = console.setLevel(logging.DEBUG) + +# set a format which is simpler for console use + + formatter = logging.Formatter('[%(levelname)-7s] (%(asctime)s) py-%(module)-20s: %(message)s') + +# tell the handler to use this format + + dummy = console.setFormatter(formatter) + +# add the handler to the root logger + + dummy = logging.getLogger('').addHandler(console) + + +def ParseOptions(): + """ Function parses options from the command line and returns the arguments as a dictionary""" + import getopt + import sys + +# Parse keywords, the only option accepted so far is the "-h" flag for help + + opts=[] + args=[] + try: + opts, args = getopt.gnu_getopt(sys.argv[1:], "-hrv") + except getopt.GetoptError, msg: + logging.error('%s'%msg) + sys.exit(2) + + for options in opts: + options=options[0].lower() + if options == '-h': + print "" + print helptext + sys.exit(2) + if options == '-r': + print "" + # logging.info('-r flag specified on command line: recovering from crash') + if options == '-v': + print "" + # logging.info('-v flag specified on command line: extra verbose output') + +# Parse arguments and return as dictionary + + arguments={} + for item in args: + #item=item.lower() + +# Catch arguments that are passed not in "key=value" format + + if '=' in item: + key, arg = item.split('=') + else: + logging.error('%s'%'Argument passed without description (%s)' % item) + raise getopt.GetoptError,arg + + arguments[key]=arg + + if opts: opts=opts[0] + + return opts, arguments + +def ValidateRC(rcfile,needed_items): + """ validate the contents of an rc-file given a dictionary of required keys """ + + for k,v in rcfile.iteritems(): + if v == 'True' : rcfile[k] = True + if v == 'False': rcfile[k] = False + if 'date' in k : rcfile[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S') + + for key in needed_items: + + if not rcfile.has_key(key): + status,msg = ( False,'Missing a required value in rc-file : %s' % key) + logging.error(msg) + raise IOError,msg + + status,msg = ( True,'rc-file has been validated succesfully' ) ; logging.debug(msg) + +def CreateDirs(dirname,forceclean=False): + """ Create a directory and report success, only if non-existent """ + + if forceclean: + try: + shutil.rmtree(dirname) + except: + pass + + if not os.path.exists(dirname): + os.makedirs(dirname) + msg='Creating new directory %s' % dirname + logging.info(msg) + else: + msg='Using existing directory %s' % dirname + logging.info(msg) + return None + +def CreateLinks(sourcedir,targetdir): + """ Create a symbolic link from, source to target and report success, note + that exisiting links are first removed and then recreated """ + + if os.path.exists(targetdir): + os.unlink(targetdir) + msg='Unlinking existing directory %s' % targetdir + logging.debug(msg) + try: + os.symlink(sourcedir,targetdir) + msg='Created new link from %s to %s' % (sourcedir, targetdir) + logging.debug(msg) + except OSError,msg: + msg='Failed to create link from %s to %s' % (sourcedir, targetdir) + logging.error(msg) + raise OSError + + return None + +def SetInterval(rc_da_shell,run='forecast'): + """ Set the interval over which the observation operator will be run. There are two options: + + (1) forecast : the simulation runs from startdate to startdate + nlag*delta_time + (1) advance : the simulation runs from startdate to startdate + 1 *delta_time + """ + + if run not in ['forecast','advance']: + raise ValueError, "Unknown interval specified for run (%s)" % run + + nlag = int(rc_da_shell['time.nlag']) + startdate = rc_da_shell['startdate'] + cyclelen = rc_da_shell['cyclelength'] + + if run == 'forecast': + + enddate = startdate + + for i in range(nlag): + enddate = AdvanceTime(enddate,cyclelen) + + elif run == 'advance': + + enddate = AdvanceTime(startdate,cyclelen) + + rc_da_shell['enddate'] = enddate + + msg = "New simulation interval set : " ; logging.info(msg) + msg = " start date : %s " % startdate.strftime('%F %H:%M') ; logging.info(msg) + msg = " end date : %s " % enddate.strftime('%F %H:%M') ; logging.info(msg) + + return None + + +def AdvanceTime(time_in,interval): + """ Advance time_in by a specified interval""" + + time_out=time_in + + if interval == 'month': # if monthly, this run will go to the first day of the next month + if time_in.month != 12: + time_out = datetime.datetime(time_in.year,time_in.month+1,1,time_in.hour,0,0) + else: + time_out = datetime.datetime(time_in.year+1,1,1,time_in.hour,0,0) # end of year provision + elif interval == 'week': + time_out = time_in + datetime.timedelta(days=7) + else: # assume that the interval specified is the number of days to run forward before resubmitting + time_out = time_in + datetime.timedelta(days=float(interval)) + + return time_out + + +def ParseTimes(rc_da_shell): + """ get time parameters from rc-file and parse into datetime objects for later use """ + + td = rc_da_shell['time.start'] + ymd = map(int,[td[0:4],td[4:6],td[6:8],td[8:10]]) # creates a 6-digit integer yyyy,mm,dd,hh,mn,sec + startdate = datetime.datetime(*ymd) # from which we create a python datetime object + td = rc_da_shell['time.finish'] + ymd = map(int,[td[0:4],td[4:6],td[6:8],td[8:10]]) # again + finaldate = datetime.datetime(*ymd) # again + + if finaldate <= startdate: + msg = 'The start date (%s) is not greater than the end date (%s), please revise'%(startdate.strftime('%Y%M%d'),finaldate.strftime('%Y%M%d')) + logging.error(msg) + raise ValueError + # + cyclelength = rc_da_shell['time.cycle'] # get time step + +# Determine end date + + if cyclelength == 'infinite': + enddate = finaldate + else: + enddate = AdvanceTime(startdate,cyclelength) + + # + if enddate > finaldate: # do not run beyon finaldate + enddate = finaldate + + rc_da_shell['startdate'] = startdate + rc_da_shell['enddate'] = enddate + rc_da_shell['finaldate'] = finaldate + rc_da_shell['cyclelength'] = cyclelength + + msg = "===============================================================" ; logging.info(msg) + msg = "Filter start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "Filter end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "Filter final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "Filter cycle length is %s" % cyclelength ; logging.info(msg) + msg = "===============================================================" ; logging.info(msg) + + return None + +def PrepareObs(rc_da_shell,type='forecast'): + """ Prepare a set of observations to be co-sampled by the model. Although the collecting and parsing of + the observations will depend on the specific application, the final result of this step is an output + file called "observations.nc" which carries the x,y,z,t, dt information of each observation """ + + if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system + + msg = "Using %s as DA system" % da_system.identifier ; logging.debug(msg) + + rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True) + + dummy = ValidateRC(rc_da_system,da_system.needed_rc_items) + +# Add da_system specific rc-file to the rc_da_shell dictionary + + rc_da_shell['da.system.info'] = rc_da_system + + dummy = da_system.PrepareObs(rc_da_shell,rc_da_system,type=type) + + return None + +def PrepareEnsemble(rc_da_shell ): + """ + + Prepare the ensemble of parameters needed by the forecast model. This step will depend on the assimilation system type, + and here we only provide an interface to the system-specific PrepareEnsemble functions. This happens by reading the + da.system key from the rc_da_shell and importing the corresponding module as da_system. + + """ + + if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system + + dummy = da_system.PrepareEnsemble(rc_da_shell ) + + return None + + +def RunForecastModel(rc_da_shell): + """Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model + used is determined in the very first line where the import statement of module "model" depends on a + setting in your da.rc file. After that choice, the module written specifically for a particular + application takes over. There are a few required variables and methods in such a module: + + version [variable, string] : defines the version number of the module, e.g. "1.0" + identifier [variable, string] : identifies the model used as a string variable, e.g. "TM5" + PrepareExe (method) : A method that preps the model simulation. It can for instance create an input + parameter file for the model (tm5.rc), or execute some scripts needed before the + actual execution starts (get_meteo). After this step, a call to the model + executable should start a succesfull simulation + StartExe (method) : Start the executable. How this is done depends on your model and might involve + submitting a string of jobs to the queue, or simply spawning a subprocess, or ... + """ + +# import modules, note that depending on the type of assimilation system, different submodules are imported! + + if rc_da_shell['forecast.model'] == 'TM5': import tm5_tools as model + elif rc_da_shell['forecast.model'] == 'SIBCASA': import sibcasa_tools as model + elif rc_da_shell['forecast.model'] == 'WRF': import wrf_tools as model + +####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############ + + msg = "Using %s as forecast model" % model.identifier ; logging.debug(msg) + +# Prepare everything needed to run the forward model + + rc_fcmodel = model.PrepareExe(rc_da_shell) + +# Run the forward model + + status = model.StartExe(rc_da_shell,rc_fcmodel) + +########################################### RETURN CONTROL TO DA SHELL ######################################### + + dummy = os.chdir(rc_da_shell['dir.da_submit']) + + return status + +def Invert(rc_da_shell): + """ Perform least-squares minimization""" + + if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system + + dummy = da_system.MakeResiduals(rc_da_shell) + + + +if __name__ == "__main__": + pass