Commit d1313c14 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

jobs are now running through one cycle under sh, to be tested under qsub

parent 2168c110
......@@ -2,7 +2,7 @@
time.restart : False
time.start : 2000-01-01 00:00:00
time.finish : 2000-01-08 00:00:00
time.finish : 2000-01-15 00:00:00
time.cycle : 7
time.nlag : 2
dir.da_run : ${HOME}/tmp/test_da
......
......@@ -71,8 +71,8 @@ class PlatForm(object):
""" This method submits a jobfile to the queue, and returns the queue ID """
cmd = ['sh',jobfile]
msg = "A new job will be submitted (%s)"%cmd ; logging.debug(msg)
jobid = subprocess.call(cmd)
msg = "A job was submitted (%s)"%cmd ; logging.debug(msg)
return jobid
......
......@@ -354,15 +354,15 @@ class TM5ObservationOperator(ObservationOperator):
template += 'cd %s\n'%targetdir
template += 'mpirun -np %d %s ./tm5.x'%(int(nprocesses),mpi_shell_filename,)
jobfile = DaPlatForm.WriteJob(DaCycle,template,'sample_tm5')
jobfile = DaPlatForm.WriteJob(DaCycle,template,'tm5')
jobid = DaPlatForm.SubmitJob(jobfile)
status = DaPlatForm.StatJob(jobid)
jobid = DaPlatForm.KillJob(jobid)
#jobid = DaPlatForm.SubmitJob(jobfile)
#status = DaPlatForm.StatJob(jobid)
#jobid = DaPlatForm.KillJob(jobid)
if not os.path.exists(okfile):
code = -1
code=0
return code
......
......@@ -56,8 +56,17 @@ class CycleControl(object):
self.da_settings['dir.da_submit'] = os.getcwd()
self.da_settings['da.crash.recover'] = '-r' in opts
self.da_settings['verbose'] = '-v' in opts
self.currentprocess = args['process']
self.DaSystem = None # to be filled later
self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec')
self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input')
self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output')
self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics')
self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis')
self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs')
self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save')
def __str__(self):
"""
String representation of a CycleControl object
......@@ -143,8 +152,9 @@ class CycleControl(object):
self.da_settings['time.start'] = startdate
self.da_settings['time.end'] = enddate
self.da_settings['time.finish'] = finaldate
self.da_settings['cyclelength'] = cyclelength
self.da_settings['time.finish'] = finaldate
self.da_settings['cyclelength'] = cyclelength
self.da_settings['dir.output'] += startdate.strftime('/%Y%m%d')
msg = "===============================================================" ; logging.info(msg)
msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
......@@ -191,6 +201,14 @@ class CycleControl(object):
(c) fresh start : set up the required file structure for this simulation
"""
#
# case 0: This is a substep of a DA cycle (sample,invert,or advance)
#
if self.currentprocess in ['sample','invert','advance']:
msg = "Continuation of ongoing cycle with process %s" % self.currentprocess ; logging.info(msg)
#
# case 1: A recover from a previous crash, this is signaled by flag "-r"
#
......@@ -257,14 +275,6 @@ class CycleControl(object):
CreateDirs(self.da_settings['dir.da_run'])
self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec')
self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input')
self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output','%s'%filtertime)
self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics')
self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis')
self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs')
self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save')
CreateDirs(os.path.join(self.da_settings['dir.exec']))
CreateDirs(os.path.join(self.da_settings['dir.input']))
CreateDirs(os.path.join(self.da_settings['dir.output']))
......@@ -500,7 +510,7 @@ class CycleControl(object):
jobparams = {'jobname':'das.%s'%cd,'jobaccount':'co2','jobnodes':'nserial 1','jobtime':'00:30:00','jobshell':'/bin/sh'}
template = DaPlatForm.GetJobTemplate(jobparams)
template += 'cd %s\n'%os.getcwd()
template += './das.py','rc=%s process=start'% self.da_settings['da.restart.fname']
template += './das.py rc=%s process=start'% self.da_settings['da.restart.fname']
jobfile = DaPlatForm.WriteJob(self,template,'cycle.%s.das'%cd)
jobid = DaPlatForm.SubmitJob(jobfile)
......@@ -618,14 +628,12 @@ def ValidateOptsArgs(opts,args):
args['logfile'] = "jb.%s.log"%(os.getpid(),)
if not args.has_key('process'):
msg = "There is no process specified on the command line, please use process=Start to start a new job" ; logging.error(msg)
raise IOError,msg
msg = "There is no process specified on the command line, assuming process=Start" ; logging.info(msg)
args['process'] = 'start'
if args['process'].lower() not in validprocesses:
msg = "The specified process (%s) is not valid"%args['process'] ; logging.error(msg)
raise IOError,msg
return opts,args
def CleanUpCycle(DaCycle):
......
......@@ -8,22 +8,6 @@ Revision History:
File created on 29 Sep 2009.
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# qsub (ijet)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
#$ -N das.py
#$ -A co2
#$ -cwd
#$ -pe hcomp 2
#$ -r y
#$ -l h_rt=03:00:00
#$ -S /bin/sh
#$ -j y
#$ -v PYTHONPATH
#$ -v NETCDF
#$ -v LD_LIBRARY_PATH
if __name__ == "__main__":
import sys
import os
......@@ -42,9 +26,9 @@ if __name__ == "__main__":
from da.tools.initexit import StartLogger
from da.tools.pipeline import *
from da.tools.general import StoreData,RestoreData
# Start a logger for all that happens from here on
dummy = StartLogger()
# Parse options from the command line
......@@ -57,61 +41,69 @@ if __name__ == "__main__":
process = args['process'].lower()
DaCycle = JobStart(opts,args)
# Create the Cycle Control object for this job
msg = header+"Initializing Job"+footer ; logging.info(msg)
DaCycle = JobStart(opts,args)
dummy = DaCycle.Initialize()
# Start the subprocesses
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
if process == 'start':
msg = header+"starting JobStart"+footer ; logging.info(msg)
# Start the requested subprocesses
dummy = DaCycle.Initialize()
if process == 'start':
msg = header+"starting JobInput"+footer ; logging.info(msg)
msg = header+"starting JobInput"+footer ; logging.info(msg)
StateVector = JobInput(DaCycle)
StateVector = JobInput(DaCycle)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('samplestate')
dummy = DaCycle.SubmitSubStep('samplestate')
if process == 'samplestate':
msg = header+"starting SampleState"+footer ; logging.info(msg)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = SampleState(DaCycle,StateVector)
dummy = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('invert')
if process == 'invert':
msg = header+"starting Invert"+footer ; logging.info(msg)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = Invert(DaCycle, StateVector)
StateVector = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('advance')
if process == 'advance':
msg = header+"starting Advance"+footer ; logging.info(msg)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = Advance(DaCycle, StateVector)
StateVector = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('done')
if process == 'done':
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = SaveAndSubmit(DaCycle, StateVector)
msg = "Cycle finished...exiting" ; logging.info(msg)
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment