Commit 37baecb4 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

The da_initexit.py and das.py are now starting to use the new class-based modules.

parent 4697e5cf
! Info on the data assimilation cycle
time.restart : False
time.start : 2005030500
time.finish : 2005030700
time.start : 2005-03-05 00:00:00
time.finish : 2005-03-07 00:00:00
time.cycle : 1
time.nlag : 2
dir.da_run : ${HOME}/tmp/test_da
......
This diff is collapsed.
......@@ -8,9 +8,7 @@ Revision History:
File created on 29 Sep 2009.
"""
import rc
from tools_da import ValidateRC
from tools_da import needed_rc_da_items
from da_initexit import CycleControl
header = '\n\n *************************************** '
footer = ' *************************************** \n '
......@@ -20,139 +18,57 @@ validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','al
def JobStart(opts,args):
""" Set up the job specific directory structure and create an expanded rc-file """
import rc
from da_initexit import StartRestartRecover
from tools_da import ParseTimes
from da_initexit import WriteRC
DaCycle = CycleControl(opts,args)
rc_da_shell = rc.read(args['rc'])
DaCycle.Initialize()
# Add some useful variables to the rc-file dictionary
return DaCycle
rc_da_shell['log'] = args["logfile"]
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# Figure out what to do: is this is a fresh start, a continuation, or a recover from crash
dummy = StartRestartRecover(rc_da_shell)
# Calculate DA system startdate, enddate, and finaldate from rc-file items
dummy = ParseTimes(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return rc_da_shell
def JobInput(args):
def JobInput(CycleInfo):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
from tools_da import PrepareObs
from tools_da import PrepareEnsemble
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = PrepareEnsemble(CycleInfo)
dummy = PrepareEnsemble(rc_da_shell)
dummy = PrepareObs(rc_da_shell,'forecast')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
dummy = PrepareObs(CycleInfo,'forecast')
return None
def Sample(args):
def Sample(CycleInfo):
""" Sample the filter state for the inversion """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
from tools_da import RunForecastModel
dummy = ForwardRun(args,'forecast')
dummy = RunForecastModel(CycleInfo,'forecast')
# Optionally, post-processing of the model output can be added that deals for instance with
# sub-sampling of time series, vertical averaging, etc.
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def ForwardRun(args,runtype='forecast'):
    """ Run the forward model from startdate to enddate.

        The rc-file named in args['jobrcfilename'] is read and validated,
        the simulation interval for this runtype is set, the forecast model
        is executed, and the (possibly updated) settings are written back.
    """
    from tools_da import SetInterval, RunForecastModel
    from da_initexit import IOSaveData, WriteRC

    jobrcname = args['jobrcfilename']
    shell_settings = rc.read(jobrcname)
    ValidateRC(shell_settings, needed_rc_da_items)
    # Derive the simulation window (startdate/enddate) for this run type
    SetInterval(shell_settings, runtype)
    RunForecastModel(shell_settings)
    # Persist any settings that were modified during the run
    WriteRC(shell_settings, jobrcname)
    return None
def Invert(args):
def Invert(CycleInfo):
""" Perform the inverse calculation """
import tools_da
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = tools_da.Invert(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
dummy = tools_da.Invert(CycleInfo)
return None
def Propagate(args):
def Propagate(CycleInfo):
""" Propagate the filter state to the next step """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
from tools_da import RunForecastModel
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
dummy = ForwardRun(args,'advance')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
dummy = RunForecastModel(CycleInfo,'advance')
return None
def SaveAndSubmit(args):
def SaveAndSubmit(CycleInfo):
""" Save the model state and submit the next job """
from da_initexit import IOSaveData
from da_initexit import WriteRC
from da_initexit import WriteNewRCfile
from da_initexit import SubmitNextCycle
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='full')
dummy = WriteNewRCfile(rc_da_shell)
dummy = SubmitNextCycle(rc_da_shell)
dummy = CycleInfo.Finalize()
return None
......@@ -161,7 +77,9 @@ if __name__ == "__main__":
import os
import logging
import shutil
import subprocess
from tools_da import CleanUpCycle
from tools_da import ValidateOptsArgs
from tools_da import ParseOptions
from tools_da import StartLogger
......@@ -169,90 +87,40 @@ if __name__ == "__main__":
sys.path.append(os.getcwd())
# Get name of logfile
# Start a logger for all that happens from here on
dummy = StartLogger()
# Parse options from the command line
opts, args = ParseOptions()
if not args.has_key("logfile"):
msg = "There is no logfile specified on the command line. Using logfile=logfile.log"
args['logfile'] = 'logfile.log'
logfile = args['logfile']
dummy = StartLogger(logfile=logfile)
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
if not args.has_key("process"):
msg = "There is no execution process specified on the command line. Using default process=all" ; logging.error(msg)
args["process"] = 'all'
if not args["process"] in validprocess:
msg = "The specified execution process is not valid (%s). Please use one of %s"%(args['process'],validprocess) ; logging.error(msg)
raise IOError,msg
# Get name of the process requested
process=args['process']
msg = 'Process %s starting, entered python from master shell'%process ; logging.debug(msg)
if process == 'jobstart':
rcf = JobStart(opts,args)
if process == 'jobinput':
dummy = JobInput(args)
if process == 'sample':
dummy = ForwardRun(args,'forecast')
if process == 'invert':
dummy = Invert(args)
if process == 'propagate':
dummy = Propagate(args)
if process == 'resubmit':
dummy = SaveAndSubmit(args)
if process == 'all':
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
msg = header+"starting JobStart"+footer ; logging.info(msg)
rcf = JobStart(opts,args)
msg = header+"starting JobInput"+footer ; logging.info(msg)
dummy = JobInput(args)
msg = header+"starting ForwardRun"+footer ; logging.info(msg)
dummy = ForwardRun(args,'forecast')
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(args)
msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(args)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(args)
msg = "Cycle finished...exiting" ; logging.info(msg)
# move log file to rundir/jobs
jobdir = os.path.join(rcf['dir.da_run'],"jobs")
joblogfile = os.path.join(jobdir,logfile)
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
# move rc file to rundir/jobs
jobrcfile = os.path.join(jobdir,args["jobrcfilename"] )
dummy = shutil.move(args["jobrcfilename"],jobrcfile )
msg = "....Moved %s to %s"%(args['jobrcfilename'],jobrcfile) ; logging.debug(msg)
# cat TM5 output and rc-file to the job file output
tm5jobfile = os.path.join(jobdir,"tm5.%s"%(args['logfile']) )
if os.path.exists(tm5jobfile):
msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(tm5jobfile,'r').read())
dummy = f.close()
if os.path.exists(jobrcfile):
msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(jobrcfile,'r').read())
dummy = f.close()
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
msg = 'Process %s done, returning from python to master shell'%process ; logging.debug(msg)
# Validate Options and arguments passed
opts,args = ValidateOptsArgs(opts,args)
# Start the subprocesses
msg = header+"starting JobStart"+footer ; logging.info(msg)
CycleInfo = JobStart(opts,args)
#msg = header+"starting JobInput"+footer ; logging.info(msg)
#dummy = JobInput(CycleInfo)
msg = header+"starting Sample Taking"+footer ; logging.info(msg)
dummy = Sample(CycleInfo)
#msg = header+"starting Invert"+footer ; logging.info(msg)
#dummy = Invert(CycleInfo)
msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(CycleInfo)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(CycleInfo)
msg = "Cycle finished...exiting" ; logging.info(msg)
dummy = CleanUpCycle(CycleInfo)
sys.exit(0)
......@@ -53,7 +53,7 @@ class TM5():
[]> tm.WriteRunRc()
[]> tm.Run()
To use this class inside a data assimilation cycle, a stand-alone method "PrepareExe()" is included which modifies the TM5
To use this class inside a data assimilation cycle, a stand-alone method "Initialize()" is included which modifies the TM5
settings according to an external dictionary of values to overwrite, and then runs the TM5 model.
......@@ -233,7 +233,7 @@ class TM5():
self.Status = 'Success'
else:
logging.error('Error in model executable return code: %s ' % code)
logging.info('Inspect [%s] to find output from model executable ' % modellogfilename)
logging.info('Inspect [%s] to find output from model executable ' % self.ModelLogFilename)
self.Status = 'Failed'
raise OSError
......@@ -281,7 +281,7 @@ class TM5():
def PrepareExe(rc_da_shell):
def Initialize(rc_da_shell):
"""
Prepare a forward model TM5 run, this consists of:
......@@ -293,9 +293,7 @@ def PrepareExe(rc_da_shell):
"""
from tools_da import CreateLinks, CreateDirs, ParseTimes, StartLogger
StartLogger()
from tools_da import CreateLinks, CreateDirs
# Make an instance of the TM5 class, supply it with a valid rc-file name
......@@ -304,22 +302,23 @@ def PrepareExe(rc_da_shell):
# Create a link from TM5 to the rundirectory of the das system
sourcedir = Tm5Model.tm_settings['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'])
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
CreateLinks(sourcedir,targetdir)
# Extract time information from the das system
ParseTimes(rc_da_shell)
rc_da_shell['dir.exec.tm5'] = targetdir
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
NewItems = {
'time.start': rc_da_shell['startdate'] ,
'time.final' : rc_da_shell['enddate'] ,
'rundir' : rc_da_shell['dir.exec'] ,
'savedir' : rc_da_shell['dir.save'] ,
'outputdir': rc_da_shell['dir.output']
'time.start' : rc_da_shell['startdate'] ,
'time.final' : rc_da_shell['sample.enddate'] ,
'rundir' : rc_da_shell['dir.exec.tm5'] ,
'outputdir' : rc_da_shell['dir.output'] ,
'savedir' : rc_da_shell['dir.save'] ,
'das.input.dir' : rc_da_shell['dir.input']
}
if rc_da_shell['time.restart'] == True: NewItems['istart'] = 3
Tm5Model.ModifyRC(NewItems)
......@@ -329,7 +328,7 @@ def PrepareExe(rc_da_shell):
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(rc_da_shell['dir.exec'])
targetdir = os.path.join(rc_da_shell['dir.exec.tm5'])
if not os.path.exists(mpi_shell_file):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s)"% mpi_shell_file ; logging.error(msg)
......@@ -362,7 +361,7 @@ if __name__ == "__main__":
#dasrc['dir.output']=os.path.join(dasrc['dir.da_run'],'output')
#dasrc['dir.exec']=os.path.join(dasrc['dir.da_run'],'exec')
#tm = PrepareExe(dasrc)
#tm = Initialize(dasrc)
#tm.Run()
#tm.SaveData()
......
......@@ -18,22 +18,7 @@ import shutil
import rc
import datetime
needed_rc_da_items=[
'time.start',
'time.finish',
'time.nlag',
'time.cycle',
'dir.da_run',
'forecast.model',
'forecast.model.rc',
'da.system']
helptext=\
"""
HELP!!!
"""
def StartLogger(logfile='logfile.log'):
def StartLogger(logfile='jb.%s.log'%os.getpid()):
""" start the logging of messages to screen and to file"""
# start the logging basic configuration by setting up a log file
......@@ -110,22 +95,20 @@ def ParseOptions():
return opts, arguments
def ValidateRC(rcfile,needed_items):
""" validate the contents of an rc-file given a dictionary of required keys """
def ValidateOptsArgs(opts,args):
""" Validate the options and arguments passed from the command line before starting the cycle """
for k,v in rcfile.iteritems():
if v == 'True' : rcfile[k] = True
if v == 'False': rcfile[k] = False
if 'date' in k : rcfile[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S')
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
for key in needed_items:
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
args['logfile'] = "jb.%s.log"%(os.getpid(),)
if not rcfile.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
status,msg = ( True,'rc-file has been validated succesfully' ) ; logging.debug(msg)
return opts,args
def CreateDirs(dirname,forceclean=False):
""" Create a directory and report success, only if non-existent """
......@@ -164,7 +147,7 @@ def CreateLinks(sourcedir,targetdir):
return None
def SetInterval(rc_da_shell,run='forecast'):
def SetInterval(CycleInfo,run='forecast'):
""" Set the interval over which the observation operator will be run. There are two options:
(1) forecast : the simulation runs from startdate to startdate + nlag*delta_time
......@@ -174,9 +157,9 @@ def SetInterval(rc_da_shell,run='forecast'):
if run not in ['forecast','advance']:
raise ValueError, "Unknown interval specified for run (%s)" % run
nlag = int(rc_da_shell['time.nlag'])
startdate = rc_da_shell['startdate']
cyclelen = rc_da_shell['cyclelength']
nlag = int(CycleInfo.da_settings['time.nlag'])
startdate = CycleInfo.da_settings['startdate']
cyclelen = CycleInfo.da_settings['cyclelength']
if run == 'forecast':
......@@ -189,7 +172,7 @@ def SetInterval(rc_da_shell,run='forecast'):
enddate = AdvanceTime(startdate,cyclelen)
rc_da_shell['enddate'] = enddate
CycleInfo.da_settings['sample.enddate'] = enddate
msg = "New simulation interval set : " ; logging.info(msg)
msg = " start date : %s " % startdate.strftime('%F %H:%M') ; logging.info(msg)
......@@ -216,48 +199,6 @@ def AdvanceTime(time_in,interval):
return time_out
def ParseTimes(rc_da_shell):
    """ Get time parameters from the rc-file dictionary and parse them into datetime objects.

        Reads 'time.start' and 'time.finish' (both strings formatted yyyymmddhh)
        plus 'time.cycle' from rc_da_shell, and stores the derived values back
        into the same dictionary under the keys 'startdate', 'enddate',
        'finaldate', and 'cyclelength'.

        Raises ValueError when the final date is not later than the start date.
    """
    td = rc_da_shell['time.start']
    ymd = map(int,[td[0:4],td[4:6],td[6:8],td[8:10]]) # split yyyymmddhh into (yyyy,mm,dd,hh)
    startdate = datetime.datetime(*ymd)               # from which we create a python datetime object

    td = rc_da_shell['time.finish']
    ymd = map(int,[td[0:4],td[4:6],td[6:8],td[8:10]]) # again
    finaldate = datetime.datetime(*ymd)               # again

    if finaldate <= startdate:
        # Bug fix: message now states the actual problem (final <= start) and
        # uses %m (month) instead of %M (minute) in the date formatting.
        msg = 'The final date (%s) is not greater than the start date (%s), please revise'%(finaldate.strftime('%Y%m%d'),startdate.strftime('%Y%m%d'))
        logging.error(msg)
        raise ValueError
    #
    cyclelength = rc_da_shell['time.cycle']           # get time step

    # Determine end date of this cycle
    if cyclelength == 'infinite':
        enddate = finaldate
    else:
        enddate = AdvanceTime(startdate,cyclelength)
    #
    if enddate > finaldate:  # do not run beyond finaldate
        enddate = finaldate

    rc_da_shell['startdate'] = startdate
    rc_da_shell['enddate'] = enddate
    rc_da_shell['finaldate'] = finaldate
    rc_da_shell['cyclelength'] = cyclelength

    msg = "===============================================================" ; logging.info(msg)
    msg = "Filter start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
    msg = "Filter end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
    msg = "Filter final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
    msg = "Filter cycle length is %s" % cyclelength ; logging.info(msg)
    msg = "===============================================================" ; logging.info(msg)

    return None
