From 2168c1109730a62e3c60f33fe1c98b7b9696a3b5 Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Mon, 6 Sep 2010 15:24:56 +0000 Subject: [PATCH] a new model for the das.py to save data from substeps into a pickle file, and resubmission of each substep as a separate job --- da/baseclasses/jobcontrol.py | 9 ++--- da/tools/general.py | 20 +++++++++++ da/tools/initexit.py | 33 +++++++++++++++++-- da/tools/pipeline.py | 8 ++--- das.py | 64 ++++++++++++++++++++++++++++-------- 5 files changed, 108 insertions(+), 26 deletions(-) diff --git a/da/baseclasses/jobcontrol.py b/da/baseclasses/jobcontrol.py index 723290e6..cf93548c 100755 --- a/da/baseclasses/jobcontrol.py +++ b/da/baseclasses/jobcontrol.py @@ -20,14 +20,14 @@ class PlatForm(object): def __init__(self): self.Identifier = 'iPad' # the identifier gives the plaform name self.Version = '1.0' # the platform version used - print self - - def __str__(self): msg1 = '%s object initialized'%self.Identifier ; logging.debug(msg1) msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2) - return msg1+'\n'+msg2 + + def __str__(self): + + return None def GetJobTemplate(self,joboptions={}): """ Return the job template for a given computing system, and fill it with options from the dictionary provided as argument""" @@ -72,6 +72,7 @@ class PlatForm(object): cmd = ['sh',jobfile] jobid = subprocess.call(cmd) + msg = "A job was submitted (%s)"%cmd ; logging.debug(msg) return jobid diff --git a/da/tools/general.py b/da/tools/general.py index a0eb1cd1..93b06a09 100755 --- a/da/tools/general.py +++ b/da/tools/general.py @@ -109,5 +109,25 @@ def ToDatetime(datestring,fmt=None): hour,minute,second = map(int,time.split(':')) return datetime.datetime(year,month,day,hour,minute,second) +def StoreData(pickleobject,filename): + """ pickle object into a specified file name """ + import cPickle + + f = open(filename,'wb') + dummy = cPickle.dump(pickleobject,f,-1) + f.close() + + return None + +def RestoreData(filename): + """ unpickle object into a specified file name """ + import cPickle + + f = open(filename,'rb') + pickleobject = cPickle.load(f) + f.close() + + return pickleobject + if __name__ == "__main__": pass diff --git a/da/tools/initexit.py b/da/tools/initexit.py index 8ea0e7f3..e1278ffa 100755 --- a/da/tools/initexit.py +++ b/da/tools/initexit.py @@ -20,6 +20,8 @@ needed_da_items=[ 'forecast.model.rc', 'da.system'] +validprocesses = ['start','done','samplestate','advance','invert'] + import logging import os @@ -45,6 +47,7 @@ class CycleControl(object): self.LoadRc(args['rc']) self.ValidateRC() + self.opts = opts # Add some useful variables to the rc-file dictionary @@ -497,7 +500,7 @@ class CycleControl(object): jobparams = {'jobname':'das.%s'%cd,'jobaccount':'co2','jobnodes':'nserial 1','jobtime':'00:30:00','jobshell':'/bin/sh'} template = DaPlatForm.GetJobTemplate(jobparams) template += 'cd %s\n'%os.getcwd() - template += './das.py','rc=%s' % self.da_settings['da.restart.fname'] + template += './das.py','rc=%s process=start'% self.da_settings['da.restart.fname'] jobfile = DaPlatForm.WriteJob(self,template,'cycle.%s.das'%cd) jobid = DaPlatForm.SubmitJob(jobfile) @@ -506,6 +509,25 @@ class CycleControl(object): return None + def SubmitSubStep(self,stepname): + """ submit the next substep of a DA cycle""" + import subprocess + import os + from string import join + + + DaPlatForm = self.DaPlatForm + + jobparams = {'jobname':'das.%s'%stepname} + template = DaPlatForm.GetJobTemplate(jobparams) + template += 'cd %s\n'%os.getcwd() + template += './das.py rc=%s process=%s %s' % (self.da_settings['jobrcfilename'],stepname,join(self.opts,''),) + jobfile = DaPlatForm.WriteJob(self,template,stepname) + jobid = DaPlatForm.SubmitJob(jobfile) + + return None + + def StartLogger(logfile='jb.%s.log'%os.getpid()): """ start the logging of messages to screen and to file""" @@ -595,6 +617,14 @@ def ValidateOptsArgs(opts,args): args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),) args['logfile'] = "jb.%s.log"%(os.getpid(),) + if not args.has_key('process'): + msg = "There is no process specified on the command line, please use process=Start to start a new job" ; logging.error(msg) + raise IOError,msg + + if args['process'].lower() not in validprocesses: + msg = "The specified process (%s) is not valid"%args['process'] ; logging.error(msg) + raise IOError,msg + return opts,args @@ -632,7 +662,6 @@ def CleanUpCycle(DaCycle): msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg) - if __name__ == "__main__": sys.path.append('../../') diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 800ef9ea..1c3108ce 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -27,8 +27,8 @@ def JobStart(opts,args): from da.tools.initexit import CycleControl from da.baseclasses.control import DaSystem from da.ct.tools import needed_rc_items - from da.platform.jet import JetPlatForm as PlatForm - #from da.baseclasses.jobcontrol import PlatForm + #from da.platform.jet import JetPlatForm as PlatForm + from da.baseclasses.jobcontrol import PlatForm # First create a CycleControl object that handles all the details of the da cycle @@ -52,10 +52,6 @@ def JobStart(opts,args): DaCycle.DaPlatForm = DaPlatForm - # Proceed with the initialization of this cycle - - dummy = DaCycle.Initialize() - return DaCycle diff --git a/das.py b/das.py index b817ce3a..acc5ff08 100755 --- a/das.py +++ b/das.py @@ -41,6 +41,7 @@ if __name__ == "__main__": from da.tools.initexit import ParseOptions from da.tools.initexit import StartLogger from da.tools.pipeline import * + from da.tools.general import StoreData,RestoreData # Start a logger for all that happens from here on @@ -54,28 +55,63 @@ if __name__ == "__main__": opts,args = ValidateOptsArgs(opts,args) + process = args['process'].lower() + + DaCycle = JobStart(opts,args) + + # Start the subprocesses - msg = header+"starting JobStart"+footer ; logging.info(msg) - DaCycle = JobStart(opts,args) + if process == 'start': + msg = header+"starting JobStart"+footer ; logging.info(msg) + + dummy = DaCycle.Initialize() + + msg = header+"starting JobInput"+footer ; logging.info(msg) + + StateVector = JobInput(DaCycle) + + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + StateVector = StoreData(StateVector,savefilename) + + dummy = DaCycle.SubmitSubStep('samplestate') + + if process == 'samplestate': + msg = header+"starting SampleState"+footer ; logging.info(msg) + + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + StateVector = RestoreData(savefilename) + + dummy = SampleState(DaCycle,StateVector) - msg = header+"starting JobInput"+footer ; logging.info(msg) - StateVector = JobInput(DaCycle) + dummy = StoreData(StateVector,savefilename) + dummy = DaCycle.SubmitSubStep('invert') - msg = header+"starting SampleState"+footer ; logging.info(msg) - dummy = SampleState(DaCycle,StateVector) + if process == 'invert': + msg = header+"starting Invert"+footer ; logging.info(msg) - msg = header+"starting Invert"+footer ; logging.info(msg) - dummy = Invert(DaCycle, StateVector) + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + StateVector = RestoreData(savefilename) + dummy = Invert(DaCycle, StateVector) + StateVector = StoreData(StateVector,savefilename) + dummy = DaCycle.SubmitSubStep('advance') - msg = header+"starting Advance"+footer ; logging.info(msg) - dummy = Advance(DaCycle, StateVector) + if process == 'advance': + msg = header+"starting Advance"+footer ; logging.info(msg) - msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg) - dummy = SaveAndSubmit(DaCycle, StateVector) + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + StateVector = RestoreData(savefilename) + dummy = Advance(DaCycle, StateVector) + StateVector = StoreData(StateVector,savefilename) + dummy = DaCycle.SubmitSubStep('done') - msg = "Cycle finished...exiting" ; logging.info(msg) + if process == 'done': + msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg) - dummy = CleanUpCycle(DaCycle) + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + StateVector = RestoreData(savefilename) + dummy = SaveAndSubmit(DaCycle, StateVector) + msg = "Cycle finished...exiting" ; logging.info(msg) + dummy = CleanUpCycle(DaCycle) sys.exit(0) -- GitLab