Commit 3b9e49c0 authored by Peters, Wouter

started implementation of different ways for the TM5 model to be run

parent f3ccc474
@@ -42,7 +42,7 @@ needed_rc_items = [
from da.baseclasses.model import ObservationOperator
class TM5(ObservationOperator):
class TM5ObservationOperator(ObservationOperator):
""" This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file
location. This rc-file will be used to figure out the settings for the run.
@@ -62,6 +62,14 @@ class TM5(ObservationOperator):
"""
def __init__(self):
""" The instance of an TMObservationOperator is application dependent """
self.Identifier = identifier # the identifier gives the model name
self.Version = version # the model version used
msg = '%s object initialized'%self.Identifier ; logging.debug(msg)
msg = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg)
def Initialize(self, DaCycle):
"""
Prepare a forward model TM5 run, this consists of:
@@ -73,14 +81,16 @@ class TM5(ObservationOperator):
- Removing the existing tm5.ok file if present
"""
RcFilename = DaCycle.da_settings['forecast.model.rc']
from da.tools.general import CreateLinks
RcFileName = DaCycle.da_settings['forecast.model.rc']
dummy = self.LoadRc(RcFileName) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
# Create a link from TM5 to the rundirectory of the das system
sourcedir = self.tm_settings['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
targetdir = os.path.join(DaCycle.da_settings['dir.exec'],'tm5')
dummy = CreateLinks(sourcedir,targetdir)
DaCycle.da_settings['dir.exec.tm5'] = targetdir
@@ -88,30 +98,30 @@ class TM5(ObservationOperator):
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
NewItems = {
'time.start' : rc_da_shell['time.sample.start'] ,
'time.final' : rc_da_shell['time.sample.end'] ,
'rundir' : rc_da_shell['dir.exec.tm5'] ,
'outputdir' : rc_da_shell['dir.output'] ,
'savedir' : rc_da_shell['dir.save'] ,
'das.input.dir' : rc_da_shell['dir.input']
'time.start' : DaCycle.da_settings['time.sample.start'] ,
'time.final' : DaCycle.da_settings['time.sample.end'] ,
'rundir' : DaCycle.da_settings['dir.exec.tm5'] ,
'outputdir' : DaCycle.da_settings['dir.output'] ,
'savedir' : DaCycle.da_settings['dir.save'] ,
'das.input.dir' : DaCycle.da_settings['dir.input']
}
if rc_da_shell['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
if DaCycle.da_settings['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
NewItems['istart'] = 3
if rc_da_shell['time.sample.window'] != 0: # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
if DaCycle.da_settings['time.sample.window'] != 0: # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
NewItems['istart'] = 3
# If neither one is true, simply take the istart value from the tm5.rc file that was read
Tm5Model.ModifyRC(NewItems)
self.ModifyRC(NewItems)
Tm5Model.WriteRc()
self.WriteRc()
Tm5Model.WriteRunRc()
self.WriteRunRc()
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(rc_da_shell['dir.exec.tm5'])
targetdir = os.path.join(DaCycle.da_settings['dir.exec.tm5'])
if not os.path.exists(os.path.join(mpi_shell_location,mpi_shell_filename) ):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)"% (mpi_shell_filename,mpi_shell_location) ; logging.error(msg)
@@ -120,7 +130,7 @@ class TM5(ObservationOperator):
shutil.copy(os.path.join(mpi_shell_location,mpi_shell_filename) ,os.path.join(targetdir,mpi_shell_filename) )
return None
return 0
def LoadRc(self,RcFileName):
"""
@@ -131,11 +141,11 @@ class TM5(ObservationOperator):
self.RcFileName = RcFileName
self.Tm5RcLoaded = True
msg = 'TM5 rc-file loaded successfully' ; logging.info(msg)
msg = 'TM5 rc-file loaded successfully' ; logging.debug(msg)
return True
def ValidateRC(self):
def ValidateRc(self):
"""
Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file
are specified in the tm5_tools module, as dictionary variable "needed_rc_items".
@@ -244,65 +254,72 @@ class TM5(ObservationOperator):
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
datadir = self.tm_settings['das.input.dir']
if not os.path.exists(datadir):
msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..."%datadir ; logging.error(msg)
raise IOError,msg
datafiles = os.listdir(datadir)
return 0
def Run(self,DaCycle):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
only if successful on all processors will execution of the shell continue.
"""
from string import join
obsfile = 'observations.nc'
cwd = os.getcwd()
targetdir = os.path.join(self.tm_settings['rundir'])
if obsfile not in datafiles:
msg = "The specified obs input file for the TM5 model to read from does not exist (%s), exiting..."%obsfile ; logging.error(msg)
raise IOError,msg
# Go to executable directory and start the subprocess, using a new logfile
os.chdir(targetdir)
logging.debug('Changing directory to %s ' % targetdir )
self.ModelLogFilename = os.path.join(targetdir,'tm5.log')
modellogfile = open(self.ModelLogFilename,'w')
logging.info('Logging model output to %s ' % self.ModelLogFilename)
for n in range(int(DaCycle.da_settings['forecast.nmembers'])):
paramfile = 'parameters.%03d.nc'%n
if paramfile not in datafiles:
msg = "The specified parameter input file for the TM5 model to read from does not exist (%s), exiting..."%paramfile ; logging.error(msg)
raise IOError,msg
# Next, make sure there is an actual model version compiled and ready to execute
targetdir = os.path.join(self.tm_settings['rundir'])
self.Tm5Executable = os.path.join(targetdir,'tm5.x')
if not os.path.exists(self.Tm5Executable):
msg = "Required TM5 executable was not found %s"%self.Tm5Executable ; logging.error(msg)
msg = "Please compile the model with the specified rc-file and the regular TM5 scripts first" ; logging.error(msg)
raise IOError
# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed
return 0
okfile = 'tm5.ok'
if os.path.exists(okfile): os.remove(okfile)
def Run(self,DaCycle):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
only if successful on all processors will execution of the shell continue.
"""
# Open logfile and spawn model, wait for finish and return code
cwd = os.getcwd()
nprocesses = DaCycle.da_settings['forecast.nmembers']
#cmd = ['openmpirun','-np', '10', mpi_shell_filename,'./tm5.x']
cmd = ['mpirun','-np',str(nprocesses),mpi_shell_filename,'./tm5.x']
#cmd = ['./tm5.x']
msg = 'Starting model executable as subprocess (%s)'%join(cmd,' ') ; logging.info(msg)
# From here on, several options should be implemented.
code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
#
# (1) Where an mpi process is forked to do a TM5 instance with N tracers, each an ensemble member
#
# (2) Where N processes are spawned, each being one TM5 instance representing one member
#
# (3) Where N/m processes are spawned, each being a TM5 instance that handles m ensemble members
#
# In principle, it is best to make these processes produce scripts that can be executed stand-alone, or
# be submitted to a queue.
#
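# A possible dispatch between these options, sketched for illustration only (the
# 'da.system.runmode' key is hypothetical and not part of this commit; only option (1)
# is wired up below):
#
#   runmode = DaCycle.da_settings.get('da.system.runmode', 'mpirun')
#   if runmode == 'mpirun':       # option (1): one MPI job forking N TM5 instances
#       code = self.TM5_under_mpirun(DaCycle)
#   elif runmode == 'ntracers':   # a single TM5 instance carrying N tracers
#       code = self.TM5_With_N_tracers(int(DaCycle.da_settings['forecast.nmembers']))
#   else:
#       raise ValueError('unknown run mode: %s' % runmode)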
modellogfile.close()
# Open logfile and spawn model, wait for finish and return code
# Interpret/Handle exit code
# Code for Option (1)
if not os.path.exists(okfile): code = -1
code = self.TM5_under_mpirun(DaCycle)
if code == 0:
logging.info('Finished model executable successfully (%s)'%code)
self.Status = 'Success'
else:
logging.error('Error in model executable return code: %s ' % code)
logging.info('Inspect [%s] to find output from model executable ' % self.ModelLogFilename)
logging.debug('Inspect [%s] to find output from model executable ' % self.ModelLogFilename)
self.Status = 'Failed'
raise OSError
@@ -312,6 +329,70 @@ class TM5(ObservationOperator):
return code
def TM5_under_mpirun(self, DaCycle):
""" Method handles the case where a shell runs an MPI process that forks into N TM5 model instances """
from string import join
targetdir = os.path.join(self.tm_settings['rundir'])
# Go to executable directory and start the subprocess, using a new logfile
dummy = os.chdir(targetdir)
msg = 'Changing directory to %s ' % targetdir ;logging.debug(msg)
self.ModelLogFilename = os.path.join(targetdir,'tm5.log')
# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed
okfile = 'tm5.ok'
if os.path.exists(okfile):
dummy = os.remove(okfile)
modellogfile = open(self.ModelLogFilename,'w')
msg = 'Logging model output to %s ' % self.ModelLogFilename ;logging.debug(msg)
nprocesses = DaCycle.da_settings['forecast.nmembers']
template = GetJobTemplate('jet')
cmd = 'cd %s\n'%targetdir
cmd += 'mpirun -np %d %s ./tm5.x'%(int(nprocesses),mpi_shell_filename,)
template = template.replace('CMD',cmd)
#
# Done, write jobfile
#
jobfile = 'jb.%s.jb'%os.getpid()
f = open(jobfile,'w')
dummy = f.write(template)
dummy = f.close()
dummy = os.chmod(jobfile,477)   # note: 477 is a decimal literal (octal 0735); an octal mode such as 0755 may have been intended
cmd = ['qsub',jobfile]
cmd = ['sh',jobfile]   # note: this reassignment overrides the qsub submission above, so the job script is run directly through sh
msg = 'Starting model executable as subprocess (%s)'%join(cmd,' ') ; logging.info(msg)
code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
dummy = modellogfile.close()
if not os.path.exists(okfile):
code = -1
return code
def TM5_With_N_tracers(self, nprocesses):
""" Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members"""
from string import join
modellogfile = open(self.ModelLogFilename,'w')
logging.debug('Logging model output to %s ' % self.ModelLogFilename)
#cmd = ['openmpirun','-np', '10', mpi_shell_filename,'./tm5.x']
cmd = ['mpirun','-np',str(nprocesses),'./tm5.x']
msg = 'Starting model executable as subprocess (%s)'%join(cmd,' ') ; logging.info(msg)
#code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
modellogfile.close()
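# Sketch of the missing tail of this stub, mirroring TM5_under_mpirun above
# (illustration only, not part of this commit): once the commented-out subprocess.call
# is enabled, its return code would be checked against the tm5.ok marker file.
#
#   if not os.path.exists('tm5.ok'):
#       code = -1
#   return code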
def SaveData(self):
""" Write the TM5 recovery data for the next cycle """
@@ -348,6 +429,31 @@ class TM5(ObservationOperator):
################### End Class TM5 ###################
def GetJobTemplate(system):
""" Return the job template for a given computing system """
if system == 'jet':
template = """
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# qsub (jet)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~
#$ -N tm5
#$ -A co2
#$ -cwd
#$ -pe nserial 1
#$ -r y
#$ -l h_rt=01:00:00
#$ -S /bin/sh
#$ -j y
CMD
"""
return template
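# A small convenience helper, sketched here for illustration only (WriteJobFile is
# hypothetical and not part of this commit): it bundles the template-filling and
# job-file writing steps that TM5_under_mpirun performs inline above.

def WriteJobFile(system, cmd, jobfile):
    """ Fill the job template for 'system' with command 'cmd' and write it to 'jobfile' """
    template = GetJobTemplate(system)         # fetch the raw template for this computing system
    template = template.replace('CMD', cmd)   # substitute the actual model command
    f = open(jobfile, 'w')
    dummy = f.write(template)
    dummy = f.close()
    return jobfile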
if __name__ == "__main__":
import sys
@@ -216,8 +216,6 @@ def AddObsToState(DaCycle, StateVector, Samples, lag):
Member.ModelSample = Samples
Member.SampleInputFile = os.path.join(DaCycle.da_settings['dir.output'],'samples.%03d.nc'%Member.membernumber)
#WP !!!! This function should become a generic call to a base class "model" from which other model classes can inherit and modify if needed
def RunForecastModel(DaCycle,lag):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a
@@ -237,14 +235,11 @@ def RunForecastModel(DaCycle,lag):
# import modules, note that depending on the type of assimilation system, different submodules are imported!
from da.tools.general import AdvanceTime
if DaCycle.da_settings['forecast.model'] == 'TM5': import da.tm5.model as model
if DaCycle.da_settings['forecast.model'] == 'TM5': from da.tm5.model import TM5ObservationOperator as ObservationOperator
elif DaCycle.da_settings['forecast.model'] == 'SIBCASA': import da.sibcasa.model as model
elif DaCycle.da_settings['forecast.model'] == 'WRF': import da.wrf.model as model
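# Sketch of how the remaining branches might look once those observation operators are
# implemented (the SIBCASA/WRF class and module paths below are hypothetical, not part
# of this commit):
#
#   elif DaCycle.da_settings['forecast.model'] == 'SIBCASA':
#       from da.sibcasa.model import SibcasaObservationOperator as ObservationOperator
#   elif DaCycle.da_settings['forecast.model'] == 'WRF':
#       from da.wrf.model import WRFObservationOperator as ObservationOperator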
####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############
msg = "Using %s as forecast model" % model.identifier ; logging.debug(msg)
cyclelen = DaCycle.da_settings['cyclelength']
if lag == 0:
@@ -263,24 +258,17 @@ def RunForecastModel(DaCycle,lag):
msg = " start date : %s " % startdate.strftime('%F %H:%M') ; logging.info(msg)
msg = " end date : %s " % enddate.strftime('%F %H:%M') ; logging.info(msg)
####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############
# Call a special method that creates a model instance, and overwrites the model settings with those from the DA cycle
ObsOperator = model.ObservationOperator()
ObsOperator = ObservationOperator()
dummy = ObsOperator.Initialize(DaCycle)
status = ObsOperator.Initialize(DaCycle)
status = ObsOperator.ValidateInput(DaCycle)
if status != 0 :
msg = "Validating the input for the sampling model has failed, exiting..." ; logging.error(msg)
raise Exception
#status = ObsOperator.Run(DaCycle)
status = 0
if status != 0 :
msg = "Running the sampling model has failed, exiting..." ; logging.error(msg)
raise Exception
status = ObsOperator.Run(DaCycle)
dummy = ObsOperator.SaveData()