Skip to content
Snippets Groups Projects
Commit f3ccc474 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

working on the pipeline and model.py

parent 025049c2
No related branches found
No related tags found
No related merge requests found
......@@ -40,7 +40,9 @@ needed_rc_items = [
################### Begin Class TM5 ###################
class TM5(object):
from da.baseclasses.model import ObservationOperator
class TM5(ObservationOperator):
""" This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file
location. This rc-file will be used to figure out the settings for the run.
......@@ -60,27 +62,65 @@ class TM5(object):
"""
def __init__(self, RcFileName='dummy.rc'):
"""
An instance of the TM5 class requires information on the rc-file location only.
"""
self.Identifier = identifier # the identifier gives the model name
self.Version = version # the TM5 model version used
def Initialize(self, DaCycle):
"""
Prepare a forward model TM5 run, this consists of:
- reading the TM5 rc-file,
- validating it,
- modifying the values,
- Creating a tm5_runtime.rc file
- Removing the existing tm5.ok file if present
"""
RcFilename = DaCycle.da_settings['forecast.model.rc']
dummy = self.LoadRc(RcFileName) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
msg = '%s object initialized'%self.Identifier ; logging.debug(msg)
msg = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg)
# Create a link from TM5 to the rundirectory of the das system
sourcedir = self.tm_settings['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
dummy = CreateLinks(sourcedir,targetdir)
def __str__(self):
"""
String representation of the TM5 object
"""
DaCycle.da_settings['dir.exec.tm5'] = targetdir
return "This is a %s object, version %s"%(self.Identifier,self.Version)
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
NewItems = {
'time.start' : rc_da_shell['time.sample.start'] ,
'time.final' : rc_da_shell['time.sample.end'] ,
'rundir' : rc_da_shell['dir.exec.tm5'] ,
'outputdir' : rc_da_shell['dir.output'] ,
'savedir' : rc_da_shell['dir.save'] ,
'das.input.dir' : rc_da_shell['dir.input']
}
if rc_da_shell['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
NewItems['istart'] = 3
if rc_da_shell['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart
NewItems['istart'] = 3
# If neither one is true, simply take the istart value from the tm5.rc file that was read
Tm5Model.ModifyRC(NewItems)
Tm5Model.WriteRc()
Tm5Model.WriteRunRc()
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(rc_da_shell['dir.exec.tm5'])
if not os.path.exists(os.path.join(mpi_shell_location,mpi_shell_filename) ):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)"% (mpi_shell_filename,mpi_shell_location) ; logging.error(msg)
msg = "Please see the %s/readme_wrapper.txt file for instructions to compile it"%mpi_shell_location ; logging.error(msg)
raise IOError
shutil.copy(os.path.join(mpi_shell_location,mpi_shell_filename) ,os.path.join(targetdir,mpi_shell_filename) )
return None
def LoadRc(self,RcFileName):
"""
......@@ -95,7 +135,7 @@ class TM5(object):
return True
def ValidateRc(self):
def ValidateRC(self):
"""
Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file
are specified in the tm5_tools module, as dictionary variable "needed_rc_items".
......@@ -199,8 +239,16 @@ class TM5(object):
msg = "Modified tm5_runtime.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
def ValidateInput(self,DaCycle):
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
def Run(self,nprocesses):
return 0
def Run(self,DaCycle):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
......@@ -235,6 +283,7 @@ class TM5(object):
# Open logfile and spawn model, wait for finish and return code
nprocesses = DaCyle.da_settings['forecast.nmembers']
#cmd = ['openmpirun','-np', '10', mpi_shell_filename,'./tm5.x']
cmd = ['mpirun','-np',str(nprocesses),mpi_shell_filename,'./tm5.x']
#cmd = ['./tm5.x']
......@@ -299,73 +348,6 @@ class TM5(object):
################### End Class TM5 ###################
def DaInitialize(rc_da_shell):
"""
Prepare a forward model TM5 run, this consists of:
- reading the TM5 rc-file,
- validating it,
- modifying the values,
- Creating a tm5_runtime.rc file
- Removing the existing tm5.ok file if present
"""
from da.tools.general import CreateLinks, CreateDirs
# Make an instance of the TM5 class, supply it with a valid rc-file name
Tm5Model = TM5( rc_da_shell['forecast.model.rc'] )
# Create a link from TM5 to the rundirectory of the das system
sourcedir = Tm5Model.tm_settings['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
CreateLinks(sourcedir,targetdir)
rc_da_shell['dir.exec.tm5'] = targetdir
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
NewItems = {
'time.start' : rc_da_shell['time.sample.start'] ,
'time.final' : rc_da_shell['time.sample.end'] ,
'rundir' : rc_da_shell['dir.exec.tm5'] ,
'outputdir' : rc_da_shell['dir.output'] ,
'savedir' : rc_da_shell['dir.save'] ,
'das.input.dir' : rc_da_shell['dir.input']
}
if rc_da_shell['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
NewItems['istart'] = 3
if rc_da_shell['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart
NewItems['istart'] = 3
# If neither one is true, simply take the istart value from the tm5.rc file that was read
Tm5Model.ModifyRC(NewItems)
Tm5Model.WriteRc()
Tm5Model.WriteRunRc()
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(rc_da_shell['dir.exec.tm5'])
if not os.path.exists(os.path.join(mpi_shell_location,mpi_shell_filename) ):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)"% (mpi_shell_filename,mpi_shell_location) ; logging.error(msg)
msg = "Please see the %s/readme_wrapper.txt file for instructions to compile it"%mpi_shell_location ; logging.error(msg)
raise IOError
shutil.copy(os.path.join(mpi_shell_location,mpi_shell_filename) ,os.path.join(targetdir,mpi_shell_filename) )
return Tm5Model
if __name__ == "__main__":
import sys
......
......@@ -265,18 +265,24 @@ def RunForecastModel(DaCycle,lag):
# Call a special method that creates a model instance, and overwrites the model settings with those from the DA cycle
executable = model.DaInitialize(DaCycle.da_settings)
ObsOperator = model.ObservationOperator()
# Run the forward model
dummy = ObsOperator.Initialize(DaCycle)
#status = executable.Run(DaCycle.da_settings['forecast.nmembers'])
status = ObsOperator.ValidateInput(DaCycle)
if status != 0 :
msg = "Validating the input for the sampling model has failed, exiting..." ; logging.error(msg)
raise Exception
#status = ObsOperator.Run(DaCycle)
status = 0
if status != 0 :
msg = "Running the sampling model has failed, exiting..." ; logging.error(msg)
raise Exception
status = executable.SaveData()
dummy = ObsOperator.SaveData()
# Advance the sample time
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment