Commit 360cfcfb authored by Peters, Wouter

moved out of the way of the new trunk

parent 34cbeb20
! carbontracker.rc
! Info for the CarbonTracker data assimilation system
!datadir : /lfs0/projects/co2/input/
datadir : /data/CO2/carbontracker/ct08/
obs.input.dir : ${datadir}/obsnc/
obs.input.fname : obs_final
ocn.covariance : ${datadir}/covariance_ocn_base.nc
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 220
random.seed : 4385
#!/usr/bin/env python
# ct_tools.py
"""
Author : peters
Revision History:
File created on 12 Feb 2009.
"""
import os
import sys
import rc
import logging
identifier = 'CarbonTracker CO2'
needed_rc_items = ['obs.input.dir',
'obs.input.fname',
'ocn.covariance',
'nparameters',
'bio.covariance',
'deltaco2.prefix',
'regtype']
def MakeResiduals(rc_da_shell):
    """ Make residuals of CO2 mixing ratios from the observed and sampled values. Currently this
        routine only verifies that the two time axes match; see the sketch below this function
        for the missing subtraction step. """
import pyhdf.SD as HDF
# Read observations NetCDF file
ObsFileName = os.path.join(rc_da_shell['dir.output'],rc_da_shell['obs.full.filename'])
ncf = HDF.SD(ObsFileName)
obsdates = ncf.select('decimal_date').get()
dummy = ncf.end()
msg = "Successfully read data from obs file (%s)"%ObsFileName ; logging.debug(msg)
# Read samples NetCDF file
SamplesFileName = ObsFileName.replace('observations','samples').replace('input','output')
ncf = HDF.SD(SamplesFileName)
sampdates = ncf.select('decimal_date').get()
dummy = ncf.end()
msg = "Successfully read data from model sample file (%s)"%SamplesFileName ; logging.debug(msg)
    if not (obsdates == sampdates).all():
        msg = "Dates in the observation file and samples file are not equal, exiting..." ; logging.error(msg)
        raise IOError,msg
    msg = "Observation and sample dates match, continuing" ; logging.info(msg)
return None
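# MakeResiduals above only verifies that the two time axes match; the actual subtraction is not
# implemented yet. Below is a minimal sketch of that missing step, assuming both files carry an
# 'obs' and a 'simulated' variable (hypothetical names, not confirmed by the files above):
def MakeResidualsSketch(obsfilename, samplesfilename):
    """ Hedged sketch: return observed-minus-sampled CO2 mixing ratios (y - H(x)) """
    import pyhdf.SD as HDF
    ncf = HDF.SD(obsfilename)
    obs = ncf.select('obs').get()  # assumed variable name
    dummy = ncf.end()
    ncf = HDF.SD(samplesfilename)
    sim = ncf.select('simulated').get()  # assumed variable name
    dummy = ncf.end()
    return obs - sim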
def PrepareObs(rc_da_shell,rc_da_system,type='forecast'):
""" PrepareObs """
import pycdf as CDF
import shutil
saveas = os.path.join(rc_da_shell['dir.input'],'observations.nc')
# ncf = CDF.CDF(saveas,CDF.NC.WRITE|CDF.NC.TRUNC|CDF.NC.CREATE)
# ncf.CreationDate = datetime.datetime.now().strftime('%Y %B %d')
# ncf.CreationUser = os.environ['USER']
# ncf.CreationDir = rc_da_shell['dir.da_submit']
# ncf.CreationScript = sys.argv[0]
# ncf.close()
# msg = 'Created a new observation input file (%s) ' % saveas ; logging.info(msg)
# For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply merge
# the observations.nc file with the pre-cooked obs files
# Select observation input file for this time step
#sfname = rc_da_system['obs.input.fname']+'.%s'%(rc_da_shell['startdate'].strftime('%Y%m%d') )+'.nc'
sfname = rc_da_system['obs.input.fname']+'.%s'%('20050305')+'.nc'
msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg)
filename = os.path.join(rc_da_system['obs.input.dir'],sfname)
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
dummy = shutil.copy2(filename,saveas)
msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg)
rc_da_shell['obs.full.filename'] = saveas
return rc_da_shell
def PrepareEnsemble(rc_da_shell ):
"""
Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways:
(A) Construct a brand new ensemble from a covariance matrix
(B) Get an existing ensemble and propagate it
The steps for procedure A are:
(A1) Construct the covariance matrix
(A2) Make a Cholesky decomposition
(A3) Prepare a set of [nmembers] input parameters for each forecast member
The steps for procedure B are:
(B1) Get existing parameter values for each member
(B2) Run them through a forecast model
Both procedures finish with writing the parameter values to a NetCDF file
"""
    # Get the da system specific rc-items; these are validated by ValidateRC in tools_da.PrepareObs
rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True)
nlag = int(rc_da_shell['time.nlag'])
    for n in range(nlag):
        if n == nlag - 1:
            # Route A: make a new ensemble for the newest lag
            Xprime, Qprior = MakeNewEnsemble(rc_da_shell, rc_da_system)
        else:
            # Route B: Propagate existing ensemble
            Xprime, Qprior = [0, 0]
return Xprime, Qprior
def MakeNewEnsemble(rc_da_shell, rc_da_system):
""" Make a new ensemble from specified matrices """
import pyhdf.SD as hdf
import numpy as np
import matplotlib.pyplot as plt
# Get the needed matrices from the specified covariance files
file_ocn_cov = rc_da_system['ocn.covariance']
file_bio_cov = rc_da_system['bio.covariance']
for file in [file_ocn_cov,file_bio_cov]:
if not os.path.exists(file):
msg = "Cannot find the specified file %s" % file ; logging.error(msg)
raise IOError,msg
else:
msg = "Using covariance file: %s" % file ; logging.info(msg)
f_ocn = hdf.SD(file_ocn_cov)
f_bio = hdf.SD(file_bio_cov)
cov_ocn = f_ocn.select('qprior').get()
cov_bio = f_bio.select('qprior').get()
dummy = f_ocn.end()
dummy = f_bio.end()
    dummy = logging.debug("Successfully closed files after retrieving prior covariance matrices")
# Once we have the matrices, we can start to make the full covariance matrix, and then decompose it
nparams = int(rc_da_system['nparameters'])
nmembers = int(rc_da_shell['forecast.nmembers'])
fullcov = np.zeros((nparams,nparams),float)
nocn = cov_ocn.shape[0]
nbio = cov_bio.shape[0]
fullcov[0:nbio,0:nbio] = cov_bio
fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn
# Visually inspect full covariance matrix if verbose
if rc_da_shell['verbose']:
plt.imshow(fullcov)
plt.colorbar()
plt.savefig('fullcovariancematrix.png')
plt.close('all')
    # Estimate the effective degrees of freedom from the eigenvalue spectrum, then make a Cholesky decomposition of the covariance matrix
    U,s,Vh = np.linalg.svd(fullcov)
    dof = np.sum(s)**2 / np.sum(s**2)
C = np.linalg.cholesky(fullcov)
msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg)
msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg)
if rc_da_shell['verbose']:
plt.plot(s)
plt.savefig('eigenvaluespectrum.png')
plt.close('all')
# Now create a set of random instances of the given covariance structure
    # First seed the random number generator with a fixed value from the rc-file, this is to make reproducible results
dummy = np.random.seed(int(rc_da_system['random.seed']))
# Structure to hold the data
parameter_deviations = np.zeros((nparams,nmembers),float) # This is array X'
for member in range(nmembers):
rands = np.random.randn(nparams)
parameter_deviations[:,member] = np.dot(C,rands)
if rc_da_shell['verbose']:
plt.hist(parameter_deviations[20,:],20)
plt.savefig('par20histogram.png')
plt.close('all')
return parameter_deviations, fullcov
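# A quick sanity check for the routine above: with many members, the sample covariance of the
# deviations X' should approach the prescribed matrix fullcov (since X' = C r with C the Cholesky
# factor and r standard normal). Illustrative sketch only, with a hypothetical large ensemble:
def CheckEnsembleCovariance(fullcov, nmembers=5000):
    """ Hedged sketch: draw nmembers deviations and compare their sample covariance to fullcov """
    import numpy as np
    C = np.linalg.cholesky(fullcov)
    nparams = fullcov.shape[0]
    rands = np.random.randn(nparams, nmembers)
    Xprime = np.dot(C, rands)  # same construction as in MakeNewEnsemble
    sample_cov = np.dot(Xprime, Xprime.T) / (nmembers - 1)
    return np.abs(sample_cov - fullcov).max()  # should shrink as nmembers grows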
if __name__ == "__main__":
import rc
import os
import logging
logging.basicConfig(level = logging.DEBUG,
format = ' [%(levelname)-7s] (%(asctime)s) %(module)-20s : %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S')
rc_da_system = rc.read('carbontracker.rc')
rc_da_shell = rc.read('da.rc')
rc_da_shell['dir.input'] = os.path.join(rc_da_shell['dir.da_run'],'input')
rc_da_shell['dir.output'] = os.path.join(rc_da_shell['dir.da_run'],'output')
    rc_da_shell['verbose'] = True
X,C = PrepareEnsemble(rc_da_shell)
rc_da_shell = PrepareObs(rc_da_shell,rc_da_system)
resids = MakeResiduals(rc_da_shell)
! da.rc
! Info on the data assimilation cycle
time.restart : False
time.start : 2005030500
time.finish : 2005030700
time.cycle : 1
time.nlag : 2
dir.da_run : ${HOME}/tmp/test_da
! Info on the DA system used
da.system : CarbonTracker
da.system.rc : carbontracker.rc
! Info on the forward model to be used
forecast.model : TM5
forecast.model.rc : ${HOME}/Modeling/TM5/ct_new.rc
forecast.nmembers : 2
#!/usr/bin/env python
# da_initexit.py
"""
Author : peters
Revision History:
File created on 13 May 2009.
"""
import logging
import os
import sys
import shutil
import rc
import datetime
from tools_da import CreateDirs
from tools_da import AdvanceTime
from tools_da import StartLogger
from tools_da import ParseOptions
def Init():
""" Initialization of the data assimilation job. Parse command line options, read specified rc-file values """
# Initialize the logging system
logfilename = 'da.test.%s.log' % os.getpid()
dummy = StartLogger(logfile=logfilename)
logging.info('Starting execution of Main program')
# Start execution, first pass command line arguments
opts, args = ParseOptions()
# Check for the existence of the "rc" key, this is needed input for the script to pass a set of external options
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
else:
rc_da_shell = rc.read(args['rc'])
# Add some useful variables to the rc-file dictionary
rc_da_shell['log'] = logfilename
rc_da_shell['pid'] = '%s' % os.getpid()
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
return rc_da_shell
def SetupFileStructure(rc_da_shell):
""" Create file structure needed for data assimilation system.
In principle this looks like:
${da_rundir}
${da_rundir}/input
${da_rundir}/output
${da_rundir}/exec
${da_rundir}/diagnostics
${da_rundir}/analysis
${da_rundir}/jobs
${da_rundir}/save
${da_rundir}/save/one-ago
${da_rundir}/save/two-ago
    Note that the exec dir will actually be a symlink to the directory where
the SIBCASA or TM5 model executable lives. This directory is passed through
the da.rc file. The observation input files will be placed in the exec dir,
and the resulting simulated values will be retrieved from there as well.
"""
# Create the run directory for this DA job, including I/O structure
CreateDirs(rc_da_shell['dir.da_run'])
rc_da_shell['dir.exec'] = os.path.join(rc_da_shell['dir.da_run'],'exec')
rc_da_shell['dir.input'] = os.path.join(rc_da_shell['dir.da_run'],'input')
rc_da_shell['dir.output'] = os.path.join(rc_da_shell['dir.da_run'],'output')
rc_da_shell['dir.diagnostics'] = os.path.join(rc_da_shell['dir.da_run'],'diagnostics')
rc_da_shell['dir.analysis'] = os.path.join(rc_da_shell['dir.da_run'],'analysis')
rc_da_shell['dir.jobs'] = os.path.join(rc_da_shell['dir.da_run'],'jobs')
rc_da_shell['dir.save'] = os.path.join(rc_da_shell['dir.da_run'],'save')
CreateDirs(os.path.join(rc_da_shell['dir.exec']))
CreateDirs(os.path.join(rc_da_shell['dir.input']))
CreateDirs(os.path.join(rc_da_shell['dir.output']))
CreateDirs(os.path.join(rc_da_shell['dir.diagnostics']))
CreateDirs(os.path.join(rc_da_shell['dir.analysis']))
CreateDirs(os.path.join(rc_da_shell['dir.jobs']))
CreateDirs(os.path.join(rc_da_shell['dir.save']))
CreateDirs(os.path.join(rc_da_shell['dir.save'],'one-ago'))
CreateDirs(os.path.join(rc_da_shell['dir.save'],'two-ago'))
    msg = 'Successfully created the file structure for the assimilation job' ; logging.info(msg)
def RecoverRun(rc_da_shell):
"""Prepare a recovery from a crashed run. This consists of:
- copying all data from the save/one-ago folder,
- replacing all rc-file items with those from da_runtime.rc
"""
savedir = os.path.join(rc_da_shell['dir.save'])
recoverydir = os.path.join(rc_da_shell['dir.save'],'one-ago')
# Replace rc-items with those from the crashed run last rc-file
file_rc_rec = os.path.join(rc_da_shell['dir.save'],'da_runtime.rc')
rc_rec = rc.read(file_rc_rec)
for k,v in rc_rec.iteritems():
rc_da_shell[k] = v
msg = "Replaced rc-items.... " ; logging.debug(msg)
msg = "Next cycle start date is %s" % rc_da_shell['time.start'] ; logging.debug(msg)
return None
def IOSaveData(rc_da_shell, io_option='restore', save_option='partial',filter=[]):
""" Store or restore model state to/from a temporary directory. Two save_option options are available:
(1) save_option = 'partial' : Save the background concentration fields only, to use for a later advance of the background
(2) save_option = 'full' : Save all restart data, to use for a next cycle
While also two IO options are available:
(1) io_option = restore : Get data from a directory
(2) io_option = store : Save data to a directory
In case of a 'store' command the target folder is re-created so that the contents are empty to begin with.
There is currently room for the specific sub-models to also add save data to the mix. This is done through a
    forecast.model dependent import statement, followed by a call to the function:
model.IOSaveData(**args)
"""
# import modules, note that depending on the type of assimilation system, different submodules are imported!
if rc_da_shell['forecast.model'] == 'TM5': import tm5_tools as model
elif rc_da_shell['forecast.model'] == 'SIBCASA': import sibcasa_tools as model
elif rc_da_shell['forecast.model'] == 'WRF': import wrf_tools as model
if io_option not in ['store','restore']:
raise ValueError,'Invalid option specified for io_option (%s)' % io_option
if save_option not in ['partial','full']:
raise ValueError,'Invalid option specified for save_option (%s)' % save_option
savedir = os.path.join(rc_da_shell['dir.save'])
oneagodir = os.path.join(rc_da_shell['dir.save'],'one-ago')
tempdir = os.path.join(rc_da_shell['dir.save'],'tmp')
if save_option == 'partial': # save background data to tmp dir
if io_option == 'store':
dummy = model.WriteSaveData(rc_da_shell)
targetdir = tempdir
sourcedir = savedir
elif io_option == 'restore':
sourcedir = tempdir
targetdir = savedir
elif save_option == 'full': # move all save data to one-ago dir
if io_option == 'store':
dummy = model.WriteSaveData(rc_da_shell)
targetdir = oneagodir
sourcedir = savedir
elif io_option == 'restore':
sourcedir = oneagodir
targetdir = savedir
# If "store" is requested, recreate target dir, cleaning the contents
if io_option == 'store':
CreateDirs(os.path.join(targetdir),forceclean=True)
msg = "Performing a %s %s of data" % (save_option,io_option) ; logging.info(msg)
msg = " from directory: %s " % sourcedir ; logging.info(msg)
msg = " to directory: %s " % targetdir ; logging.info(msg)
msg = " with filter : %s " % filter ; logging.info(msg)
for file in os.listdir(sourcedir):
file = os.path.join(sourcedir,file)
if os.path.isdir(file): # skip dirs
skip = True
elif filter == []: # copy all
skip= False
else: # check filter
skip = True # default skip
for f in filter:
if f in file:
skip = False # unless in filter
break
if skip:
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
if io_option == 'store' and save_option == 'full':
msg = " [move] .... %s " % file ; logging.info(msg)
dummy = shutil.move(file,file.replace(sourcedir,targetdir) )
else:
msg = " [copy] .... %s " % file ; logging.info(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
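# Usage sketch for IOSaveData: a partial store parks the background fields in save/tmp before the
# forecast, the matching restore brings them back before the advance, and a full store archives
# everything to save/one-ago at the end of a cycle (these mirror the calls made from da_master.Main):
#
#   IOSaveData(rc_da_shell, io_option='store',   save_option='partial')
#   IOSaveData(rc_da_shell, io_option='restore', save_option='partial')
#   IOSaveData(rc_da_shell, io_option='store',   save_option='full')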
def StartRestartRecover(rc_da_shell):
""" Determine how to proceed with this cycle:
(a) recover from crash : get the save data from the one-ago folder and replace the da_runtime variables with those from the save dir
(b) restart cycle : get the save data from the one-ago folder and use latest da_runtime variables from the exec dir
(c) fresh start : set up the required file structure for this simulation
"""
#
# case 1: A recover from a previous crash, this is signaled by flag "-r"
#
if rc_da_shell['da.crash.recover']:
msg = "Recovering simulation from data in: %s" % rc_da_shell['dir.da_run'] ; logging.info(msg)
dummy = SetupFileStructure(rc_da_shell)
dummy = IOSaveData(rc_da_shell,io_option='restore',save_option='full',filter=[])
dummy = RecoverRun(rc_da_shell)
#
# case 2: A continuation, this is signaled by rc-item time.restart = True
#
elif rc_da_shell['time.restart']:
msg = "Restarting filter from previous step" ; logging.info(msg)
dummy = SetupFileStructure(rc_da_shell)
dummy = IOSaveData(rc_da_shell,io_option='restore',save_option='full',filter=[])
#
# case 3: A fresh start, this is signaled by rc-item time.restart = False
#
elif not rc_da_shell['time.restart']:
msg = "First time step in filter sequence" ; logging.info(msg)
dummy = SetupFileStructure(rc_da_shell)
return None
#
def WriteNewRCfile(rc_da_shell):
""" Write the rc-file for the next DA cycle. Note that the start time for the next cycle is the end time of this one, while
the end time for the next cycle is the current end time + one cycle length. The resulting rc-file is written twice:
(1) to the dir.exec so that it can be used when resubmitting the next cycle
(2) to the dir.save so that it can be backed up with the rest of the save data, for later recovery if needed
"""
# These first two lines advance the filter time for the next cycle
rc_da_shell['startdate'] = rc_da_shell['enddate']
rc_da_shell['enddate'] = AdvanceTime(rc_da_shell['enddate'],rc_da_shell['cyclelength'])
# The rest is info needed for a system restart
rc_da_shell['time.restart'] = True
rc_da_shell['time.start'] = rc_da_shell['startdate'].strftime('%Y%m%d%H')
rc_da_shell['time.finish'] = rc_da_shell['finaldate'].strftime('%Y%m%d%H')
fname = os.path.join(rc_da_shell['dir.exec'],'da_runtime.rc')
rc_da_shell['da.restart.fname'] = fname
dummy = rc.write(fname,rc_da_shell)
msg = 'Wrote new da_runtime.rc file to working directory' ; logging.debug(msg)
# Also write this info to the savedir
fname = os.path.join(rc_da_shell['dir.save'],'da_runtime.rc')
dummy = rc.write(fname,rc_da_shell)
msg = 'Wrote new da_runtime.rc file to save directory' ; logging.debug(msg)
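# Worked example of the time advance above, assuming a cycle length of 1 day: a cycle that ran
# 2005-03-05 00:00 to 2005-03-06 00:00 gets a new rc-file in which the next cycle runs
# 2005-03-06 00:00 to 2005-03-07 00:00, with time.restart set to True so that
# StartRestartRecover treats it as a continuation.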
def WriteRC(rc_da_shell,filename):
    """ Write RC file after each process to reflect updated info """
    rcname = filename
    fname = './'+rcname
    dummy = rc.write(fname,rc_da_shell)
    msg = 'Wrote expanded rc-file (%s) to current directory'%(rcname) ; logging.debug(msg)
return None
def SubmitNextCycle(rc_da_shell):
""" submit next job in cycle, if needed. The end date is held against the final date. A subprocess is called that
submits the next job, without waiting for its result so this cycle can finish and exit.
"""
import subprocess
import os
if rc_da_shell['startdate'] < rc_da_shell['finaldate']:
        #code = subprocess.call(['qsub',sys.argv[0],'rc=%s' % rc_da_shell['da.restart.fname']] )
        #code = subprocess.Popen(['das.sh','rc=%s' % rc_da_shell['da.restart.fname']] )
        # Note: Popen does not block and returns a process handle rather than an exit code,
        # so success is assumed here and the spawned cycle reports its own errors
        dummy = subprocess.Popen([os.getcwd()+'/das.py','rc=%s' % rc_da_shell['da.restart.fname'],"process=all"] )
        code = 0
if code == 0:
            logging.info('Submitted next cycle successfully')
else:
logging.error('Error in submitting next cycle, return code: %s ' % code)
raise OSError
else:
logging.info('Final date reached, no new cycle started')
return None
if __name__ == "__main__":
pass
#!/usr/bin/env python
# da_master.py
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# qsub (jet)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
#$ -N da_test
#$ -A co2
#$ -cwd
#$ -pe wcomp 2
#$ -V
#$ -r y
#$ -l h_rt=00:02:00
"""
Author : peters
Revision History:
File created on 03 Oct 2008.
This is the master script for the data assimilation system used by NOAA ESRL (CarbonTracker), Wageningen University
(CarbonTracker Europe), and by Kevin Schaefer (SIBCASA). It provides an interface to a set of subroutines that control
different aspects of a data assimilation cycle. The list of subroutines can be expanded when needed. The routine
contains simple error handling and logging capabilities and is not meant for use in general DA settings; it is
especially tailored to be used with TM5 and SIBCASA.
"""
def Main():
""" Main executable"""
#
# Import all the needed modules and subroutines
#
#
# General python modules first
#
import sys
import os
import shutil
import logging
#
# Private python modules next
#
import rc
from tools_da import ParseOptions
from tools_da import ValidateRC
from tools_da import PrepareObs
from tools_da import PrepareEnsemble
from tools_da import RunForecastModel
from tools_da import needed_rc_da_items
from tools_da import ParseTimes, SetInterval
from tools_da import Invert
from da_initexit import Init
from da_initexit import StartRestartRecover
from da_initexit import IOSaveData
from da_initexit import WriteNewRCfile
from da_initexit import SubmitNextCycle
# Initialize the simulation by getting arguments and constructing an initial dictionary with settings
rc_da_shell = Init()
# Validate the contents of the rc-file passed
msg = " ++++++++++++++++++++ Preparing DA cycle ++++++++++++++++++++" ; logging.info(msg)
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# Figure out what to do: is this is a fresh start, a continuation, or a recover from crash
dummy = StartRestartRecover(rc_da_shell)
# Calculate DA system startdate, enddate, and finaldate from rc-file items
dummy = ParseTimes(rc_da_shell)
# Prepare observation files
msg = " ++++++++++++++++++++ Prepare Observation set ++++++++++++++++++++" ; logging.info(msg)
#dummy = PrepareObs(rc_da_shell,'forecast')
# Prepare ensemble of parameters, including mean
msg = " ++++++++++++++++++++ Prepare ensemble parameters set ++++++++++++++++++++" ; logging.info(msg)
dummy = PrepareEnsemble(rc_da_shell )
# Run forecast model to obtain simulations.hdf
msg = " ++++++++++++++++++++ Running Forecast model ++++++++++++++++++++" ; logging.info(msg)
dummy = SetInterval(rc_da_shell,'forecast')
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='partial')
dummy = RunForecastModel(rc_da_shell)
# Perform inversion
msg = " ++++++++++++++++++++ Inversion ++++++++++++++++++++" ; logging.info(msg)
dummy = Invert(rc_da_shell)
# Advance background: first reset the model to the beginning of the window
msg = " ++++++++++++++++++++ Advancing Background State ++++++++++++++++++++" ; logging.info(msg)
dummy = SetInterval(rc_da_shell,'advance')
dummy = IOSaveData(rc_da_shell,io_option='restore',save_option='partial')
dummy = RunForecastModel(rc_da_shell)
# Copy all data needed for recovery to the "savedir/one-ago" folder
msg = " ++++++++++++++++++++ Saving State for next cycle ++++++++++++++++++++" ; logging.info(msg)
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='full')
# Write a new RC file for the next cycle of the DA system
dummy = WriteNewRCfile(rc_da_shell)
# Submit the next cycle of the DA sequence
msg = " ++++++++++++++++++++ Submitting next cycle ++++++++++++++++++++" ; logging.info(msg)
dummy = SubmitNextCycle(rc_da_shell)
# Finished the submission of the job, report success and exit
    logging.info('Successfully finished, exiting...')
shutil.move(rc_da_shell['log'],rc_da_shell['dir.jobs'])
sys.exit(0)
if __name__ == "__main__":
import sys
import os
# Append current working dir to path
sys.path.append(os.getcwd())
Main()
#!/usr/bin/env python
# das.py
"""
Author : peters
Revision History:
File created on 29 Sep 2009.
"""
import os
import rc
from tools_da import ValidateRC
from tools_da import needed_rc_da_items
header = '\n\n *************************************** '
footer = ' *************************************** \n '
validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','all']
def JobStart(opts,args):
""" Set up the job specific directory structure and create an expanded rc-file """
import rc
from da_initexit import StartRestartRecover
from tools_da import ParseTimes
from da_initexit import WriteRC
rc_da_shell = rc.read(args['rc'])
# Add some useful variables to the rc-file dictionary
rc_da_shell['log'] = args["logfile"]
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# Figure out what to do: is this is a fresh start, a continuation, or a recover from crash
dummy = StartRestartRecover(rc_da_shell)
# Calculate DA system startdate, enddate, and finaldate from rc-file items
dummy = ParseTimes(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return rc_da_shell
def JobInput(args):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
from tools_da import PrepareObs
from tools_da import PrepareEnsemble
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = PrepareEnsemble(rc_da_shell)
dummy = PrepareObs(rc_da_shell,'forecast')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Sample(args):
""" Sample the filter state for the inversion """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = ForwardRun(args,'forecast')
    # Optionally, post-processing of the model output can be added that deals for instance with
    # sub-sampling of time series, vertical averaging, etc.
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def ForwardRun(args,runtype='forecast'):
""" Run the forward model from startdate to enddate """
from tools_da import SetInterval
from tools_da import RunForecastModel
from da_initexit import IOSaveData
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = SetInterval(rc_da_shell,runtype)
dummy = RunForecastModel(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Invert(args):
""" Perform the inverse calculation """
import tools_da
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = tools_da.Invert(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Propagate(args):
""" Propagate the filter state to the next step """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
dummy = ForwardRun(args,'advance')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def SaveAndSubmit(args):
""" Save the model state and submit the next job """
from da_initexit import IOSaveData
from da_initexit import WriteRC
from da_initexit import WriteNewRCfile
from da_initexit import SubmitNextCycle
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='full')
dummy = WriteNewRCfile(rc_da_shell)
dummy = SubmitNextCycle(rc_da_shell)
return None
if __name__ == "__main__":
import sys
import os
import logging
import shutil
import subprocess
from tools_da import ParseOptions
from tools_da import StartLogger
# Append current working dir to path
sys.path.append(os.getcwd())
# Get name of logfile
opts, args = ParseOptions()
if not args.has_key("logfile"):
msg = "There is no logfile specified on the command line. Using logfile=logfile.log"
args['logfile'] = 'logfile.log'
logfile = args['logfile']
dummy = StartLogger(logfile=logfile)
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
if not args.has_key("process"):
msg = "There is no execution process specified on the command line. Using default process=all" ; logging.error(msg)
args["process"] = 'all'
if not args["process"] in validprocess:
msg = "The specified execution process is not valid (%s). Please use one of %s"%(args['process'],validprocess) ; logging.error(msg)
raise IOError,msg
# Get name of the process requested
process=args['process']
msg = 'Process %s starting, entered python from master shell'%process ; logging.debug(msg)
if process == 'jobstart':
rcf = JobStart(opts,args)
if process == 'jobinput':
dummy = JobInput(args)
if process == 'sample':
dummy = ForwardRun(args,'forecast')
if process == 'invert':
dummy = Invert(args)
if process == 'propagate':
dummy = Propagate(args)
if process == 'resubmit':
dummy = SaveAndSubmit(args)
if process == 'all':
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
msg = header+"starting JobStart"+footer ; logging.info(msg)
rcf = JobStart(opts,args)
msg = header+"starting JobInput"+footer ; logging.info(msg)
dummy = JobInput(args)
msg = header+"starting ForwardRun"+footer ; logging.info(msg)
dummy = ForwardRun(args,'forecast')
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(args)
msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(args)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(args)
msg = "Cycle finished...exiting" ; logging.info(msg)
# move log file to rundir/jobs
jobdir = os.path.join(rcf['dir.da_run'],"jobs")
joblogfile = os.path.join(jobdir,logfile)
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
# move rc file to rundir/jobs
jobrcfile = os.path.join(jobdir,args["jobrcfilename"] )
dummy = shutil.move(args["jobrcfilename"],jobrcfile )
msg = "....Moved %s to %s"%(args['jobrcfilename'],jobrcfile) ; logging.debug(msg)
# cat TM5 output and rc-file to the job file output
tm5jobfile = os.path.join(jobdir,"tm5.%s"%(args['logfile']) )
if os.path.exists(tm5jobfile):
msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(tm5jobfile,'r').read())
dummy = f.close()
if os.path.exists(jobrcfile):
msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(jobrcfile,'r').read())
dummy = f.close()
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
msg = 'Process %s done, returning from python to master shell'%process ; logging.debug(msg)
sys.exit(0)
#! /usr/bin/env sh
# das.sh
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# qsub (jet)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
#$ -S /bin/sh
#$ -N das
#$ -A co2
#$ -cwd
#$ -pe wcomp 8
#$ -V
#$ -r y
#$ -l h_rt=00:02:00
# Author : peters
#
# Revision History:
# File created on 03 Oct 2008 as a python shell script. 29 Sep 2009: now ported to bash shell (WP).
#
# This is the master script for the data assimilation system used by NOAA ESRL (CarbonTracker),
# Wageningen University (CarbonTracker Europe) and by Kevin Schaefer (SIBCASA). It provides an
# interface to a set of subroutines that control different aspects of a data assimilation cycle.
# The list of subroutines can be expanded when needed. The routine contains simple error handling
# and logging capacities and is not meant for use in general DA settings. It is especially
# tailored to be used with TM5 and SIBCASA.
#
#####################################################################################
#
#
# Initialization of the script itself, only the bare necessities are done here so we
# can quickly branch out to other codes
#
# exit on error
set -e
# get script name
pwd="/bin/pwd"
case $0 in
/* ) script=$0 ;;
* ) script="`${pwd}`/$0" ;;
esac
# extract current process id, needed in the initialization phase to create names of logfiles, jobfiles, and rc-files.
pid=$$
# base name for created log files:
jbpid="jb.${pid}"
jbprefix="${jbpid}"
logfile="${jbprefix}.log"
rcfile="jb.${pid}.rc"
go_readrc="./go_readrc"
# Start logging into a pre-specified file, as well as to the screen. The filename will be passed to other
# routines/languages so that we gather all the logging output into one place.
# Create a standard format for logging on to the screen and into a file, so far
# there are 3 levels of warnings: WARNING | INFO | ERROR
inflogprefix="[INFO ] (`date +%F\ %T`) ${0} :"
errlogprefix="[ERROR ] (`date +%F\ %T`) ${0} :"
warlogprefix="[WARNING] (`date +%F\ %T`) ${0} :"
if [ -f ${logfile} ]; then
rm -f ${logfile}
msg=" Removing existing log file (${logfile})" && echo "${warlogprefix}${msg}" >> ${logfile} && echo "${warlogprefix}${msg}"
fi
msg=" Logging has started from script ${script}" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
msg=" Logging has started by user ${USER}" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
#####################################################################################
########################### Now branch out to the first block: ###############
########################### initialization of the run settings ###############
#####################################################################################
#
# Purpose:
# Create an extended rc-file with extra settings and flags
# Create a directory structure for the job
#
# Mandatory input:
# name of a log file
# rc-file name
# job rc-file name
# Mandatory output:
# an expanded rc-file named expanded_{rcfilenamehere}
# a directory structure for the job, specified inside the new rc-file
#
######################################################################################
msg=" Starting job settings block" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# This is the python call-out to start a new job
python das.py "$@" logfile=${logfile} process='jobstart' jobrcfilename=${rcfile}
#####################################################################################
########################### Now branch out to the second block: ###############
########################### Prepare input datasets for the fwd model ###############
#####################################################################################
#
# Purpose:
# Create a state vector or parameter input file
# Create a list of observations (x,y,z,t) to sample from the forward model
#
# Mandatory input:
# name of a log file
# expanded rc-file name
# Mandatory output:
# input files needed for flux construction
# obs.nc file with (x,y,z,t,value,model-data-mismatch,can-skip,...) Todo: make a NetCDF description file for obs.nc !!!
#
######################################################################################
msg=" Starting prepare input block" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# This is the python call-out to prepare the input datasets
python das.py "$@" logfile=${logfile} process='jobinput' jobrcfilename=${rcfile}
#####################################################################################
########################### Now branch out to the third block: ###############
########################### Sample the state vector ###############
#####################################################################################
#
# Purpose:
# Run the observation operator from t_now to t_obs and sample the model
#
# Mandatory input:
# name of a log file
# expanded rc-file name
# an obs.nc file carrying (x,y,z,t) values to sample
# Mandatory output:
# a sim.nc file carrying the simulated values for each (x,y,z,t)
#
######################################################################################
msg=" Starting sample state run block" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# This is the python call-out to run the model and sample the state
python das.py "$@" logfile=${logfile} process='sample' jobrcfilename=${rcfile}
#####################################################################################
########################### Now branch out to the fourth block: ###############
########################### Perform the inversion ###############
#####################################################################################
#
# Purpose:
# Minimize a cost function based on model+obs and return optimal values of state vector
#
# Mandatory input:
# name of a log file
# expanded rc-file name
# an obs.nc file carrying (x,y,z,t) values from obs
# an sim.nc file carrying (x,y,z,t) values from the model
# Mandatory output:
# a state.nc file carrying the updated state vector values
#
######################################################################################
msg=" Starting inverse calculation block" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# This is the python call-out to perform the inversion
python das.py "$@" logfile=${logfile} process='invert' jobrcfilename=${rcfile}
#####################################################################################
########################### Now branch out to the fifth block: ###############
########################### Advance the model to the next cycle ###############
#####################################################################################
#
# Purpose:
# Advance the filter by one time step
#
# Mandatory input:
# name of a log file
# expanded rc-file name
# Mandatory output:
# an updated state.nc file with propagated state variables
# updated save* files from the forward model
#
######################################################################################
msg=" Starting state propagation block" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# This is the python call-out to propagate the state
python das.py "$@" logfile=${logfile} process='propagate' jobrcfilename=${rcfile}
#####################################################################################
########################### Now branch out to the sixth block: ###############
########################### Save filter state and start next cycle ###############
#####################################################################################
#
# Purpose:
# Save the filter state, submit next cycle
#
# Mandatory input:
# name of a log file
# expanded rc-file name
# Mandatory output:
# saved filter state for next cycle
# a new process for the next cycle
#
######################################################################################
msg=" Starting process resubmission block" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# This is the python call-out to save the state and resubmit
python das.py "$@" logfile=${logfile} process='resubmit' jobrcfilename=${rcfile}
#####################################################################################
########################### Room for more blocks here ###############
########################### Such as analysis or post-processing ###############
#####################################################################################
#####################################################################################
########################### End this cycle ###############
########################### ###############
#####################################################################################
msg=" Cycle finished...exiting" && echo "${inflogprefix}${msg}" >> ${logfile} && echo "${inflogprefix}${msg}"
# Move logfile and rcfile, combine with TM5 logfile for clarity
msg=" TM5 output follows" && echo "${inflogprefix}${msg}" >> ${logfile}
jobdir=`${go_readrc} ${rcfile} 'dir.da_run'`"/jobs"
mv ${logfile} ${jobdir}
mv ${rcfile} ${jobdir}
if [ -f "${jobdir}/tm5.${logfile}" ]; then
cat "${jobdir}/tm5.${logfile}" >> "${jobdir}/${logfile}"
fi
msg=" The complete log file is now at: ${jobdir}/${logfile}" && echo "${inflogprefix}${msg}"
exit 0
#! /bin/sh
# go_readrc
# --- init ---
# leave on error
set -e
# --- external ---
basename='/bin/basename'
test ! -x ${basename} && basename='/usr/bin/basename'
egrep='/bin/egrep'
test ! -x ${egrep} && egrep='/usr/bin/egrep'
less='/bin/less'
test ! -x ${less} && less='/usr/bin/less'
test ! -x ${less} && less='/usr/local/bin/less'
sed='/bin/sed'
test ! -x ${sed} && sed='/usr/bin/sed'
# --- definitions ---
prog=`${basename} $0`
# --- help ---
DisplayHelp ()
{
${xPAGER:-${less}} << EOF
$prog General Objects
NAME
$prog - read data value from a resource file
SYNOPSIS
go_readrc <rcfile> <key> [<default>]
go_readrc -h|--help
DESCRIPTION
    A resource file is a text file with key/data pairs, useful
to initialize programs (scripts, Fortran, etc).
The format of the <rcfile> is chosen close to the standard X resources:
* Comment lines start with '!'
* A key/data pair has the format:
<key> : <value>
where the white space (space or tabs) is optional.
The <key> consists of letters, numbers, '_', and '.' .
Example of a valid rcfile:
! Specify an output directory:
output.path : d/
Given a text <key>, the <rcfile> is scanned for a line starting
with this key; all text behind the ':' is written to the standard output.
Example of usage in sh script:
output_root=\`go_readrc test.rc output.path\`
If the <key> is not found, an error message is issued,
unless a <default> is supplied which is then written to standard output.
The <default> might be an empty string, e.g. '' .
PREPROCESSING
The rcfile might be preprocessed by go_pprc,
to expand environment variables.
EXIT STATUS
Non zero in case of any error.
SEE ALSO
X, go_pprc
AUTHOR
Arjo Segers
EOF
exit 0
}
ErrorMessage ()
{
echo "ERROR in $prog: $1" 1>&2
echo " Use '$prog -h' for information." 1>&2
exit 1
}
# --- arguments ---
rcfile=''
rckey=''
with_default=''
while [ $# -gt 0 ]; do
case "$1" in
-h | --help )
DisplayHelp
;;
-* )
ErrorMessage "unknown option '$1' ..."
;;
* )
if [ -z "${rcfile}" ]; then
rcfile="$1"
elif [ -z "${rckey}" ]; then
rckey="$1"
elif [ -z "${with_default}" ]; then
default="$1"
with_default='true'
else
ErrorMessage "unknown argument '$1'"
fi
;;
esac
shift
done
if [ -z "${rcfile}" -o -z "${rckey}" ]; then
ErrorMessage "missing arguments"
fi
# --- begin ---
# does the rcfile exist?
if [ ! -f ${rcfile} ]; then
ErrorMessage "rcfile '${rcfile}' does not exist ..."
fi
# replace '.' in the rckey by '\.'
rckeydots=`echo ${rckey} | ${sed} -e 's/\./\\\\./g'`
# 10 Apr 06: Andy Jacobson
# [[:space:]] indicates a space or tab character
#wspace='[[:space:]]*'
#
# 26 Apr 06: Arjo Segers
# The egrep on SGI system does not support the '[:space:]' ;
# use a real tab character instead ...
tab=`printf '\t'`
wspace="[ ${tab}]*"
# A key-data line has the following synopsis:
#
# <begin-of-line><key>[<wspace>]:[<wspace>]<data>
#
# where <wspace> denote tabs or spaces.
# Set regular expression for such a line except the <data> part;
# this expression is used to search for a key and to extract
# the data part:
#
re="^${rckeydots}${wspace}:${wspace}"
# set grep command to select matching lines:
selectlinecmd="${egrep} '${re}' ${rcfile}"
# count number of hits; should be exactly 1 ...
nfound=`eval "${selectlinecmd}" | /usr/bin/wc -l`
if [ ${nfound} -eq 0 ]; then
if [ -z "${with_default}" ]; then
ErrorMessage "key '${rckey}' not found in ${rcfile} and no default specified ..."
else
echo "${default}"
exit 0
fi
elif [ ${nfound} -gt 1 ]; then
ErrorMessage "key '${rckey}' found ${nfound} times in $rcfile ..."
fi
# extract the data part for this key;
# substitute an empty string for the 'key : ' part;
# remove trailing blanks;
# output is written to standard output:
eval "${selectlinecmd}" | ${sed} -e "s/${re}//" -e "s/${wspace}$//"
# readme_wrapper.txt : compile the MPI wrapper with
mpicc tm5_mpi_wrapper.c -o tm5_mpi_wrapper
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/wait.h>
#include "mpi.h"
#define CMDLENGTH 200
int safe_system (const char *command);
int main(int argc, char *argv[]) {
int ierr;
int myrank;
char cmd[CMDLENGTH];
ierr = MPI_Init(&argc, &argv);
if (argc != 2) {
fprintf(stderr, "[tm5_mpi_wrapper] Expecting 1 argument, got %d.\n",argc);
exit(-1);
}
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
snprintf(cmd,CMDLENGTH,"%s %03d",argv[1],myrank);
printf( "MPI rank %d about to execute command \"%s\".\n",myrank,cmd );
ierr = safe_system(cmd);
if(ierr != 0) {
MPI_Abort(MPI_COMM_WORLD,ierr);
exit(ierr);
}
ierr = MPI_Finalize( );
exit(ierr);
}
int safe_system (const char *command) {
int pid, status;
if (command == 0)
return 1;
pid = fork();
if (pid == -1) // fork failed
return -1;
if (pid == 0) { // then this is the child
char *argv[4];
argv[0] = "sh";
argv[1] = "-c";
argv[2] = (char*)command;
argv[3] = 0;
execv("/bin/sh", argv);
_exit(127);
}
do {
if (waitpid(pid, &status, 0) == -1) {
if (errno != EINTR)
return -1;
} else
return status;
} while(1);
}
#!/usr/bin/env python
# tm5_tools.py
"""
Author : peters
Revision History:
File created on 09 Feb 2009.
This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information
from the DA system in combination with the generic tm5.rc files.
The TM5 model is now controlled by a python subprocess. This subprocess consists of an MPI wrapper (written in C) that spawns
a large number ( N= nmembers) of TM5 model instances under mpirun, and waits for them all to finish.
The design of the system assumes that the tm5.x (executable) was pre-compiled with the normal TM5 tools, and is residing in a
directory specified by the ${RUNDIR} of a tm5 rc-file. This tm5 rc-file name is taken from the data assimilation rc-file. Thus,
this python shell does *not* compile the TM5 model for you!
"""
import os
import subprocess
import logging
import rc
import shutil
identifier = 'TM5'
version = '0.1'
mpi_shell_file = 'tm5_mpi_wrapper'
needed_rc_items = [
'rundir',
'outputdir',
'time.start',
'time.break.nday'
]
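# Call-out sketch (based on the tm5_mpi_wrapper source above): each MPI rank appends its
# zero-padded rank number to the command it is given, so under
#   mpirun -np <nmembers> tm5_mpi_wrapper ./tm5.x
# rank 3 executes "./tm5.x 003", and MPI_Abort terminates all members if any instance fails.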
def ModifyRC(rc_da_shell,rc_tm5):
""" modify parts of the tm5 rc-file to give control of file locations to the DA shell
instead of to the tm5.rc script. Note that most parameters remain unchanged because they do
not pertain to the actual run in progress, but to the TM5 model as a whole.
Currently, only the savedir and outputdir variables are replaced so that TM5 writes and reads from
the DA system directories.
Also, we add a new rc-variable which describes the inputdir for the da system
Note that we replace these values in all {key,value} pairs of the tm5.rc file!
"""
orig_savedir = rc_tm5['savedir']
orig_outputdir = rc_tm5['outputdir']
for k,v in rc_tm5.iteritems():
if not isinstance(v,str): continue
if orig_savedir in v:
rc_tm5[k] = v.replace(orig_savedir,rc_da_shell['dir.save'])
logging.debug('Replaced savedir in tm5 rc-item %s ' % k)
if orig_outputdir in v:
rc_tm5[k] = v.replace(orig_outputdir,rc_da_shell['dir.output'])
logging.debug('Replaced outputdir in tm5 rc-item %s ' % k)
rc_tm5['das.input.dir'] = rc_da_shell['dir.input']
msg = 'Added das.input.dir to tm5 rc-file' ;logging.debug(msg)
tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5.rc')
dummy = rc.write(tm5rcfilename,rc_tm5)
msg = "Modified tm5.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
def CreateRunRC(rc_da_shell,rc_tm5):
"""
Create the tm5-runtime.rc file which is read by initexit.F90 to control time loop and restart from save files
We replace values for start and end date, as well as save and output folders, with values from the da shell.
"""
rc_runtm5={}
rc_runtm5['year1'] = rc_da_shell['startdate'].year
rc_runtm5['month1'] = rc_da_shell['startdate'].month
rc_runtm5['day1'] = rc_da_shell['startdate'].day
rc_runtm5['hour1'] = rc_da_shell['startdate'].hour
rc_runtm5['minu1'] = 0
rc_runtm5['sec1'] = 0
rc_runtm5['year2'] = rc_da_shell['enddate'].year
rc_runtm5['month2'] = rc_da_shell['enddate'].month
rc_runtm5['day2'] = rc_da_shell['enddate'].day
rc_runtm5['hour2'] = rc_da_shell['enddate'].hour
rc_runtm5['minu2'] = 0
rc_runtm5['sec2'] = 0
    if rc_da_shell['time.restart']:
rc_runtm5['istart'] = 3
else:
rc_runtm5['istart'] = rc_tm5['istart']
rc_runtm5['savedir'] = rc_da_shell['dir.save']
rc_runtm5['outputdir'] = rc_da_shell['dir.output']
tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5_runtime.rc')
rc.write(tm5rcfilename,rc_runtm5)
def PrepareExe(rc_da_shell):
"""
Prepare a forward model TM5 run, this consists of:
- reading the TM5 rc-file,
- validating it,
- modifying the values,
- Creating a tm5_runtime.rc file
- Removing the existing tm5.ok file if present
"""
from tools_da import ValidateRC
# Get tm5.rc parameters defined for this forecast model run, only needed for location, etc.
rc_tm5 = rc.read(rc_da_shell['forecast.model.rc'],silent = True)
ValidateRC(rc_tm5,needed_rc_items)
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
ModifyRC(rc_da_shell,rc_tm5)
# Create a TM5 runtime rc-file in which run/break times are defined by our da system
CreateRunRC(rc_da_shell,rc_tm5)
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
if not os.path.exists(mpi_shell_file):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s)"% mpi_shell_file ; logging.error(msg)
msg = "Please see the 'readme_wrapper.txt' file for instructions to compile" ; logging.error(msg)
raise IOError
shutil.copy(mpi_shell_file,targetdir)
# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed
okfile = os.path.join(targetdir,'tm5.ok')
if os.path.exists(okfile): os.remove(okfile)
return rc_tm5
def StartExe(rc_da_shell,rc_tm5):
"""
    Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
    spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
    only if successful on all processors will execution of the shell continue.
"""
from tools_da import CreateLinks
# Create a link to the rundirectory of the TM5 model
sourcedir = rc_tm5['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
CreateLinks(sourcedir,targetdir)
# Go to executable directory and start the subprocess, using a new logfile
os.chdir(targetdir)
logging.debug('Changing directory to %s ' % targetdir )
modellogfilename = os.path.join(rc_da_shell['dir.jobs'],'tm5.%s'%rc_da_shell['log'])
modellogfile = open(modellogfilename,'w')
logging.info('Logging model output to %s ' % modellogfilename)
# Open logfile and spawn model, wait for finish and return code
logging.info('Starting model executable as subprocess ')
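    # The commented call-outs below show how the full ensemble would be spawned through the
    # MPI wrapper (see tm5_mpi_wrapper.c); for now a single tm5.x instance is started directly.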
#cmd = ['openmpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x']
#cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x']
cmd = ['./tm5.x']
code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
modellogfile.close()
# Interpret/Handle exit code
if not os.path.exists(os.path.join(targetdir,'tm5.ok')): code = -1
if code == 0:
        logging.info('Finished model executable successfully (%s)'%code)
else:
logging.error('Error in model executable return code: %s ' % code)
logging.info('Inspect [%s] to find output from model executable ' % modellogfilename)
raise OSError
# Return to working directory
os.chdir(rc_da_shell['dir.da_run'])
return code
def WriteSaveData(rc_da_shell):
""" Write the TM5 recovery data for the next DA cycle """
sourcedir = os.path.join(rc_da_shell['dir.output'])
targetdir = os.path.join(rc_da_shell['dir.save'])
filter = ['save_%s'%rc_da_shell['enddate'].strftime('%Y%m%d')]
msg = "Performing a full backup of TM5 save data" ; logging.debug(msg)
msg = " from directory: %s " % sourcedir ; logging.debug(msg)
msg = " to directory: %s " % targetdir ; logging.debug(msg)
msg = " with filter: %s " % filter ; logging.debug(msg)
for file in os.listdir(sourcedir):
file = os.path.join(sourcedir,file)
if os.path.isdir(file): # skip dirs
skip = True
elif filter == []: # copy all
skip= False
else: # check filter
skip = True # default skip
for f in filter:
if f in file:
skip = False # unless in filter
break
if skip:
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
if __name__ == "__main__":
pass
#!/usr/bin/env python
# tools_da.py
"""
Author : peters
Revision History:
File created on 03 Oct 2008.
Temporary module to hold classes and methods that are in development
"""
import logging
import os
import sys
import shutil
import rc
import datetime
needed_rc_da_items=[
'time.start',
'time.finish',
'time.nlag',
'time.cycle',
'dir.da_run',
'forecast.model',
'forecast.model.rc',
'da.system']
helptext=\
"""
Usage: script rc=yourfile.rc [process=...] [logfile=...] [-h] [-r] [-v]
    -h : show this help text and exit
    -r : recover from a crashed run
    -v : extra verbose output
Arguments are passed as key=value pairs; an rc-file must always be specified.
"""
def StartLogger(logfile='logfile.log'):
""" start the logging of messages to screen and to file"""
# start the logging basic configuration by setting up a log file
logging.basicConfig(level = logging.DEBUG,
format = ' [%(levelname)-7s] (%(asctime)s) py-%(module)-20s : %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
filename = logfile,
filemode = 'a')
# define a Handler which writes INFO messages or higher to the sys.stdout
console = logging.StreamHandler(sys.stdout)
dummy = console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('[%(levelname)-7s] (%(asctime)s) py-%(module)-20s: %(message)s')
# tell the handler to use this format
dummy = console.setFormatter(formatter)
# add the handler to the root logger
dummy = logging.getLogger('').addHandler(console)
#logging.info('Logging has started')
logging.info('python logger started for %s' % sys.argv)
#logging.info('on host %s' % os.environ['HOST'])
#logging.info('by user %s' % os.environ['USER'])
#logging.info('in directory %s' % os.getcwd())
#logging.info('Please see file %s for a more detailed log ' % logfile)
def ParseOptions():
""" Function parses options from the command line and returns the arguments as a dictionary"""
import getopt
import sys
# Parse keywords, the only option accepted so far is the "-h" flag for help
opts=[]
args=[]
try:
opts, args = getopt.gnu_getopt(sys.argv[1:], "-hrv")
except getopt.GetoptError, msg:
logging.error('%s'%msg)
sys.exit(2)
for options in opts:
options=options[0].lower()
if options == '-h':
print ""
print helptext
sys.exit(2)
if options == '-r':
print ""
# logging.info('-r flag specified on command line: recovering from crash')
if options == '-v':
print ""
# logging.info('-v flag specified on command line: extra verbose output')
# Parse arguments and return as dictionary
arguments={}
for item in args:
#item=item.lower()
# Catch arguments that are passed not in "key=value" format
        if '=' in item:
            key, arg = item.split('=')
        else:
            msg = 'Argument passed without key=value format (%s)' % item ; logging.error(msg)
            raise getopt.GetoptError, msg
        arguments[key] = arg
if opts: opts=opts[0]
return opts, arguments
def ValidateRC(rcfile,needed_items):
    """ validate the contents of an rc-file given a list of required keys """
for k,v in rcfile.iteritems():
if v == 'True' : rcfile[k] = True
if v == 'False': rcfile[k] = False
if 'date' in k : rcfile[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S')
for key in needed_items:
if not rcfile.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
    status,msg = ( True,'rc-file has been validated successfully' ) ; logging.debug(msg)
def CreateDirs(dirname,forceclean=False):
""" Create a directory and report success, only if non-existent """
if forceclean:
try:
shutil.rmtree(dirname)
except:
pass
if not os.path.exists(dirname):
os.makedirs(dirname)
msg='Creating new directory %s' % dirname
logging.info(msg)
else:
msg='Using existing directory %s' % dirname
logging.info(msg)
return None
def CreateLinks(sourcedir,targetdir):
""" Create a symbolic link from, source to target and report success, note
that exisiting links are first removed and then recreated """
if os.path.exists(targetdir):
os.unlink(targetdir)
msg='Unlinking existing directory %s' % targetdir
logging.debug(msg)
try:
os.symlink(sourcedir,targetdir)
msg='Created new link from %s to %s' % (sourcedir, targetdir)
logging.debug(msg)
except OSError,msg:
logging.error(msg)
raise OSError
return None
def SetInterval(rc_da_shell,run='forecast'):
""" Set the interval over which the observation operator will be run. There are two options:
(1) forecast : the simulation runs from startdate to startdate + nlag*delta_time
        (2) advance  : the simulation runs from startdate to startdate + 1   *delta_time
"""
if run not in ['forecast','advance']:
raise ValueError, "Unknown interval specified for run (%s)" % run
nlag = int(rc_da_shell['time.nlag'])
startdate = rc_da_shell['startdate']
cyclelen = rc_da_shell['cyclelength']
if run == 'forecast':
enddate = startdate
for i in range(nlag):
enddate = AdvanceTime(enddate,cyclelen)
elif run == 'advance':
enddate = AdvanceTime(startdate,cyclelen)
rc_da_shell['enddate'] = enddate
msg = "New simulation interval set : " ; logging.info(msg)
msg = " start date : %s " % startdate.strftime('%F %H:%M') ; logging.info(msg)
msg = " end date : %s " % enddate.strftime('%F %H:%M') ; logging.info(msg)
return None
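# Worked example for SetInterval, using the da.rc values above (time.cycle 1 day, time.nlag 2) and
# a start date of 2005-03-05 00:00: a 'forecast' run ends 2005-03-07 00:00 (start + 2 cycles),
# while an 'advance' run ends 2005-03-06 00:00 (start + 1 cycle).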
def AdvanceTime(time_in,interval):
""" Advance time_in by a specified interval"""
time_out=time_in
if interval == 'month': # if monthly, this run will go to the first day of the next month
if time_in.month != 12:
time_out = datetime.datetime(time_in.year,time_in.month+1,1,time_in.hour,0,0)
else:
time_out = datetime.datetime(time_in.year+1,1,1,time_in.hour,0,0) # end of year provision
elif interval == 'week':
time_out = time_in + datetime.timedelta(days=7)
else: # assume that the interval specified is the number of days to run forward before resubmitting
time_out = time_in + datetime.timedelta(days=float(interval))
return time_out
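# AdvanceTime examples, following the rules above:
#   AdvanceTime(datetime.datetime(2005, 12, 15, 6), 'month') -> 2006-01-01 06:00 (year rollover)
#   AdvanceTime(datetime.datetime(2005,  3,  5, 0), 'week')  -> 2005-03-12 00:00
#   AdvanceTime(datetime.datetime(2005,  3,  5, 0), 1)       -> 2005-03-06 00:00 (interval = days)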
def ParseTimes(rc_da_shell):
""" get time parameters from rc-file and parse into datetime objects for later use """
td = rc_da_shell['time.start']
    ymd = map(int,[td[0:4],td[4:6],td[6:8],td[8:10]]) # split the string into four integers [yyyy, mm, dd, hh]
startdate = datetime.datetime(*ymd) # from which we create a python datetime object
td = rc_da_shell['time.finish']
ymd = map(int,[td[0:4],td[4:6],td[6:8],td[8:10]]) # again
finaldate = datetime.datetime(*ymd) # again
    if finaldate <= startdate:
        msg = 'The final date (%s) is not later than the start date (%s), please revise'%(finaldate.strftime('%Y%m%d'),startdate.strftime('%Y%m%d'))
logging.error(msg)
raise ValueError
#
cyclelength = rc_da_shell['time.cycle'] # get time step
# Determine end date
if cyclelength == 'infinite':
enddate = finaldate
else:
enddate = AdvanceTime(startdate,cyclelength)
#
    if enddate > finaldate:  # do not run beyond finaldate
enddate = finaldate
rc_da_shell['startdate'] = startdate
rc_da_shell['enddate'] = enddate
rc_da_shell['finaldate'] = finaldate
rc_da_shell['cyclelength'] = cyclelength
msg = "===============================================================" ; logging.info(msg)
msg = "Filter start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "Filter end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "Filter final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "Filter cycle length is %s" % cyclelength ; logging.info(msg)
msg = "===============================================================" ; logging.info(msg)
return None
def PrepareObs(rc_da_shell,type='forecast'):
""" Prepare a set of observations to be co-sampled by the model. Although the collecting and parsing of
the observations will depend on the specific application, the final result of this step is an output
file called "observations.nc" which carries the x,y,z,t, dt information of each observation """
if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system
msg = "Using %s as DA system" % da_system.identifier ; logging.debug(msg)
rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True)
dummy = ValidateRC(rc_da_system,da_system.needed_rc_items)
# Add da_system specific rc-file to the rc_da_shell dictionary
rc_da_shell['da.system.info'] = rc_da_system
dummy = da_system.PrepareObs(rc_da_shell,rc_da_system,type=type)
return None
def PrepareEnsemble(rc_da_shell ):
"""
Prepare the ensemble of parameters needed by the forecast model. This step will depend on the assimilation system type,
and here we only provide an interface to the system-specific PrepareEnsemble functions. This happens by reading the
da.system key from the rc_da_shell and importing the corresponding module as da_system.
"""
if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system
dummy = da_system.PrepareEnsemble(rc_da_shell )
return None
def RunForecastModel(rc_da_shell):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a
setting in your da.rc file. After that choice, the module written specifically for a particular
application takes over. There are a few required variables and methods in such a module:
version [variable, string] : defines the version number of the module, e.g. "1.0"
identifier [variable, string] : identifies the model used as a string variable, e.g. "TM5"
PrepareExe (method) : A method that preps the model simulation. It can for instance create an input
parameter file for the model (tm5.rc), or execute some scripts needed before the
actual execution starts (get_meteo). After this step, a call to the model
                                    executable should start a successful simulation
StartExe (method) : Start the executable. How this is done depends on your model and might involve
submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
"""
# import modules, note that depending on the type of assimilation system, different submodules are imported!
if rc_da_shell['forecast.model'] == 'TM5': import tm5_tools as model
elif rc_da_shell['forecast.model'] == 'SIBCASA': import sibcasa_tools as model
elif rc_da_shell['forecast.model'] == 'WRF': import wrf_tools as model
####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############
msg = "Using %s as forecast model" % model.identifier ; logging.debug(msg)
# Prepare everything needed to run the forward model
rc_fcmodel = model.PrepareExe(rc_da_shell)
# Run the forward model
status = model.StartExe(rc_da_shell,rc_fcmodel)
########################################### RETURN CONTROL TO DA SHELL #########################################
dummy = os.chdir(rc_da_shell['dir.da_submit'])
return status
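# A minimal skeleton of a conforming forecast-model module, as described in the docstring above
# (names are illustrative only; tm5_tools.py is the real reference implementation):
#
#   # mymodel_tools.py (hypothetical)
#   version    = '1.0'
#   identifier = 'MyModel'
#   def PrepareExe(rc_da_shell):
#       rc_model = {}  # read, validate, and modify the model rc-file here
#       return rc_model
#   def StartExe(rc_da_shell, rc_model):
#       return 0       # spawn the executable here and return its exit status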
def Invert(rc_da_shell):
""" Perform least-squares minimization"""
if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system
dummy = da_system.MakeResiduals(rc_da_shell)
if __name__ == "__main__":
pass