Skip to content
Snippets Groups Projects
Commit 407628c0 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

replacing

parent ecadc964
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# das.py
"""
Author : peters
#$ -N das
#$ -A co2
#$ -pe nserial 1
#$ -l h_rt=00:30:00
#$ -S /bin/sh
#$ -cwd
#$ -j y
#$ -r n
Revision History:
File created on 29 Sep 2009.
from da.tools.initexit import StartLogger
from da.tools.pipeline import Main
"""
import rc
from tools_da import ValidateRC
from tools_da import needed_rc_da_items
dummy = StartLogger()
header = '\n\n *************************************** '
footer = ' *************************************** \n '
###########################################################################################
### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
###########################################################################################
validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','all']
from da.platform.maunaloa import MaunaloaPlatForm
from da.ct.dasystem import CtDaSystem
from da.ct.statevector import CtStateVector
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
def JobStart(opts,args):
""" Set up the job specific directory structure and create an expanded rc-file """
PlatForm = MaunaloaPlatForm()
DaSystem = CtDaSystem('carbontracker.rc')
StateVector = CtStateVector()
Samples = CtObservations()
ObsOperator = TM5ObservationOperator()
Optimizer = CtOptimizer()
import rc
from da_initexit import StartRestartRecover
from tools_da import ParseTimes
from da_initexit import WriteRC
##########################################################################################
################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
##########################################################################################
rc_da_shell = rc.read(args['rc'])
# Add some useful variables to the rc-file dictionary
print "\n ********************************************************************************************************"
print " *************************************** Entering Pipeline ******************************************"
print " ********************************************************************************************************\n"
rc_da_shell['log'] = args["logfile"]
Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# Figure out what to do: is this is a fresh start, a continuation, or a recover from crash
dummy = StartRestartRecover(rc_da_shell)
# Calculate DA system startdate, enddate, and finaldate from rc-file items
dummy = ParseTimes(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return rc_da_shell
def JobInput(args):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
from tools_da import PrepareObs
from tools_da import PrepareEnsemble
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = PrepareEnsemble(rc_da_shell)
dummy = PrepareObs(rc_da_shell,'forecast')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Sample(args):
""" Sample the filter state for the inversion """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = ForwardRun(args,'forecast')
# Optionally, post-processing of the model ouptu can be added that deals for instance with
# sub-sampling of time series, vertical averaging, etc.
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def ForwardRun(args,runtype='forecast'):
""" Run the forward model from startdate to enddate """
from tools_da import SetInterval
from tools_da import RunForecastModel
from da_initexit import IOSaveData
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = SetInterval(rc_da_shell,runtype)
dummy = RunForecastModel(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Invert(args):
""" Perform the inverse calculation """
import tools_da
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = tools_da.Invert(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Propagate(args):
""" Propagate the filter state to the next step """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
dummy = ForwardRun(args,'advance')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def SaveAndSubmit(args):
""" Save the model state and submit the next job """
from da_initexit import IOSaveData
from da_initexit import WriteRC
from da_initexit import WriteNewRCfile
from da_initexit import SubmitNextCycle
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='full')
dummy = WriteNewRCfile(rc_da_shell)
dummy = SubmitNextCycle(rc_da_shell)
return None
if __name__ == "__main__":
import sys
import os
import logging
import shutil
import subprocess
from tools_da import ParseOptions
from tools_da import StartLogger
# Append current working dir to path
sys.path.append(os.getcwd())
# Get name of logfile
opts, args = ParseOptions()
if not args.has_key("logfile"):
msg = "There is no logfile specified on the command line. Using logfile=logfile.log"
args['logfile'] = 'logfile.log'
logfile = args['logfile']
dummy = StartLogger(logfile=logfile)
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
if not args.has_key("process"):
msg = "There is no execution process specified on the command line. Using default process=all" ; logging.error(msg)
args["process"] = 'all'
if not args["process"] in validprocess:
msg = "The specified execution process is not valid (%s). Please use one of %s"%(args['process'],validprocess) ; logging.error(msg)
raise IOError,msg
# Get name of the process requested
process=args['process']
msg = 'Process %s starting, entered python from master shell'%process ; logging.debug(msg)
if process == 'jobstart':
rcf = JobStart(opts,args)
if process == 'jobinput':
dummy = JobInput(args)
if process == 'sample':
dummy = ForwardRun(args,'forecast')
if process == 'invert':
dummy = Invert(args)
if process == 'propagate':
dummy = Propagate(args)
if process == 'resubmit':
dummy = SaveAndSubmit(args)
if process == 'all':
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
msg = header+"starting JobStart"+footer ; logging.info(msg)
rcf = JobStart(opts,args)
msg = header+"starting JobInput"+footer ; logging.info(msg)
dummy = JobInput(args)
msg = header+"starting ForwardRun"+footer ; logging.info(msg)
dummy = ForwardRun(args,'forecast')
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(args)
msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(args)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(args)
msg = "Cycle finished...exiting" ; logging.info(msg)
# move log file to rundir/jobs
jobdir = os.path.join(rcf['dir.da_run'],"jobs")
joblogfile = os.path.join(jobdir,logfile)
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
# move rc file to rundir/jobs
jobrcfile = os.path.join(jobdir,args["jobrcfilename"] )
dummy = shutil.move(args["jobrcfilename"],jobrcfile )
msg = "....Moved %s to %s"%(args['jobrcfilename'],jobrcfile) ; logging.debug(msg)
# cat TM5 output and rc-file to the job file output
tm5jobfile = os.path.join(jobdir,"tm5.%s"%(args['logfile']) )
if os.path.exists(tm5jobfile):
msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(tm5jobfile,'r').read())
dummy = f.close()
if os.path.exists(jobrcfile):
msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(jobrcfile,'r').read())
dummy = f.close()
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
msg = 'Process %s done, returning from python to master shell'%process ; logging.debug(msg)
sys.exit(0)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment