Skip to content
Snippets Groups Projects
Commit 2168c110 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

a new model for the das.py to save data from substeps into a pickle file, and...

a new model for the das.py to save data from substeps into a pickle file, and resubmission of each substep as a separate job
parent e0d3c8a3
Branches
No related tags found
No related merge requests found
......@@ -20,14 +20,14 @@ class PlatForm(object):
def __init__(self):
self.Identifier = 'iPad' # the identifier gives the plaform name
self.Version = '1.0' # the platform version used
print self
def __str__(self):
msg1 = '%s object initialized'%self.Identifier ; logging.debug(msg1)
msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2)
return msg1+'\n'+msg2
def __str__(self):
return None
def GetJobTemplate(self,joboptions={}):
""" Return the job template for a given computing system, and fill it with options from the dictionary provided as argument"""
......@@ -72,6 +72,7 @@ class PlatForm(object):
cmd = ['sh',jobfile]
jobid = subprocess.call(cmd)
msg = "A job was submitted (%s)"%cmd ; logging.debug(msg)
return jobid
......
......@@ -109,5 +109,25 @@ def ToDatetime(datestring,fmt=None):
hour,minute,second = map(int,time.split(':'))
return datetime.datetime(year,month,day,hour,minute,second)
def StoreData(pickleobject,filename):
""" pickle object into a specified file name """
import cPickle
f = open(filename,'wb')
dummy = cPickle.dump(pickleobject,f,-1)
f.close()
return None
def RestoreData(filename):
""" unpickle object into a specified file name """
import cPickle
f = open(filename,'rb')
pickleobject = cPickle.load(f)
f.close()
return pickleobject
if __name__ == "__main__":
pass
......@@ -20,6 +20,8 @@ needed_da_items=[
'forecast.model.rc',
'da.system']
validprocesses = ['start','done','samplestate','advance','invert']
import logging
import os
......@@ -45,6 +47,7 @@ class CycleControl(object):
self.LoadRc(args['rc'])
self.ValidateRC()
self.opts = opts
# Add some useful variables to the rc-file dictionary
......@@ -497,7 +500,7 @@ class CycleControl(object):
jobparams = {'jobname':'das.%s'%cd,'jobaccount':'co2','jobnodes':'nserial 1','jobtime':'00:30:00','jobshell':'/bin/sh'}
template = DaPlatForm.GetJobTemplate(jobparams)
template += 'cd %s\n'%os.getcwd()
template += './das.py','rc=%s' % self.da_settings['da.restart.fname']
template += './das.py','rc=%s process=start'% self.da_settings['da.restart.fname']
jobfile = DaPlatForm.WriteJob(self,template,'cycle.%s.das'%cd)
jobid = DaPlatForm.SubmitJob(jobfile)
......@@ -506,6 +509,25 @@ class CycleControl(object):
return None
def SubmitSubStep(self,stepname):
""" submit the next substep of a DA cycle"""
import subprocess
import os
from string import join
DaPlatForm = self.DaPlatForm
jobparams = {'jobname':'das.%s'%stepname}
template = DaPlatForm.GetJobTemplate(jobparams)
template += 'cd %s\n'%os.getcwd()
template += './das.py rc=%s process=%s %s' % (self.da_settings['jobrcfilename'],stepname,join(self.opts,''),)
jobfile = DaPlatForm.WriteJob(self,template,stepname)
jobid = DaPlatForm.SubmitJob(jobfile)
return None
def StartLogger(logfile='jb.%s.log'%os.getpid()):
""" start the logging of messages to screen and to file"""
......@@ -595,6 +617,14 @@ def ValidateOptsArgs(opts,args):
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
args['logfile'] = "jb.%s.log"%(os.getpid(),)
if not args.has_key('process'):
msg = "There is no process specified on the command line, please use process=Start to start a new job" ; logging.error(msg)
raise IOError,msg
if args['process'].lower() not in validprocesses:
msg = "The specified process (%s) is not valid"%args['process'] ; logging.error(msg)
raise IOError,msg
return opts,args
......@@ -632,7 +662,6 @@ def CleanUpCycle(DaCycle):
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
if __name__ == "__main__":
sys.path.append('../../')
......
......@@ -27,8 +27,8 @@ def JobStart(opts,args):
from da.tools.initexit import CycleControl
from da.baseclasses.control import DaSystem
from da.ct.tools import needed_rc_items
from da.platform.jet import JetPlatForm as PlatForm
#from da.baseclasses.jobcontrol import PlatForm
#from da.platform.jet import JetPlatForm as PlatForm
from da.baseclasses.jobcontrol import PlatForm
# First create a CycleControl object that handles all the details of the da cycle
......@@ -52,10 +52,6 @@ def JobStart(opts,args):
DaCycle.DaPlatForm = DaPlatForm
# Proceed with the initialization of this cycle
dummy = DaCycle.Initialize()
return DaCycle
......
......@@ -41,6 +41,7 @@ if __name__ == "__main__":
from da.tools.initexit import ParseOptions
from da.tools.initexit import StartLogger
from da.tools.pipeline import *
from da.tools.general import StoreData,RestoreData
# Start a logger for all that happens from here on
......@@ -54,28 +55,63 @@ if __name__ == "__main__":
opts,args = ValidateOptsArgs(opts,args)
process = args['process'].lower()
DaCycle = JobStart(opts,args)
# Start the subprocesses
msg = header+"starting JobStart"+footer ; logging.info(msg)
DaCycle = JobStart(opts,args)
if process == 'start':
msg = header+"starting JobStart"+footer ; logging.info(msg)
dummy = DaCycle.Initialize()
msg = header+"starting JobInput"+footer ; logging.info(msg)
StateVector = JobInput(DaCycle)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('samplestate')
if process == 'samplestate':
msg = header+"starting SampleState"+footer ; logging.info(msg)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = SampleState(DaCycle,StateVector)
msg = header+"starting JobInput"+footer ; logging.info(msg)
StateVector = JobInput(DaCycle)
dummy = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('invert')
msg = header+"starting SampleState"+footer ; logging.info(msg)
dummy = SampleState(DaCycle,StateVector)
if process == 'invert':
msg = header+"starting Invert"+footer ; logging.info(msg)
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(DaCycle, StateVector)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = Invert(DaCycle, StateVector)
StateVector = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('advance')
msg = header+"starting Advance"+footer ; logging.info(msg)
dummy = Advance(DaCycle, StateVector)
if process == 'advance':
msg = header+"starting Advance"+footer ; logging.info(msg)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(DaCycle, StateVector)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = Advance(DaCycle, StateVector)
StateVector = StoreData(StateVector,savefilename)
dummy = DaCycle.SubmitSubStep('done')
msg = "Cycle finished...exiting" ; logging.info(msg)
if process == 'done':
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = CleanUpCycle(DaCycle)
savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle')
StateVector = RestoreData(savefilename)
dummy = SaveAndSubmit(DaCycle, StateVector)
msg = "Cycle finished...exiting" ; logging.info(msg)
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment