From d10e7bd1e5cb1fc1bb8716a1af905f2d8becfcd0 Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Tue, 11 Jun 2013 10:25:18 +0000 Subject: [PATCH] added extra logic to deal with TM5 release/trunk differences, and pycasso TM5 --- gridded/da/tm5/observationoperator.py | 77 ++++++++++++++++++--------- 1 file changed, 52 insertions(+), 25 deletions(-) diff --git a/gridded/da/tm5/observationoperator.py b/gridded/da/tm5/observationoperator.py index 9616ea6..811ff74 100755 --- a/gridded/da/tm5/observationoperator.py +++ b/gridded/da/tm5/observationoperator.py @@ -93,7 +93,7 @@ class TM5ObservationOperator(ObservationOperator): # Create the TM5 run directory to hold a copy of the modified rc-file - tm5compiledir = self.tm_settings['my.run.dir'] + tm5compiledir = self.tm_settings[self.rundirkey] create_dirs(tm5compiledir) rcfilename = os.path.join(tm5compiledir, 'tm5_setup_init.rc') @@ -165,8 +165,8 @@ class TM5ObservationOperator(ObservationOperator): new_items = { 'submit.options': self.dacycle.daplatform.give_blocking_flag(), - 'timerange.start': self.dacycle['time.sample.start'], - 'timerange.end': self.dacycle['time.sample.end'], + self.timestartkey: self.dacycle['time.sample.start'], + self.timefinalkey: self.dacycle['time.sample.end'], 'jobstep.timerange.start': self.dacycle['time.sample.start'], 'jobstep.timerange.end': self.dacycle['time.sample.end'], 'jobstep.length': 'inf', @@ -177,14 +177,14 @@ class TM5ObservationOperator(ObservationOperator): } if self.dacycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart - new_items['istart'] = self.restartvalue + new_items[self.istartkey] = self.restartvalue logging.debug('Resetting TM5 to perform restart') else: - new_items['istart'] = self.coldstartvalue # if not, start TM5 'cold' + new_items[self.istartkey] = self.coldstartvalue # if not, start TM5 'cold' logging.debug('Resetting TM5 to perform cold start') if self.dacycle['time.sample.window'] != 0: # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart - new_items['istart'] = self.restartvalue + new_items[self.istartkey] = self.restartvalue logging.debug('Resetting TM5 to perform restart') # If neither one is true, simply take the istart value from the tm5.rc file that was read @@ -202,6 +202,10 @@ class TM5ObservationOperator(ObservationOperator): self.tm_settings = self.rcfile.values self.rc_filename = name + if 'my.source.dirs' in self.tm_settings.keys(): + self.rcfiletype = 'pycasso' + else: + self.rcfiletype = 'pre-pycasso' logging.debug('TM5 rc-file loaded successfully') def validate_rc(self): @@ -209,18 +213,39 @@ class TM5ObservationOperator(ObservationOperator): Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file are specified in the tm5_tools module, as dictionary variable "needed_rc_items". """ - self.restartvalue = 33 - self.coldstartvalue = 9 + + if self.rcfiletype == 'pycasso': + self.projectkey = 'my.project.dir' + self.rundirkey = 'my.run.dir' + self.outputdirkey = 'output.dir' + self.savedirkey = 'restart.write.dir' + self.timestartkey = 'timerange.start' + self.timefinalkey = 'timerange.end' + self.timelengthkey = 'jobstep.length' + self.istartkey = 'istart' + self.restartvalue = 33 + self.coldstartvalue = 9 + else: + self.projectkey = 'runid' + self.rundirkey = 'rundir' + self.outputdirkey = 'outputdir' + self.savedirkey = 'savedir' + self.timestartkey = 'time.start' + self.timefinalkey = 'time.final' + self.timelengthkey = 'time.break.nday' + self.istartkey = 'istart' + self.restartvalue = 3 + self.coldstartvalue = 9 needed_rc_items = [ - 'my.project.dir', - 'my.run.dir', - 'output.dir', - 'restart.write.dir', - 'timerange.start', - 'timerange.end', - 'jobstep.length', - 'istart' + self.projectkey, + self.rundirkey, + self.outputdirkey, + self.savedirkey, + self.timestartkey, + self.timefinalkey, + self.timelengthkey, + self.istartkey ] for k, v in self.tm_settings.iteritems(): @@ -313,10 +338,12 @@ class TM5ObservationOperator(ObservationOperator): # Next, make sure there is an actual model version compiled and ready to execute - targetdir = os.path.join(self.tm_settings['my.run.dir']) + targetdir = os.path.join(self.tm_settings[self.rundirkey]) - - self.tm5_exec = os.path.join(targetdir, self.tm_settings['my.basename'] + '.x') + if self.rcfiletype == 'pycasso': + self.tm5_exec = os.path.join(targetdir, self.tm_settings['my.basename'] + '.x') + else: + self.tm5_exec = os.path.join(targetdir, 'tm5.x') if not os.path.exists(self.tm5_exec): logging.error("Required TM5 executable was not found %s" % self.tm5_exec) @@ -340,7 +367,7 @@ class TM5ObservationOperator(ObservationOperator): # First get the restart data for TM5 from the current restart dir of the filter sourcedir = self.dacycle['dir.restart'] - targetdir = self.tm_settings['restart.write.dir'] + targetdir = self.tm_settings[self.savedirkey] self.outputdir = self.tm_settings[self.outputdirkey] # Needed further downstream to collect output data from TM5 filterlist = '%s' % self.tm_settings['timerange.start'] @@ -416,7 +443,7 @@ class TM5ObservationOperator(ObservationOperator): """ Method handles the case where a shell runs an MPI process that forks into N TM5 model instances """ da_platform = self.dacycle.daplatform - targetdir = os.path.join(self.tm_settings['my.run.dir']) + targetdir = os.path.join(self.tm_settings[self.rundirkey]) if not os.path.exists(os.path.join(mpi_shell_location, mpi_shell_filename)): logging.error("Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)" % (mpi_shell_filename, mpi_shell_location)) @@ -471,7 +498,7 @@ class TM5ObservationOperator(ObservationOperator): """ Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members""" - tm5submitdir = os.path.join(self.tm_settings['my.run.dir']) + tm5submitdir = os.path.join(self.tm_settings[self.rundirkey]) logging.info('tm5submitdir', tm5submitdir) # Go to executable directory and start the subprocess, using a new logfile @@ -516,8 +543,8 @@ class TM5ObservationOperator(ObservationOperator): Note 2: also adding the weekly mean flux output to the output_filelist for later collection """ - sourcedir = os.path.join(self.tm_settings['restart.write.dir']) - filterlist = ['%s' % self.tm_settings['timerange.end'].strftime('%Y%m%d')] + sourcedir = os.path.join(self.tm_settings[self.savedirkey]) + filterlist = ['%s' % self.tm_settings[self.timefinalkey].strftime('%Y%m%d')] logging.debug("Creating a new list of TM5 restart data") logging.debug(" from directory: %s " % sourcedir) @@ -549,7 +576,7 @@ class TM5ObservationOperator(ObservationOperator): self.restart_filelist.append(fil) logging.debug(" [added to restart list] .... %s " % fil) - sourcedir = os.path.join(self.tm_settings['output.dir']) + sourcedir = os.path.join(self.tm_settings[self.outputdirkey]) sd_ed = self.dacycle['time.sample.stamp'] filterlist = ['flask_output.%s' % sd_ed, 'flux1x1_%s' % sd_ed] -- GitLab