def PrepareObs(rc_da_shell,type='forecast'):
""" Prepare a set of observations to be co-sampled by the model. Although the collecting and parsing of
the observations will depend on the specific application, the final result of this step is an output
......@@ -295,7 +236,7 @@ def PrepareEnsemble(rc_da_shell ):
return None
def RunForecastModel(rc_da_shell):
def RunForecastModel(CycleInfo,step='forecast'):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a
setting in your da.rc file. After that choice, the module written specifically for a particular
......@@ -307,15 +248,15 @@ def RunForecastModel(rc_da_shell):
parameter file for the model (tm5.rc), or execute some scripts needed before the
actual execution starts (get_meteo). After this step, a call to the model
executable should start a successful simulation
StartExe (method) : Start the executable. How this is done depends on your model and might involve
Run (method) : Start the executable. How this is done depends on your model and might involve
submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
"""
# import modules, note that depending on the type of assimilation system, different submodules are imported!
if rc_da_shell['forecast.model'] == 'TM5': import tm5_tools as model
elif rc_da_shell['forecast.model'] == 'SIBCASA': import sibcasa_tools as model
elif rc_da_shell['forecast.model'] == 'WRF': import wrf_tools as model
if CycleInfo.da_settings['forecast.model'] == 'TM5': import tm5_tools as model
elif CycleInfo.da_settings['forecast.model'] == 'SIBCASA': import sibcasa_tools as model
elif CycleInfo.da_settings['forecast.model'] == 'WRF': import wrf_tools as model
####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############
......@@ -323,15 +264,18 @@ def RunForecastModel(rc_da_shell):
# Prepare everything needed to run the forward model
executable = model.PrepareExe(rc_da_shell)
dummy = SetInterval(CycleInfo,step)
executable = model.Initialize(CycleInfo.da_settings)
# Run the forward model
status = executable.Run()
########################################### RETURN CONTROL TO DA SHELL #########################################
if step =='advance':
status = executable.SaveData()
dummy = os.chdir(rc_da_shell['dir.da_submit'])
########################################### RETURN CONTROL TO DA SHELL #########################################
return status
......@@ -342,6 +286,36 @@ def Invert(rc_da_shell):
dummy = da_system.MakeResiduals(rc_da_shell)
def CleanUpCycle(CycleInfo):
    """
    Move files, clean up rundir after a cycle has finished
    """
    settings = CycleInfo.da_settings
    jobdir = os.path.join(settings['dir.da_run'],"jobs")

    # Relocate the cycle log file into the rundir/jobs directory
    logfile = settings['log']
    joblogfile = os.path.join(jobdir,logfile)
    shutil.move(logfile,joblogfile)
    logging.debug("....Moved %s to %s"%(logfile,joblogfile))

    # Relocate the job rc-file into the rundir/jobs directory as well
    rcfilename = settings["jobrcfilename"]
    jobrcfile = os.path.join(jobdir,rcfilename)
    shutil.move(rcfilename,jobrcfile)
    logging.debug("....Moved %s to %s"%(rcfilename,jobrcfile))

    # Append TM5 model output and the rc-file contents to the job log file,
    # in that order, so everything for the cycle lives in one place
    tm5jobfile = os.path.join(jobdir,"tm5.%s"%(settings['log']))
    for extrafile in [tm5jobfile, jobrcfile]:
        if os.path.exists(extrafile):
            logging.debug("....Concatenating %s to %s"%(extrafile,joblogfile))
            f = open(joblogfile,'a')
            f.write(open(extrafile,'r').read())
            f.close()

    logging.info("The complete log file is now at: %s"%(joblogfile))
if __name__ == "__main__":
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.