Skip to content
Snippets Groups Projects
das.py 8.12 KiB
Newer Older
#!/usr/bin/env python
# das.py

"""
Author : peters 

Revision History:
File created on 29 Sep 2009.

"""
import rc
from tools_da import ValidateRC
from tools_da import needed_rc_da_items

# Banner strings placed before and after the stage announcements in the log
header = '\n\n    ***************************************   '
footer = '    *************************************** \n  '

# Values accepted for the process= argument on the command line
validprocess = [
    'jobstart',
    'jobinput',
    'sample',
    'invert',
    'propagate',
    'resubmit',
    'all',
]

def JobStart(opts,args):
    """ Set up the job specific directory structure and create an expanded rc-file

    The rc-file named by args['rc'] is read into a dictionary, a few run-time items
    are added to it, and it is validated against needed_rc_da_items. The helper
    routines StartRestartRecover and ParseTimes are then called on the dictionary,
    and finally it is written to the file named by args['jobrcfilename'].

    Arguments:
        opts : the command line flags; only membership of '-r' and '-v' is tested
        args : dictionary with at least the keys 'rc', 'logfile' and 'jobrcfilename'

    Returns the rc dictionary.
    """

    # os is imported locally: at module level it is only imported under the
    # __main__ guard, so this function would otherwise fail when das is imported
    import os
    import rc
    from da_initexit import StartRestartRecover
    from tools_da import ParseTimes
    from da_initexit import WriteRC

    rc_da_shell = rc.read(args['rc'])

    # Add some useful variables to the rc-file dictionary

    rc_da_shell['log']              = args["logfile"]

    rc_da_shell['dir.da_submit']    = os.getcwd()
    rc_da_shell['da.crash.recover'] = '-r' in opts
    rc_da_shell['verbose']          = '-v' in opts

    ValidateRC(rc_da_shell,needed_rc_da_items)

    # Figure out what to do: is this a fresh start, a continuation, or a recover from crash

    StartRestartRecover(rc_da_shell)

    # Calculate DA system startdate, enddate, and finaldate from rc-file items

    ParseTimes(rc_da_shell)

    WriteRC(rc_da_shell,args['jobrcfilename'])

    return rc_da_shell


def JobInput(args):
    """ Prepare the input of the forward model: first the ensemble, then the observations

    args : dictionary holding the key 'jobrcfilename'; that rc-file is read at the
           start of the routine and written again at the end.
    """
    from tools_da import PrepareEnsemble, PrepareObs
    from da_initexit import WriteRC

    jobrc    = args['jobrcfilename']
    settings = rc.read(jobrc)

    ValidateRC(settings,needed_rc_da_items)

    # Ensemble first, observations (for the 'forecast' step) second
    PrepareEnsemble(settings)
    PrepareObs(settings,'forecast')

    WriteRC(settings,jobrc)

    return None

def Sample(args):
    """ Sample the filter state for the inversion through a 'forecast' forward run

    args : dictionary holding the key 'jobrcfilename'; it is also handed on to ForwardRun.
    """
    from da_initexit import WriteRC

    jobrc    = args['jobrcfilename']
    settings = rc.read(jobrc)

    ValidateRC(settings,needed_rc_da_items)

    ForwardRun(args,'forecast')

    # Optionally, post-processing of the model output can be added here, for instance
    # sub-sampling of time series, vertical averaging, etc.

    # NOTE(review): settings was read before ForwardRun, which reads and rewrites the
    # same rc-file; this write may discard what ForwardRun stored - confirm
    WriteRC(settings,jobrc)

    return None

def ForwardRun(args,runtype='forecast'):
    """ Run the forward model for the interval selected by runtype

    args    : dictionary holding the key 'jobrcfilename'; that rc-file is read at the
              start of the routine and written again at the end.
    runtype : passed on to SetInterval; the callers in this file use 'forecast' and 'advance'.
    """

    from tools_da import SetInterval, RunForecastModel
    from da_initexit import IOSaveData, WriteRC

    jobrc    = args['jobrcfilename']
    settings = rc.read(jobrc)

    ValidateRC(settings,needed_rc_da_items)

    # Set the interval for this run type, then run the model with those settings
    SetInterval(settings,runtype)
    RunForecastModel(settings)

    WriteRC(settings,jobrc)

    return None

def Invert(args):
    """ Perform the inverse calculation by delegating to tools_da.Invert

    args : dictionary holding the key 'jobrcfilename'; that rc-file is read at the
           start of the routine and written again at the end.
    """
    import tools_da
    from da_initexit import WriteRC

    jobrc    = args['jobrcfilename']
    settings = rc.read(jobrc)

    ValidateRC(settings,needed_rc_da_items)

    # The routine of the same name in tools_da does the actual work
    tools_da.Invert(settings)

    WriteRC(settings,jobrc)

    return None

