Skip to content
Snippets Groups Projects
Commit c22d295d authored by Peters, Wouter's avatar Peters, Wouter
Browse files

extra logic to run multiple cycles in one job

parent 34ae25b6
No related branches found
No related tags found
No related merge requests found
......@@ -73,6 +73,9 @@ needed_da_items = [
'time.nlag',
'time.cycle',
'dir.da_run',
'da.resources.ncycles_per_job',
'da.resources.ntasks',
'da.resources.ntime',
'da.system',
'da.system.rc',
'da.obsoperator',
......@@ -299,6 +302,7 @@ class CycleControl(dict):
strippedname = os.path.split(self['jobrcfilename'])[-1]
self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
self.read_random_seed(False)
self['da.resources.ncycle_in_job'] = self['da.resources.ncycles_per_job'] # force submission of next job after this
elif self['time.restart']:
logging.info("Restarting filter from previous step")
......@@ -306,10 +310,15 @@ class CycleControl(dict):
strippedname = os.path.split(self['jobrcfilename'])[-1]
self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
self.read_random_seed(False)
self['da.resources.ncycle_in_job'] = int(self['da.resources.ncycle_in_job'])+1
if int(self['da.resources.ncycle_in_job']) > int(self['da.resources.ncycles_per_job']):
self['da.resources.ncycle_in_job'] = 1 # reset counter if we just submitted this job
else: #assume that it is a fresh start, change this condition to more specific if crash recover added
logging.info("First time step in filter sequence")
self.setup_file_structure()
self['da.resources.ncycle_in_job'] = self['da.resources.ncycles_per_job'] # force submission of next job after this
# expand jobrcfilename to include exec dir from now on.
# First strip current leading path from filename
......@@ -482,7 +491,7 @@ class CycleControl(dict):
fname = os.path.join(self['dir.restart'], 'da_runtime_%s.rc' % new_dacycle['time.start'].strftime('%Y%m%d'))#advanced time
rc.write(fname, new_dacycle)
logging.debug('Wrote new da_runtime.rc (%s) to exec dir' % fname)
logging.debug('Wrote new da_runtime.rc (%s) to restart dir' % fname)
# The rest is info needed for a system restart, so it modifies the current dacycle object (self)
......@@ -523,11 +532,25 @@ class CycleControl(dict):
execcommand = os.path.join(self['dir.da_submit'], sys.argv[0])
if '-t' in self.opts:
(self.opts).remove('-t')
template += 'python %s rc=%s %s >&%s' % (execcommand, self['da.restart.fname'], join(self.opts, ''), logfile)
ncycles = int(self['da.resources.ncycles_per_job'])
for cycle in range(ncycles):
nextjobid = '%s'% ( (self['time.end']+cycle*self['cyclelength']).strftime('%Y%m%d'),)
nextrestartfilename = self['da.restart.fname'].replace(jobid,nextjobid)
nextlogfilename = logfile.replace(jobid,nextjobid)
template += 'python %s rc=%s %s >&%s\n' % (execcommand, nextrestartfilename, join(self.opts, ''), nextlogfilename)
# write and submit
self.daplatform.write_job(jobfile, template, jobid)
jobid = self.daplatform.submit_job(jobfile, joblog=logfile)
if 'da.resources.ncycles_per_job' in self:
do_submit = (int(self['da.resources.ncycle_in_job']) == int(self['da.resources.ncycles_per_job']))
else:
dosubmit = True
if do_submit:
jobid = self.daplatform.submit_job(jobfile, joblog=logfile)
else:
logging.info('Final date reached, no new cycle started')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment