From adf3fb2cc2dd519822d48ecc8bbe947ca86ada76 Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Tue, 7 Sep 2010 09:45:16 +0000 Subject: [PATCH] changed order of calls to allow an easy flow of consecutive jobs with propoer init/exit --- da/baseclasses/jobcontrol.py | 2 +- da/tools/initexit.py | 53 ++++++++++++------------------------ das.py | 30 ++++++++++++++++---- 3 files changed, 44 insertions(+), 41 deletions(-) diff --git a/da/baseclasses/jobcontrol.py b/da/baseclasses/jobcontrol.py index d2957fc5..7f9594d9 100755 --- a/da/baseclasses/jobcontrol.py +++ b/da/baseclasses/jobcontrol.py @@ -71,7 +71,7 @@ class PlatForm(object): """ This method submits a jobfile to the queue, and returns the queue ID """ cmd = ['sh',jobfile] - msg = "A new job will be submitted (%s)"%cmd ; logging.debug(msg) + msg = "A new job will be submitted (%s)\n\n\n"%cmd ; logging.debug(msg) jobid = subprocess.call(cmd) return jobid diff --git a/da/tools/initexit.py b/da/tools/initexit.py index c01ceccb..9d432b0a 100755 --- a/da/tools/initexit.py +++ b/da/tools/initexit.py @@ -51,7 +51,7 @@ class CycleControl(object): # Add some useful variables to the rc-file dictionary - self.da_settings['jobrcfilename'] = args["jobrcfilename"] + self.da_settings['jobrcfilename'] = self.RcFileName self.da_settings['log'] = args["logfile"] self.da_settings['dir.da_submit'] = os.getcwd() self.da_settings['da.crash.recover'] = '-r' in opts @@ -59,13 +59,6 @@ class CycleControl(object): self.currentprocess = args['process'] self.DaSystem = None # to be filled later - self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec') - self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input') - self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output') - self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics') - self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis') - self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs') - self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save') def __str__(self): """ @@ -109,6 +102,8 @@ class CycleControl(object): if 'date' in k : self.da_settings[k] = ToDatetime(v) if 'time.start' in k : self.da_settings[k] = ToDatetime(v) + if 'time.end' in k : + self.da_settings[k] = ToDatetime(v) if 'time.finish' in k : self.da_settings[k] = ToDatetime(v) @@ -154,7 +149,6 @@ class CycleControl(object): self.da_settings['time.end'] = enddate self.da_settings['time.finish'] = finaldate self.da_settings['cyclelength'] = cyclelength - self.da_settings['dir.output'] += startdate.strftime('/%Y%m%d') msg = "===============================================================" ; logging.info(msg) msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) @@ -239,6 +233,10 @@ class CycleControl(object): dummy = self.SetupFileStructure() + # expand jobrcfilename to include exec dir from now on. + + self.da_settings['jobrcfilename'] = os.path.join(self.da_settings['dir.exec'],self.da_settings['jobrcfilename']) + self.ParseTimes() self.WriteRC(self.da_settings['jobrcfilename']) @@ -273,8 +271,15 @@ class CycleControl(object): filtertime = self.da_settings['time.start'].strftime('%Y%m%d') - CreateDirs(self.da_settings['dir.da_run']) + self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec') + self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input') + self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output',filtertime) + self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics') + self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis') + self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs') + self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save') + CreateDirs(self.da_settings['dir.da_run']) CreateDirs(os.path.join(self.da_settings['dir.exec'])) CreateDirs(os.path.join(self.da_settings['dir.input'])) CreateDirs(os.path.join(self.da_settings['dir.output'])) @@ -481,16 +486,14 @@ class CycleControl(object): self.da_settings['da.restart.fname'] = fname dummy = rc.write(fname,self.da_settings) - msg = 'Wrote new da_runtime.rc file to working directory' ; logging.debug(msg) + msg = 'Wrote new da_runtime.rc (%s)'%fname ; logging.debug(msg) - def WriteRC(self,filename): + def WriteRC(self,fname): """ Write RC file after each process to reflect updated info """ - rcname = filename - fname = './'+rcname dummy = rc.write(fname,self.da_settings) - msg = 'Wrote expanded rc-file (%s) to current directory (%s)'%(rcname,self.da_settings['dir.exec']) ; logging.debug(msg) + msg = 'Wrote expanded rc-file (%s)'%(fname) ; logging.debug(msg) return None def SubmitNextCycle(self): @@ -624,7 +627,6 @@ def ValidateOptsArgs(opts,args): msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg) raise IOError,msg - args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),) args['logfile'] = "jb.%s.log"%(os.getpid(),) if not args.has_key('process'): @@ -648,25 +650,6 @@ def CleanUpCycle(DaCycle): dummy = shutil.move(logfile,joblogfile) msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg) - # move rc file to rundir/jobs - jobrcfile = os.path.join(jobdir,DaCycle.da_settings["jobrcfilename"] ) - dummy = shutil.move(DaCycle.da_settings["jobrcfilename"],jobrcfile ) - msg = "....Moved %s to %s"%(DaCycle.da_settings['jobrcfilename'],jobrcfile) ; logging.debug(msg) - - # WP TOO SPECIFIC NOW, MAKE A ROUTINE THAT CLEANS UP ANY MODEL OUTPUT FILE, NOT JUST TM5!!! - - # cat model output and rc-file to the job file output - tm5jobfile = os.path.join(jobdir,"tm5.%s"%(DaCycle.da_settings['log']) ) - if os.path.exists(tm5jobfile): - msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg) - f = open(joblogfile,'a') - dummy = f.write(open(tm5jobfile,'r').read()) - dummy = f.close() - if os.path.exists(jobrcfile): - msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg) - f = open(joblogfile,'a') - dummy = f.write(open(jobrcfile,'r').read()) - dummy = f.close() msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg) diff --git a/das.py b/das.py index 3116b24a..f9a11063 100755 --- a/das.py +++ b/das.py @@ -42,25 +42,33 @@ if __name__ == "__main__": process = args['process'].lower() # Create the Cycle Control object for this job - msg = header+"Initializing Job"+footer ; logging.info(msg) DaCycle = JobStart(opts,args) - dummy = DaCycle.Initialize() - savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') # Start the requested subprocesses if process == 'start': + msg = header+"Initializing Da Cycle"+footer ; logging.info(msg) + + dummy = DaCycle.Initialize() + msg = header+"starting JobInput"+footer ; logging.info(msg) StateVector = JobInput(DaCycle) - StateVector = StoreData(StateVector,savefilename) + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + + dummy = StoreData(StateVector,savefilename) dummy = DaCycle.SubmitSubStep('samplestate') + sys.exit(0) + + + savefilename = os.path.join(DaCycle.da_settings['dir.save'],'statevector.pickle') + if process == 'samplestate': msg = header+"starting SampleState"+footer ; logging.info(msg) @@ -72,6 +80,10 @@ if __name__ == "__main__": dummy = DaCycle.SubmitSubStep('invert') + dummy = CleanUpCycle(DaCycle) + + sys.exit(0) + if process == 'invert': msg = header+"starting Invert"+footer ; logging.info(msg) @@ -83,6 +95,10 @@ if __name__ == "__main__": dummy = DaCycle.SubmitSubStep('advance') + dummy = CleanUpCycle(DaCycle) + + sys.exit(0) + if process == 'advance': msg = header+"starting Advance"+footer ; logging.info(msg) @@ -95,6 +111,10 @@ if __name__ == "__main__": dummy = DaCycle.SubmitSubStep('done') + dummy = CleanUpCycle(DaCycle) + + sys.exit(0) + if process == 'done': msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg) @@ -106,4 +126,4 @@ if __name__ == "__main__": dummy = CleanUpCycle(DaCycle) - sys.exit(0) + sys.exit(0) -- GitLab