Commit 4697e5cf authored by Peters, Wouter

cosmetic changes, logger removed from TM5 init function now

parent 5727ebc9
!!! Info for the CarbonTracker data assimilation system
!datadir : /lfs0/projects/co2/input/
datadir : /data/CO2/carbontracker/ct08/
obs.input.dir : ${datadir}/obsnc/
obs.input.fname : obs_final
ocn.covariance : ${datadir}/covariance_ocn_base.nc
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 220
random.seed : 4385
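
The rc module that reads files like the one above is not included in this commit. As a rough sketch only, assuming simple "key : value" lines, "!" comment lines, and ${name} substitution of previously defined keys (the real parser may behave differently), rc.read could be thought of as:

import re

def read_rc_sketch(filename):
    """Parse 'key : value' lines into a dict, expanding ${name} references."""
    pairs = {}
    for line in open(filename):
        line = line.strip()
        if not line or line.startswith('!') or line.startswith('#'):
            continue                          # skip blanks and comment lines
        key, sep, value = line.partition(':')
        if not sep:
            continue                          # not a key : value line
        value = value.strip()
        # replace each ${name} with the value of a previously read key
        for name in re.findall(r'\$\{([\w.]+)\}', value):
            if name in pairs:
                value = value.replace('${%s}' % name, pairs[name])
        pairs[key.strip()] = value
    return pairs

With the file above, the ${datadir} reference in obs.input.dir would then be replaced by the datadir value before use.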
#!/usr/bin/env python
# da_initexit.py
"""
Author : peters
Revision History:
File created on 13 May 2009.
"""
import logging
import os
import sys
import shutil
import rc
import datetime
from tools_da import CreateDirs
from tools_da import AdvanceTime
from tools_da import StartLogger
from tools_da import ParseOptions
def Init():
""" Initialization of the data assimilation job. Parse command line options, read specified rc-file values """
# Initialize the logging system
logfilename = 'da.test.%s.log' % os.getpid()
dummy = StartLogger(logfile=logfilename)
logging.info('Starting execution of Main program')
    # Start execution; first parse the command line arguments
opts, args = ParseOptions()
    # Check for the existence of the "rc" key; this rc-file is needed input to pass a set of external options to the script
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
else:
rc_da_shell = rc.read(args['rc'])
# Add some useful variables to the rc-file dictionary
rc_da_shell['log'] = logfilename
rc_da_shell['pid'] = '%s' % os.getpid()
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
return rc_da_shell
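# Example (hypothetical): when the surrounding script is started as
# "das.py rc=da.rc -v", Init() returns a rc_da_shell dictionary holding all
# rc-file entries plus the keys 'log', 'pid', 'dir.da_submit',
# 'da.crash.recover' and 'verbose' set above.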
def SetupFileStructure(rc_da_shell):
""" Create file structure needed for data assimilation system.
In principle this looks like:
${da_rundir}
${da_rundir}/input
${da_rundir}/output
${da_rundir}/exec
${da_rundir}/diagnostics
${da_rundir}/analysis
${da_rundir}/jobs
${da_rundir}/save
${da_rundir}/save/one-ago
${da_rundir}/save/two-ago
    Note that the exec dir will actually be a symlink to the directory where
the SIBCASA or TM5 model executable lives. This directory is passed through
the da.rc file. The observation input files will be placed in the exec dir,
and the resulting simulated values will be retrieved from there as well.
"""
# Create the run directory for this DA job, including I/O structure
CreateDirs(rc_da_shell['dir.da_run'])
    # Create the sub-directories and store their paths in the rc dictionary
    for subdir in ['exec','input','output','diagnostics','analysis','jobs','save']:
        rc_da_shell['dir.%s' % subdir] = os.path.join(rc_da_shell['dir.da_run'],subdir)
        CreateDirs(rc_da_shell['dir.%s' % subdir])
    CreateDirs(os.path.join(rc_da_shell['dir.save'],'one-ago'))
    CreateDirs(os.path.join(rc_da_shell['dir.save'],'two-ago'))
    msg = 'Successfully created the file structure for the assimilation job' ; logging.info(msg)
def RecoverRun(rc_da_shell):
    """Prepare a recovery from a crashed run. The copying of data from the save/one-ago
    folder is handled by IOSaveData (see StartRestartRecover); this routine replaces
    all rc-file items with those from the da_runtime.rc file saved by the crashed run.
    """
    # Replace rc-items with those from the last rc-file of the crashed run
    file_rc_rec = os.path.join(rc_da_shell['dir.save'],'da_runtime.rc')
    rc_rec = rc.read(file_rc_rec)
    for k,v in rc_rec.iteritems():
        rc_da_shell[k] = v
    msg = "Replaced rc-items with those from %s" % file_rc_rec ; logging.debug(msg)
    msg = "Next cycle start date is %s" % rc_da_shell['time.start'] ; logging.debug(msg)
    return None
def IOSaveData(rc_da_shell, io_option='restore', save_option='partial',filter=[]):
""" Store or restore model state to/from a temporary directory. Two save_option options are available:
(1) save_option = 'partial' : Save the background concentration fields only, to use for a later advance of the background
(2) save_option = 'full' : Save all restart data, to use for a next cycle
While also two IO options are available:
(1) io_option = restore : Get data from a directory
(2) io_option = store : Save data to a directory
In case of a 'store' command the target folder is re-created so that the contents are empty to begin with.
There is currently room for the specific sub-models to also add save data to the mix. This is done through a
    forecast.model dependent import statement, followed by a call to the function:
model.IOSaveData(**args)
"""
    # Import the forecast-model module; depending on the type of assimilation system, a different submodule is imported!
    if rc_da_shell['forecast.model'] == 'TM5': import tm5_tools as model
    elif rc_da_shell['forecast.model'] == 'SIBCASA': import sibcasa_tools as model
    elif rc_da_shell['forecast.model'] == 'WRF': import wrf_tools as model
    else:
        raise ValueError,'Unsupported forecast.model (%s) specified in rc-file' % rc_da_shell['forecast.model']
if io_option not in ['store','restore']:
raise ValueError,'Invalid option specified for io_option (%s)' % io_option
if save_option not in ['partial','full']:
raise ValueError,'Invalid option specified for save_option (%s)' % save_option
savedir = os.path.join(rc_da_shell['dir.save'])
oneagodir = os.path.join(rc_da_shell['dir.save'],'one-ago')
tempdir = os.path.join(rc_da_shell['dir.save'],'tmp')
if save_option == 'partial': # save background data to tmp dir
if io_option == 'store':
dummy = model.WriteSaveData(rc_da_shell)
targetdir = tempdir
sourcedir = savedir
elif io_option == 'restore':
sourcedir = tempdir
targetdir = savedir
elif save_option == 'full': # move all save data to one-ago dir
if io_option == 'store':
dummy = model.WriteSaveData(rc_da_shell)
targetdir = oneagodir
sourcedir = savedir
elif io_option == 'restore':
sourcedir = oneagodir
targetdir = savedir
# If "store" is requested, recreate target dir, cleaning the contents
if io_option == 'store':
CreateDirs(os.path.join(targetdir),forceclean=True)
msg = "Performing a %s %s of data" % (save_option,io_option) ; logging.info(msg)
msg = " from directory: %s " % sourcedir ; logging.info(msg)
msg = " to directory: %s " % targetdir ; logging.info(msg)
msg = " with filter : %s " % filter ; logging.info(msg)
for file in os.listdir(sourcedir):
file = os.path.join(sourcedir,file)
if os.path.isdir(file): # skip dirs
skip = True
elif filter == []: # copy all
skip= False
else: # check filter
skip = True # default skip
for f in filter:
if f in file:
skip = False # unless in filter
break
if skip:
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
if io_option == 'store' and save_option == 'full':
msg = " [move] .... %s " % file ; logging.info(msg)
dummy = shutil.move(file,file.replace(sourcedir,targetdir) )
else:
msg = " [copy] .... %s " % file ; logging.info(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
def StartRestartRecover(rc_da_shell):
""" Determine how to proceed with this cycle:
(a) recover from crash : get the save data from the one-ago folder and replace the da_runtime variables with those from the save dir
(b) restart cycle : get the save data from the one-ago folder and use latest da_runtime variables from the exec dir
(c) fresh start : set up the required file structure for this simulation
"""
#
# case 1: A recover from a previous crash, this is signaled by flag "-r"
#
if rc_da_shell['da.crash.recover']:
msg = "Recovering simulation from data in: %s" % rc_da_shell['dir.da_run'] ; logging.info(msg)
dummy = SetupFileStructure(rc_da_shell)
dummy = IOSaveData(rc_da_shell,io_option='restore',save_option='full',filter=[])
dummy = RecoverRun(rc_da_shell)
#
# case 2: A continuation, this is signaled by rc-item time.restart = True
#
elif rc_da_shell['time.restart']:
msg = "Restarting filter from previous step" ; logging.info(msg)
dummy = SetupFileStructure(rc_da_shell)
dummy = IOSaveData(rc_da_shell,io_option='restore',save_option='full',filter=[])
#
# case 3: A fresh start, this is signaled by rc-item time.restart = False
#
elif not rc_da_shell['time.restart']:
msg = "First time step in filter sequence" ; logging.info(msg)
dummy = SetupFileStructure(rc_da_shell)
return None
#
def WriteNewRCfile(rc_da_shell):
""" Write the rc-file for the next DA cycle. Note that the start time for the next cycle is the end time of this one, while
the end time for the next cycle is the current end time + one cycle length. The resulting rc-file is written twice:
(1) to the dir.exec so that it can be used when resubmitting the next cycle
(2) to the dir.save so that it can be backed up with the rest of the save data, for later recovery if needed
"""
# These first two lines advance the filter time for the next cycle
rc_da_shell['startdate'] = rc_da_shell['enddate']
rc_da_shell['enddate'] = AdvanceTime(rc_da_shell['enddate'],rc_da_shell['cyclelength'])
# The rest is info needed for a system restart
rc_da_shell['time.restart'] = True
rc_da_shell['time.start'] = rc_da_shell['startdate'].strftime('%Y%m%d%H')
rc_da_shell['time.finish'] = rc_da_shell['finaldate'].strftime('%Y%m%d%H')
fname = os.path.join(rc_da_shell['dir.exec'],'da_runtime.rc')
rc_da_shell['da.restart.fname'] = fname
dummy = rc.write(fname,rc_da_shell)
msg = 'Wrote new da_runtime.rc file to working directory' ; logging.debug(msg)
# Also write this info to the savedir
fname = os.path.join(rc_da_shell['dir.save'],'da_runtime.rc')
dummy = rc.write(fname,rc_da_shell)
msg = 'Wrote new da_runtime.rc file to save directory' ; logging.debug(msg)
def WriteRC(rc_da_shell,filename):
""" Write RC file after each process to reflect updated info """
    fname = './' + filename
    dummy = rc.write(fname,rc_da_shell)
    msg = 'Wrote expanded rc-file (%s) to current directory (%s)' % (filename,os.getcwd()) ; logging.debug(msg)
return None
def SubmitNextCycle(rc_da_shell):
""" submit next job in cycle, if needed. The end date is held against the final date. A subprocess is called that
submits the next job, without waiting for its result so this cycle can finish and exit.
"""
import subprocess
import os
if rc_da_shell['startdate'] < rc_da_shell['finaldate']:
#code = subprocess.call(['qsub',sys.argv[0],'rc=%s' % rc_da_shell['da.restart.fname']] )
#code = subprocess.Popen(['das.sh','rc=%s' % rc_da_shell['da.restart.fname']] )
code = subprocess.Popen([os.getcwd()+'/das.py','rc=%s' % rc_da_shell['da.restart.fname'],"process=all"] )
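        # Popen does not wait for the submitted job to finish, so no meaningful
        # return code is available at this point; assume the submission succeeded.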
code=0
if code == 0:
            logging.info('Submitted next cycle successfully')
else:
logging.error('Error in submitting next cycle, return code: %s ' % code)
raise OSError
else:
logging.info('Final date reached, no new cycle started')
return None
if __name__ == "__main__":
pass
#!/usr/bin/env python
# das.py
"""
Author : peters
Revision History:
File created on 29 Sep 2009.
"""
import rc
from tools_da import ValidateRC
from tools_da import needed_rc_da_items
header = '\n\n *************************************** '
footer = ' *************************************** \n '
validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','all']
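# How each process maps onto the functions below (see the dispatch in __main__):
#   jobstart  -> JobStart       jobinput  -> JobInput
#   sample    -> ForwardRun('forecast')  (Sample itself is currently not called)
#   invert    -> Invert         propagate -> Propagate
#   resubmit  -> SaveAndSubmit
#   all       -> the full sequence of the above, sharing one jb.<pid>.rc file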
def JobStart(opts,args):
""" Set up the job specific directory structure and create an expanded rc-file """
    import os
    import rc
    from da_initexit import StartRestartRecover
    from tools_da import ParseTimes
    from da_initexit import WriteRC
rc_da_shell = rc.read(args['rc'])
# Add some useful variables to the rc-file dictionary
rc_da_shell['log'] = args["logfile"]
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
    # Figure out what to do: is this a fresh start, a continuation, or a recovery from a crash
dummy = StartRestartRecover(rc_da_shell)
# Calculate DA system startdate, enddate, and finaldate from rc-file items
dummy = ParseTimes(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return rc_da_shell
def JobInput(args):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
from tools_da import PrepareObs
from tools_da import PrepareEnsemble
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = PrepareEnsemble(rc_da_shell)
dummy = PrepareObs(rc_da_shell,'forecast')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Sample(args):
""" Sample the filter state for the inversion """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = ForwardRun(args,'forecast')
    # Optionally, post-processing of the model output can be added here to deal, for instance, with
    # sub-sampling of time series, vertical averaging, etc.
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def ForwardRun(args,runtype='forecast'):
""" Run the forward model from startdate to enddate """
from tools_da import SetInterval
from tools_da import RunForecastModel
from da_initexit import IOSaveData
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = SetInterval(rc_da_shell,runtype)
dummy = RunForecastModel(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Invert(args):
""" Perform the inverse calculation """
import tools_da
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = tools_da.Invert(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Propagate(args):
""" Propagate the filter state to the next step """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
dummy = ForwardRun(args,'advance')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def SaveAndSubmit(args):
""" Save the model state and submit the next job """
from da_initexit import IOSaveData
from da_initexit import WriteRC
from da_initexit import WriteNewRCfile
from da_initexit import SubmitNextCycle
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='full')
dummy = WriteNewRCfile(rc_da_shell)
dummy = SubmitNextCycle(rc_da_shell)
return None
if __name__ == "__main__":
import sys
import os
import logging
import shutil
import subprocess
from tools_da import ParseOptions
from tools_da import StartLogger
# Append current working dir to path
sys.path.append(os.getcwd())
# Get name of logfile
opts, args = ParseOptions()
if not args.has_key("logfile"):
msg = "There is no logfile specified on the command line. Using logfile=logfile.log"
args['logfile'] = 'logfile.log'
logfile = args['logfile']
dummy = StartLogger(logfile=logfile)
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
if not args.has_key("process"):
msg = "There is no execution process specified on the command line. Using default process=all" ; logging.error(msg)
args["process"] = 'all'
if not args["process"] in validprocess:
msg = "The specified execution process is not valid (%s). Please use one of %s"%(args['process'],validprocess) ; logging.error(msg)
raise IOError,msg
# Get name of the process requested
process=args['process']
msg = 'Process %s starting, entered python from master shell'%process ; logging.debug(msg)
if process == 'jobstart':
rcf = JobStart(opts,args)
if process == 'jobinput':
dummy = JobInput(args)
if process == 'sample':
dummy = ForwardRun(args,'forecast')
if process == 'invert':
dummy = Invert(args)
if process == 'propagate':
dummy = Propagate(args)
if process == 'resubmit':
dummy = SaveAndSubmit(args)
if process == 'all':
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
msg = header+"starting JobStart"+footer ; logging.info(msg)
rcf = JobStart(opts,args)
msg = header+"starting JobInput"+footer ; logging.info(msg)
dummy = JobInput(args)
msg = header+"starting ForwardRun"+footer ; logging.info(msg)
dummy = ForwardRun(args,'forecast')
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(args)
msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(args)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(args)
msg = "Cycle finished...exiting" ; logging.info(msg)
# move log file to rundir/jobs
jobdir = os.path.join(rcf['dir.da_run'],"jobs")
joblogfile = os.path.join(jobdir,logfile)
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
# move rc file to rundir/jobs
jobrcfile = os.path.join(jobdir,args["jobrcfilename"] )
dummy = shutil.move(args["jobrcfilename"],jobrcfile )
msg = "....Moved %s to %s"%(args['jobrcfilename'],jobrcfile) ; logging.debug(msg)
# cat TM5 output and rc-file to the job file output
tm5jobfile = os.path.join(jobdir,"tm5.%s"%(args['logfile']) )
if os.path.exists(tm5jobfile):
msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(tm5jobfile,'r').read())
dummy = f.close()
if os.path.exists(jobrcfile):
msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(jobrcfile,'r').read())
dummy = f.close()
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
msg = 'Process %s done, returning from python to master shell'%process ; logging.debug(msg)
sys.exit(0)
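
For reference, tools_da (and therefore ParseOptions) is not part of this commit. Judging from how opts and args are used above ('-r' in opts, args['rc'], args['process']), a minimal sketch of such a parser might look as follows; the actual implementation may differ:

import sys

def parse_options_sketch(argv=None):
    """Split command-line tokens such as 'das.py -r -v rc=da.rc process=all'
    into a list of flag options and a dictionary of key=value arguments."""
    tokens = sys.argv[1:] if argv is None else argv
    opts = []
    args = {}
    for token in tokens:
        if '=' in token:
            key, value = token.split('=', 1)  # key=value pair goes to args
            args[key] = value
        else:
            opts.append(token)                # bare flag such as -r or -v
    return opts, args

A call such as parse_options_sketch(['-r', 'rc=da.rc']) would then return (['-r'], {'rc': 'da.rc'}).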
@@ -41,44 +41,50 @@ needed_rc_items = [
 class TM5():
     """ This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file
-    location. This rc-file will be used to figure out the settings for the run. This method of running TM5 assumes that a pre-compiled
-    tm5.exe is present, and it will be run from time.start to time.final. These settings can be modified later. To run a model version,
-    simply compile the model using an existing TM5 rc-file, then open python, and type:
+    location. This rc-file will be used to figure out the settings for the run.
+    *** This method of running TM5 assumes that a pre-compiled tm5.exe is present, and it will be run from time.start to time.final ***
+    These settings can be modified later. To run a model version, simply compile the model using an existing TM5 rc-file, then