Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • classes
  • ctdas-cosmo
  • dynamicFFmodel
  • feature-STILT
  • griddedstatevector
  • ivar
  • karolina
  • liesbeth
  • master
  • near-real-time
  • old-master
  • package
  • py-3
  • remotec
  • trunk
  • wouter
16 results

Target

Select target project
  • nearrealtimectdas/CTDAS
  • tsurata_a/CTDAS
  • peter050/CTDAS
  • woude033/CTDAS
  • flore019/CTDAS
  • laan035/CTDAS
  • koren007/CTDAS
  • smith036/CTDAS
  • ctdas/CTDAS
9 results
Select Git revision
  • c13_devel
  • coco2
  • ctdas-cosmo
  • ctdas-icon
  • ctdas-wrf
  • ctdas-wrf-in-situ
  • ctdas-wrf-insitu_uoc
  • ctdas-wrf-simple-cmds
  • cte-lagrange
  • ctecc
  • ctecc_europe
  • ctelw_europe
  • ctsf
  • ctsf_sib_mul-add
  • dev-sw-oco2
  • dynamicFFmodel
  • feature-STILT
  • gcp
  • ivar
  • liesbeth
  • master
  • master_cleanup
  • merge_ctecc_master
  • near-real-time
  • new_da_structure
  • old-master
  • old-structure
  • proj-biomass
  • proj-ctecc_hr
  • py-3
  • random_obsop_cte
  • remco
  • remotec
  • tm5mp
  • trunk
35 results
Show changes
Commits on Source (2)
#!/usr/bin/env python
# tm5_tools.py
"""
Author : peters
Revision History:
File created on 09 Feb 2009.
This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information
from the DA system in combination with the generic tm5.rc files.
The TM5 model is now controlled by a python subprocess. This subprocess consists of an MPI wrapper (written in C) that spawns
a large number ( N= nmembers) of TM5 model instances under mpirun, and waits for them all to finish.
The design of the system assumes that the tm5.x (executable) was pre-compiled with the normal TM5 tools, and is residing in a
directory specified by the ${RUNDIR} of a tm5 rc-file. This tm5 rc-file name is taken from the data assimilation rc-file. Thus,
this python shell does *not* compile the TM5 model for you!
"""
import os
import subprocess
import logging
import rc
import shutil
identifier = 'TM5'
version = '0.1'
mpi_shell_file = 'tm5_mpi_wrapper'
needed_rc_items = [
'rundir',
'outputdir',
'time.start',
'time.break.nday'
]
def ModifyRC(rc_da_shell,rc_tm5):
""" modify parts of the tm5 rc-file to give control of file locations to the DA shell
instead of to the tm5.rc script. Note that most parameters remain unchanged because they do
not pertain to the actual run in progress, but to the TM5 model as a whole.
Currently, only the savedir and outputdir variables are replaced so that TM5 writes and reads from
the DA system directories.
Also, we add a new rc-variable which describes the inputdir for the da system
Note that we replace these values in all {key,value} pairs of the tm5.rc file!
"""
orig_savedir = rc_tm5['savedir']
orig_outputdir = rc_tm5['outputdir']
for k,v in rc_tm5.iteritems():
if not isinstance(v,str): continue
if orig_savedir in v:
rc_tm5[k] = v.replace(orig_savedir,rc_da_shell['dir.save'])
logging.debug('Replaced savedir in tm5 rc-item %s ' % k)
if orig_outputdir in v:
rc_tm5[k] = v.replace(orig_outputdir,rc_da_shell['dir.output'])
logging.debug('Replaced outputdir in tm5 rc-item %s ' % k)
rc_tm5['das.input.dir'] = rc_da_shell['dir.input']
msg = 'Added das.input.dir to tm5 rc-file' ;logging.debug(msg)
tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5.rc')
dummy = rc.write(tm5rcfilename,rc_tm5)
msg = "Modified tm5.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
def CreateRunRC(rc_da_shell,rc_tm5):
"""
Create the tm5-runtime.rc file which is read by initexit.F90 to control time loop and restart from save files
We replace values for start and end date, as well as save and output folders, with values from the da shell.
"""
rc_runtm5={}
rc_runtm5['year1'] = rc_da_shell['startdate'].year
rc_runtm5['month1'] = rc_da_shell['startdate'].month
rc_runtm5['day1'] = rc_da_shell['startdate'].day
rc_runtm5['hour1'] = rc_da_shell['startdate'].hour
rc_runtm5['minu1'] = 0
rc_runtm5['sec1'] = 0
rc_runtm5['year2'] = rc_da_shell['enddate'].year
rc_runtm5['month2'] = rc_da_shell['enddate'].month
rc_runtm5['day2'] = rc_da_shell['enddate'].day
rc_runtm5['hour2'] = rc_da_shell['enddate'].hour
rc_runtm5['minu2'] = 0
rc_runtm5['sec2'] = 0
if rc_da_shell['time.restart'] == True:
rc_runtm5['istart'] = 3
else:
rc_runtm5['istart'] = rc_tm5['istart']
rc_runtm5['savedir'] = rc_da_shell['dir.save']
rc_runtm5['outputdir'] = rc_da_shell['dir.output']
tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5_runtime.rc')
rc.write(tm5rcfilename,rc_runtm5)
def PrepareExe(rc_da_shell):
"""
Prepare a forward model TM5 run, this consists of:
- reading the TM5 rc-file,
- validating it,
- modifying the values,
- Creating a tm5_runtime.rc file
- Removing the existing tm5.ok file if present
"""
from tools_da import ValidateRC
# Get tm5.rc parameters defined for this forecast model run, only needed for location, etc.
rc_tm5 = rc.read(rc_da_shell['forecast.model.rc'],silent = True)
ValidateRC(rc_tm5,needed_rc_items)
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
ModifyRC(rc_da_shell,rc_tm5)
# Create a TM5 runtime rc-file in which run/break times are defined by our da system
CreateRunRC(rc_da_shell,rc_tm5)
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
if not os.path.exists(mpi_shell_file):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s)"% mpi_shell_file ; logging.error(msg)
msg = "Please see the 'readme_wrapper.txt' file for instructions to compile" ; logging.error(msg)
raise IOError
shutil.copy(mpi_shell_file,targetdir)
# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed
okfile = os.path.join(targetdir,'tm5.ok')
if os.path.exists(okfile): os.remove(okfile)
return rc_tm5
def StartExe(rc_da_shell,rc_tm5):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
only if successfull on all processors will execution of the shell continue.
"""
from tools_da import CreateLinks
# Create a link to the rundirectory of the TM5 model
sourcedir = rc_tm5['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
CreateLinks(sourcedir,targetdir)
# Go to executable directory and start the subprocess, using a new logfile
os.chdir(targetdir)
logging.debug('Changing directory to %s ' % targetdir )
modellogfilename = os.path.join(rc_da_shell['dir.jobs'],'tm5.%s'%rc_da_shell['log'])
modellogfile = open(modellogfilename,'w')
logging.info('Logging model output to %s ' % modellogfilename)
# Open logfile and spawn model, wait for finish and return code
logging.info('Starting model executable as subprocess ')
#cmd = ['openmpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x']
#cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x']
cmd = ['./tm5.x']
code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
modellogfile.close()
# Interpret/Handle exit code
if not os.path.exists(os.path.join(targetdir,'tm5.ok')): code = -1
if code == 0:
logging.info('Finished model executable succesfully (%s)'%code)
else:
logging.error('Error in model executable return code: %s ' % code)
logging.info('Inspect [%s] to find output from model executable ' % modellogfilename)
raise OSError
# Return to working directory
os.chdir(rc_da_shell['dir.da_run'])
return code
def WriteSaveData(rc_da_shell):
""" Write the TM5 recovery data for the next DA cycle """
sourcedir = os.path.join(rc_da_shell['dir.output'])
targetdir = os.path.join(rc_da_shell['dir.save'])
filter = ['save_%s'%rc_da_shell['enddate'].strftime('%Y%m%d')]
msg = "Performing a full backup of TM5 save data" ; logging.debug(msg)
msg = " from directory: %s " % sourcedir ; logging.debug(msg)
msg = " to directory: %s " % targetdir ; logging.debug(msg)
msg = " with filter: %s " % filter ; logging.debug(msg)
for file in os.listdir(sourcedir):
file = os.path.join(sourcedir,file)
if os.path.isdir(file): # skip dirs
skip = True
elif filter == []: # copy all
skip= False
else: # check filter
skip = True # default skip
for f in filter:
if f in file:
skip = False # unless in filter
break
if skip:
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
if __name__ == "__main__":
pass