diff --git a/da/tm5/model.py b/da/tm5/model.py index d6fe2e82814e8bbbd2de80674d7caf660dd7af0f..f6d9fd0be5d4af6dcd30a5b1fea37278f86aeed1 100755 --- a/da/tm5/model.py +++ b/da/tm5/model.py @@ -40,7 +40,9 @@ needed_rc_items = [ ################### Begin Class TM5 ################### -class TM5(object): +from da.baseclasses.model import ObservationOperator + +class TM5(ObservationOperator): """ This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file location. This rc-file will be used to figure out the settings for the run. @@ -60,27 +62,65 @@ class TM5(object): """ - def __init__(self, RcFileName='dummy.rc'): - """ - An instance of the TM5 class requires information on the rc-file location only. - """ - - self.Identifier = identifier # the identifier gives the model name - self.Version = version # the TM5 model version used + def Initialize(self, DaCycle): + """ + Prepare a forward model TM5 run, this consists of: + - reading the TM5 rc-file, + - validating it, + - modifying the values, + - Creating a tm5_runtime.rc file + - Removing the existing tm5.ok file if present + + """ + RcFilename = DaCycle.da_settings['forecast.model.rc'] dummy = self.LoadRc(RcFileName) # load the specified rc-file dummy = self.ValidateRc() # validate the contents - msg = '%s object initialized'%self.Identifier ; logging.debug(msg) - msg = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg) +# Create a link from TM5 to the rundirectory of the das system + sourcedir = self.tm_settings['rundir'] + targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5') + dummy = CreateLinks(sourcedir,targetdir) - def __str__(self): - """ - String representation of the TM5 object - """ + DaCycle.da_settings['dir.exec.tm5'] = targetdir - return "This is a %s object, version %s"%(self.Identifier,self.Version) +# Write a modified TM5 model rc-file in which run/break times are defined by our da system + + NewItems = { + 'time.start' : rc_da_shell['time.sample.start'] , + 'time.final' : rc_da_shell['time.sample.end'] , + 'rundir' : rc_da_shell['dir.exec.tm5'] , + 'outputdir' : rc_da_shell['dir.output'] , + 'savedir' : rc_da_shell['dir.save'] , + 'das.input.dir' : rc_da_shell['dir.input'] + } + + if rc_da_shell['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart + NewItems['istart'] = 3 + if rc_da_shell['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart + NewItems['istart'] = 3 + + # If neither one is true, simply take the istart value from the tm5.rc file that was read + + Tm5Model.ModifyRC(NewItems) + + Tm5Model.WriteRc() + + Tm5Model.WriteRunRc() + +# Copy the pre-compiled MPI wrapper to the execution directory + + targetdir = os.path.join(rc_da_shell['dir.exec.tm5']) + + if not os.path.exists(os.path.join(mpi_shell_location,mpi_shell_filename) ): + msg = "Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)"% (mpi_shell_filename,mpi_shell_location) ; logging.error(msg) + msg = "Please see the %s/readme_wrapper.txt file for instructions to compile it"%mpi_shell_location ; logging.error(msg) + raise IOError + + shutil.copy(os.path.join(mpi_shell_location,mpi_shell_filename) ,os.path.join(targetdir,mpi_shell_filename) ) + + return None def LoadRc(self,RcFileName): """ @@ -95,7 +135,7 @@ class TM5(object): return True - def ValidateRc(self): + def ValidateRC(self): """ Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file are specified in the tm5_tools module, as dictionary variable "needed_rc_items". @@ -199,8 +239,16 @@ class TM5(object): msg = "Modified tm5_runtime.rc written (%s)" % tm5rcfilename ;logging.debug(msg) + def ValidateInput(self,DaCycle): + """ + Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present + """ + + - def Run(self,nprocesses): + return 0 + + def Run(self,DaCycle): """ Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and @@ -235,6 +283,7 @@ class TM5(object): # Open logfile and spawn model, wait for finish and return code + nprocesses = DaCyle.da_settings['forecast.nmembers'] #cmd = ['openmpirun','-np', '10', mpi_shell_filename,'./tm5.x'] cmd = ['mpirun','-np',str(nprocesses),mpi_shell_filename,'./tm5.x'] #cmd = ['./tm5.x'] @@ -299,73 +348,6 @@ class TM5(object): ################### End Class TM5 ################### - - -def DaInitialize(rc_da_shell): - """ - Prepare a forward model TM5 run, this consists of: - - - reading the TM5 rc-file, - - validating it, - - modifying the values, - - Creating a tm5_runtime.rc file - - Removing the existing tm5.ok file if present - - """ - - from da.tools.general import CreateLinks, CreateDirs - -# Make an instance of the TM5 class, supply it with a valid rc-file name - - Tm5Model = TM5( rc_da_shell['forecast.model.rc'] ) - -# Create a link from TM5 to the rundirectory of the das system - - sourcedir = Tm5Model.tm_settings['rundir'] - targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5') - CreateLinks(sourcedir,targetdir) - - rc_da_shell['dir.exec.tm5'] = targetdir - -# Write a modified TM5 model rc-file in which run/break times are defined by our da system - - - NewItems = { - 'time.start' : rc_da_shell['time.sample.start'] , - 'time.final' : rc_da_shell['time.sample.end'] , - 'rundir' : rc_da_shell['dir.exec.tm5'] , - 'outputdir' : rc_da_shell['dir.output'] , - 'savedir' : rc_da_shell['dir.save'] , - 'das.input.dir' : rc_da_shell['dir.input'] - } - - if rc_da_shell['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart - NewItems['istart'] = 3 - if rc_da_shell['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart - NewItems['istart'] = 3 - - # If neither one is true, simply take the istart value from the tm5.rc file that was read - - Tm5Model.ModifyRC(NewItems) - - Tm5Model.WriteRc() - - Tm5Model.WriteRunRc() - -# Copy the pre-compiled MPI wrapper to the execution directory - - targetdir = os.path.join(rc_da_shell['dir.exec.tm5']) - - if not os.path.exists(os.path.join(mpi_shell_location,mpi_shell_filename) ): - msg = "Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)"% (mpi_shell_filename,mpi_shell_location) ; logging.error(msg) - msg = "Please see the %s/readme_wrapper.txt file for instructions to compile it"%mpi_shell_location ; logging.error(msg) - raise IOError - - shutil.copy(os.path.join(mpi_shell_location,mpi_shell_filename) ,os.path.join(targetdir,mpi_shell_filename) ) - - return Tm5Model - - if __name__ == "__main__": import sys diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index d7e9a6abdc991bbf3c7338d31e9f52ed35b6d37a..f4c6d926215c1b88456b589bbcad9cd4a8ed9948 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -265,18 +265,24 @@ def RunForecastModel(DaCycle,lag): # Call a special method that creates a model instance, and overwrites the model settings with those from the DA cycle - executable = model.DaInitialize(DaCycle.da_settings) + ObsOperator = model.ObservationOperator() - # Run the forward model + dummy = ObsOperator.Initialize(DaCycle) - #status = executable.Run(DaCycle.da_settings['forecast.nmembers']) + status = ObsOperator.ValidateInput(DaCycle) + + if status != 0 : + msg = "Validating the input for the sampling model has failed, exiting..." ; logging.error(msg) + raise Exception + + #status = ObsOperator.Run(DaCycle) status = 0 if status != 0 : msg = "Running the sampling model has failed, exiting..." ; logging.error(msg) raise Exception - status = executable.SaveData() + dummy = ObsOperator.SaveData() # Advance the sample time