def Propagate(args):
    """ Propagate the filter state to the next step through an 'advance' forward run

    args : dictionary holding the key 'jobrcfilename'; it is also handed on to ForwardRun.
    """
    from da_initexit import WriteRC

    jobrc    = args['jobrcfilename']
    settings = rc.read(jobrc)

    ValidateRC(settings,needed_rc_da_items)

    # This is the advance of the modeled CO2 state. Optionally, routines can be added
    # to advance the state vector (mean+covariance)
    ForwardRun(args,'advance')

    # NOTE(review): settings was read before ForwardRun, which reads and rewrites the
    # same rc-file; this write may discard what ForwardRun stored - confirm
    WriteRC(settings,jobrc)

    return None

def SaveAndSubmit(args):
    """ Save the model state and submit the next job

    args : dictionary holding the key 'jobrcfilename'; that rc-file is read, but unlike
           in the other stages it is not written back by this routine.
    """
    from da_initexit import IOSaveData, WriteRC, WriteNewRCfile, SubmitNextCycle

    settings = rc.read(args['jobrcfilename'])

    ValidateRC(settings,needed_rc_da_items)

    # Store the data, write the new rc-file, and submit the next cycle, in that order
    IOSaveData(settings,io_option='store',save_option='full')
    WriteNewRCfile(settings)
    SubmitNextCycle(settings)

    return None

if __name__ == "__main__":
    # Script entry point: parse the command line, start the logger, check the
    # arguments, and run the requested process (or the complete cycle for 'all').
    import sys
    import os
    import logging
    import shutil
    import subprocess
    from tools_da import ParseOptions
    from tools_da import StartLogger

    # Append current working dir to path

    sys.path.append(os.getcwd())

    opts, args = ParseOptions()

    # Get name of logfile. When the default is used, this can only be reported
    # after the logger has been started, so remember it here.

    logfile_defaulted = "logfile" not in args
    if logfile_defaulted:
        args['logfile'] = 'logfile.log'

    logfile = args['logfile']
    StartLogger(logfile=logfile)

    if logfile_defaulted:
        msg = "There is no logfile specified on the command line. Using logfile=logfile.log"
        logging.warning(msg)

    # The rc-file is mandatory and must exist

    if "rc" not in args:
        msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc"
        logging.error(msg)
        raise IOError(msg)
    elif not os.path.exists(args['rc']):
        msg = "The specified rc-file (%s) does not exist " % args['rc']
        logging.error(msg)
        raise IOError(msg)

    # The process defaults to 'all'; falling back to the default is not an error

    if "process" not in args:
        msg = "There is no execution process specified on the command line. Using default process=all"
        logging.warning(msg)
        args["process"] = 'all'
    if args["process"] not in validprocess:
        msg = "The specified execution process is not valid (%s). Please use one of %s"%(args['process'],validprocess)
        logging.error(msg)
        raise IOError(msg)

    # Get name of the process requested

    process = args['process']

    logging.debug('Process %s starting, entered python from master shell'%process)

    # Note that 'sample' runs ForwardRun directly rather than Sample
    if process == 'jobstart':
        rcf = JobStart(opts,args)
    elif process == 'jobinput':
        JobInput(args)
    elif process == 'sample':
        ForwardRun(args,'forecast')
    elif process == 'invert':
        Invert(args)
    elif process == 'propagate':
        Propagate(args)
    elif process == 'resubmit':
        SaveAndSubmit(args)
    elif process == 'all':
        # The job rc-file of a complete cycle is named after the process id
        args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)

        logging.info(header+"starting JobStart"+footer)
        rcf = JobStart(opts,args)
        logging.info(header+"starting JobInput"+footer)
        JobInput(args)
        logging.info(header+"starting ForwardRun"+footer)
        ForwardRun(args,'forecast')
        logging.info(header+"starting Invert"+footer)
        Invert(args)
        logging.info(header+"starting Propagate"+footer)
        Propagate(args)
        logging.info(header+"starting SaveAndSubmit"+footer)
        SaveAndSubmit(args)

        logging.info("Cycle finished...exiting")

        # move log file to rundir/jobs
        jobdir     = os.path.join(rcf['dir.da_run'],"jobs")
        joblogfile = os.path.join(jobdir,logfile)
        shutil.move(logfile,joblogfile)
        logging.debug("....Moved %s to %s"%(logfile,joblogfile))

        # move rc file to rundir/jobs
        jobrcfile  = os.path.join(jobdir,args["jobrcfilename"])
        shutil.move(args["jobrcfilename"],jobrcfile)
        logging.debug("....Moved %s to %s"%(args['jobrcfilename'],jobrcfile))

        # cat TM5 output and rc-file to the job file output; the with-blocks make
        # sure that both the source and the destination are closed again
        tm5jobfile = os.path.join(jobdir,"tm5.%s"%(args['logfile']))
        for extrafile in (tm5jobfile,jobrcfile):
            if os.path.exists(extrafile):
                logging.debug("....Concatenating %s to %s"%(extrafile,joblogfile))
                with open(extrafile,'r') as source:
                    content = source.read()
                with open(joblogfile,'a') as destination:
                    destination.write(content)
        logging.info("The complete log file is now at: %s"%(joblogfile))

    logging.debug('Process %s done, returning from python to master shell'%process)
    sys.exit(0)