Commit adf3fb2c authored by Peters, Wouter's avatar Peters, Wouter
Browse files

changed order of calls to allow an easy flow of consecutive jobs with propoer init/exit

parent d1313c14
......@@ -71,7 +71,7 @@ class PlatForm(object):
""" This method submits a jobfile to the queue, and returns the queue ID """
cmd = ['sh',jobfile]
msg = "A new job will be submitted (%s)"%cmd ; logging.debug(msg)
msg = "A new job will be submitted (%s)\n\n\n"%cmd ; logging.debug(msg)
jobid = subprocess.call(cmd)
return jobid
......
......@@ -51,7 +51,7 @@ class CycleControl(object):
# Add some useful variables to the rc-file dictionary
self.da_settings['jobrcfilename'] = args["jobrcfilename"]
self.da_settings['jobrcfilename'] = self.RcFileName
self.da_settings['log'] = args["logfile"]
self.da_settings['dir.da_submit'] = os.getcwd()
self.da_settings['da.crash.recover'] = '-r' in opts
......@@ -59,13 +59,6 @@ class CycleControl(object):
self.currentprocess = args['process']
self.DaSystem = None # to be filled later
self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec')
self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input')
self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output')
self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics')
self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis')
self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs')
self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save')
def __str__(self):
"""
......@@ -109,6 +102,8 @@ class CycleControl(object):
if 'date' in k : self.da_settings[k] = ToDatetime(v)
if 'time.start' in k :
self.da_settings[k] = ToDatetime(v)
if 'time.end' in k :
self.da_settings[k] = ToDatetime(v)
if 'time.finish' in k :
self.da_settings[k] = ToDatetime(v)
......@@ -154,7 +149,6 @@ class CycleControl(object):
self.da_settings['time.end'] = enddate
self.da_settings['time.finish'] = finaldate
self.da_settings['cyclelength'] = cyclelength
self.da_settings['dir.output'] += startdate.strftime('/%Y%m%d')
msg = "===============================================================" ; logging.info(msg)
msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
......@@ -239,6 +233,10 @@ class CycleControl(object):
dummy = self.SetupFileStructure()
# expand jobrcfilename to include exec dir from now on.
self.da_settings['jobrcfilename'] = os.path.join(self.da_settings['dir.exec'],self.da_settings['jobrcfilename'])
self.ParseTimes()
self.WriteRC(self.da_settings['jobrcfilename'])
......@@ -273,8 +271,15 @@ class CycleControl(object):
filtertime = self.da_settings['time.start'].strftime('%Y%m%d')
CreateDirs(self.da_settings['dir.da_run'])
self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec')
self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input')
self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output',filtertime)
self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics')
self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis')
self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs')
self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save')
CreateDirs(self.da_settings['dir.da_run'])
CreateDirs(os.path.join(self.da_settings['dir.exec']))
CreateDirs(os.path.join(self.da_settings['dir.input']))
CreateDirs(os.path.join(self.da_settings['dir.output']))
......@@ -481,16 +486,14 @@ class CycleControl(object):
self.da_settings['da.restart.fname'] = fname
dummy = rc.write(fname,self.da_settings)
msg = 'Wrote new da_runtime.rc file to working directory' ; logging.debug(msg)
msg = 'Wrote new da_runtime.rc (%s)'%fname ; logging.debug(msg)
def WriteRC(self,filename):
def WriteRC(self,fname):
""" Write RC file after each process to reflect updated info """
rcname = filename
fname = './'+rcname
dummy = rc.write(fname,self.da_settings)
msg = 'Wrote expanded rc-file (%s) to current directory (%s)'%(rcname,self.da_settings['dir.exec']) ; logging.debug(msg)
msg = 'Wrote expanded rc-file (%s)'%(fname) ; logging.debug(msg)
return None
def SubmitNextCycle(self):
......@@ -624,7 +627,6 @@ def ValidateOptsArgs(opts,args):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
args['logfile'] = "jb.%s.log"%(os.getpid(),)
if not args.has_key('process'):
......@@ -648,25 +650,6 @@ def CleanUpCycle(DaCycle):
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
# move rc file to rundir/jobs
jobrcfile = os.path.join(jobdir,DaCycle.da_settings["jobrcfilename"] )
dummy = shutil.move(DaCycle.da_settings["jobrcfilename"],jobrcfile )
msg = "....Moved %s to %s"%(DaCycle.da_settings['jobrcfilename'],jobrcfile) ; logging.debug(msg)
# WP TOO SPECIFIC NOW, MAKE A ROUTINE THAT CLEANS UP ANY MODEL OUTPUT FILE, NOT JUST TM5!!!
# cat model output and rc-file to the job file output
tm5jobfile = os.path.join(jobdir,"tm5.%s"%(DaCycle.da_settings['log']) )
if os.path.exists(tm5jobfile):
msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(tm5jobfile,'r').read())
dummy = f.close()
if os.path.exists(jobrcfile):
msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(jobrcfile,'r').read())
dummy = f.close()
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
......
......@@ -42,25 +42,33 @@ if __name__ == "__main__":
process = args['process'].lower()
# Create the Cycle Control object for this job
msg = header+"Initializing Job"+footer ; logging.info(msg)
DaCycle = JobStart(opts,args)
dummy = DaCycle.Initialize()
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
# Start the requested subprocesses
if process == 'start':
msg = header+"Initializing Da Cycle"+footer ; logging.info(msg)
dummy = DaCycle.Initialize()
msg = header+"starting JobInput"+footer ; logging.info(msg)
StateVector = JobInput(DaCycle)
StateVector = StoreData(StateVector,savefilename)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
dummy = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('samplestate')
sys.exit(0)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
if process == 'samplestate':
msg = header+"starting SampleState"+footer ; logging.info(msg)
......@@ -72,6 +80,10 @@ if __name__ == "__main__":
dummy = DaCycle.SubmitSubStep('invert')
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
if process == 'invert':
msg = header+"starting Invert"+footer ; logging.info(msg)
......@@ -83,6 +95,10 @@ if __name__ == "__main__":
dummy = DaCycle.SubmitSubStep('advance')
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
if process == 'advance':
msg = header+"starting Advance"+footer ; logging.info(msg)
......@@ -95,6 +111,10 @@ if __name__ == "__main__":
dummy = DaCycle.SubmitSubStep('done')
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
if process == 'done':
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
......@@ -106,4 +126,4 @@ if __name__ == "__main__":
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
sys.exit(0)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment