Commit fb54246f authored by Peters, Wouter's avatar Peters, Wouter
Browse files

first working version of a class based module. TM5 model now runs from a TM5 object

parent db4419a6
#!/usr/bin/env python
# ct_tools.py
"""
Author : peters
Revision History:
File created on 12 Feb 2009.
"""
import os
import sys
import rc
import logging
identifier = 'CarbonTracker CO2'
needed_rc_items = ['obs.input.dir',
'obs.input.fname',
'ocn.covariance',
'nparameters',
'bio.covariance',
'deltaco2.prefix',
'regtype']
def MakeResiduals(rc_da_shell):
""" Method makes residuals of CO2 mixing ratios from the observed and sampled values """
import pyhdf.SD as HDF
# Read observations NetCDF file
ObsFileName = os.path.join(rc_da_shell['dir.output'],rc_da_shell['obs.full.filename'])
ncf = HDF.SD(ObsFileName)
obsdates = ncf.select('decimal_date').get()
dummy = ncf.end()
msg = "Successfully read data from obs file (%s)"%ObsFileName ; logging.debug(msg)
# Read samples NetCDF file
SamplesFileName = ObsFileName.replace('observations','samples').replace('input','output')
ncf = HDF.SD(SamplesFileName)
sampdates = ncf.select('decimal_date').get()
dummy = ncf.end()
msg = "Successfully read data from model sample file (%s)"%SamplesFileName ; logging.debug(msg)
if (obsdates==sampdates).all():
pass
else:
msg = "Dates in the observation file and samples file are not equal, exiting..." ; logging.error(msg)
raise IOError,msg
msg = "Residuals created, continuing" ; logging.info(msg)
return None
def PrepareObs(rc_da_shell,rc_da_system,type='forecast'):
""" PrepareObs """
import pycdf as CDF
import shutil
saveas = os.path.join(rc_da_shell['dir.input'],'observations.nc')
# ncf = CDF.CDF(saveas,CDF.NC.WRITE|CDF.NC.TRUNC|CDF.NC.CREATE)
# ncf.CreationDate = datetime.datetime.now().strftime('%Y %B %d')
# ncf.CreationUser = os.environ['USER']
# ncf.CreationDir = rc_da_shell['dir.da_submit']
# ncf.CreationScript = sys.argv[0]
# ncf.close()
# msg = 'Created a new observation input file (%s) ' % saveas ; logging.info(msg)
# For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply merge
# the observations.nc file with the pre-cooked obs files
# Select observation input file for this time step
#sfname = rc_da_system['obs.input.fname']+'.%s'%(rc_da_shell['startdate'].strftime('%Y%m%d') )+'.nc'
sfname = rc_da_system['obs.input.fname']+'.%s'%('20050305')+'.nc'
msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg)
filename = os.path.join(rc_da_system['obs.input.dir'],sfname)
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
dummy = shutil.copy2(filename,saveas)
msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg)
rc_da_shell['obs.full.filename'] = saveas
return rc_da_shell
def PrepareEnsemble(rc_da_shell ):
"""
Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways:
(A) Construct a brand new ensemble from a covariance matrix
(B) Get an existing ensemble and propagate it
The steps for procedure A are:
(A1) Construct the covariance matrix
(A2) Make a Cholesky decomposition
(A3) Prepare a set of [nmembers] input parameters for each forecast member
The steps for procedure B are:
(B1) Get existing parameter values for each member
(B2) Run them through a forecast model
Both procedures finish with writing the parameter values to a NetCDF file
"""
# Get the da system specific rc-items, note that they've been validated already by FillObs
rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True)
nlag = int(rc_da_shell['time.nlag'])
for n in range(nlag):
if n == nlag:
# Route A: make a new ensemble
Xprime,Qprior = MakeNewEnsemble(rc_da_shell, rc_da_system)
else:
# Route B: Propagate existing ensemble
Xprime,Qprior = [0,0]
return Xprime, Qprior
def MakeNewEnsemble(rc_da_shell, rc_da_system):
""" Make a new ensemble from specified matrices """
import pyhdf.SD as hdf
import numpy as np
import matplotlib.pyplot as plt
# Get the needed matrices from the specified covariance files
file_ocn_cov = rc_da_system['ocn.covariance']
file_bio_cov = rc_da_system['bio.covariance']
for file in [file_ocn_cov,file_bio_cov]:
if not os.path.exists(file):
msg = "Cannot find the specified file %s" % file ; logging.error(msg)
raise IOError,msg
else:
msg = "Using covariance file: %s" % file ; logging.info(msg)
f_ocn = hdf.SD(file_ocn_cov)
f_bio = hdf.SD(file_bio_cov)
cov_ocn = f_ocn.select('qprior').get()
cov_bio = f_bio.select('qprior').get()
dummy = f_ocn.end()
dummy = f_bio.end()
dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices")
# Once we have the matrices, we can start to make the full covariance matrix, and then decompose it
nparams = int(rc_da_system['nparameters'])
nmembers = int(rc_da_shell['forecast.nmembers'])
fullcov = np.zeros((nparams,nparams),float)
nocn = cov_ocn.shape[0]
nbio = cov_bio.shape[0]
fullcov[0:nbio,0:nbio] = cov_bio
fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn
# Visually inspect full covariance matrix if verbose
if rc_da_shell['verbose']:
plt.imshow(fullcov)
plt.colorbar()
plt.savefig('fullcovariancematrix.png')
plt.close('all')
# Make a cholesky decomposition of the covariance matrix
U,s,Vh = np.linalg.svd(fullcov)
dof = np.sum(s)**2/sum(s**2)
C = np.linalg.cholesky(fullcov)
msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg)
msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg)
if rc_da_shell['verbose']:
plt.plot(s)
plt.savefig('eigenvaluespectrum.png')
plt.close('all')
# Now create a set of random instances of the given covariance structure
# First see the random number generator with a fixed value from the rc-file, this is to make reproducible results
dummy = np.random.seed(int(rc_da_system['random.seed']))
# Structure to hold the data
parameter_deviations = np.zeros((nparams,nmembers),float) # This is array X'
for member in range(nmembers):
rands = np.random.randn(nparams)
parameter_deviations[:,member] = np.dot(C,rands)
if rc_da_shell['verbose']:
plt.hist(parameter_deviations[20,:],20)
plt.savefig('par20histogram.png')
plt.close('all')
return parameter_deviations, fullcov
if __name__ == "__main__":
import rc
import os
import logging
logging.basicConfig(level = logging.DEBUG,
format = ' [%(levelname)-7s] (%(asctime)s) %(module)-20s : %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S')
rc_da_system = rc.read('carbontracker.rc')
rc_da_shell = rc.read('da.rc')
rc_da_shell['dir.input'] = os.path.join(rc_da_shell['dir.da_run'],'input')
rc_da_shell['dir.output'] = os.path.join(rc_da_shell['dir.da_run'],'output')
rc_da_system['verbose'] = True
X,C = PrepareEnsemble(rc_da_shell)
rc_da_shell = PrepareObs(rc_da_shell,rc_da_system)
resids = MakeResiduals(rc_da_shell)
! Info on the data assimilation cycle
time.restart : False
time.start : 2005030500
time.finish : 2005030700
time.cycle : 1
time.nlag : 2
dir.da_run : ${HOME}/tmp/test_da
! Info on the DA system used
da.system : CarbonTracker
da.system.rc : carbontracker.rc
! Info on the forward model to be used
forecast.model : TM5
forecast.model.rc : ${HOME}/Modeling/TM5/ct_new.rc
forecast.nmembers : 2
......@@ -6,6 +6,7 @@ Author : peters
Revision History:
File created on 09 Feb 2009.
Major modifications to go to a class-based approach, July 2010.
This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information
from the DA system in combination with the generic tm5.rc files.
......@@ -26,7 +27,7 @@ import rc
import shutil
identifier = 'TM5'
version = '0.1'
version = 'release 3.0'
mpi_shell_file = 'tm5_mpi_wrapper'
needed_rc_items = [
......@@ -36,173 +37,271 @@ needed_rc_items = [
'time.break.nday'
]
def ModifyRC(rc_da_shell,rc_tm5):
""" modify parts of the tm5 rc-file to give control of file locations to the DA shell
instead of to the tm5.rc script. Note that most parameters remain unchanged because they do
not pertain to the actual run in progress, but to the TM5 model as a whole.
################### Begin Class TM5 ###################
Currently, only the savedir and outputdir variables are replaced so that TM5 writes and reads from
the DA system directories.
class TM5():
""" This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file
location. This rc-file will be used to figure out the settings for the run. This method of running TM5 assumes that a pre-compiled
tm5.exe is present, and it will be run from time.start to time.final. These settings can be modified later. To run a model version,
simply compile the model using an existing TM5 rc-file, then open python, and type:
Also, we add a new rc-variable which describes the inputdir for the da system
[]> tm=TM5('/Users/peters/Modeling/TM5/tutorial.rc')
[]> tm.WriteRc()
[]> tm.WriteRunRc()
[]> tm.Run()
Note that we replace these values in all {key,value} pairs of the tm5.rc file!
"""
orig_savedir = rc_tm5['savedir']
orig_outputdir = rc_tm5['outputdir']
def __init__(self, RcFileName='dummy.rc',log=True):
"""In instance of the TM5 class requires information on the model version only"""
import tools_da as tools
for k,v in rc_tm5.iteritems():
if not isinstance(v,str): continue
if orig_savedir in v:
rc_tm5[k] = v.replace(orig_savedir,rc_da_shell['dir.save'])
logging.debug('Replaced savedir in tm5 rc-item %s ' % k)
if orig_outputdir in v:
rc_tm5[k] = v.replace(orig_outputdir,rc_da_shell['dir.output'])
logging.debug('Replaced outputdir in tm5 rc-item %s ' % k)
self.Identifier = identifier # the identifier gives the model name
self.Version = version # the TM5 model version used
rc_tm5['das.input.dir'] = rc_da_shell['dir.input']
msg = 'Added das.input.dir to tm5 rc-file' ;logging.debug(msg)
if log:
self.Log = tools.StartLogger()
else:
self.Log = False
tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5.rc')
dummy = rc.write(tm5rcfilename,rc_tm5)
msg = "Modified tm5.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
dummy = self.LoadRc(RcFileName)
dummy = self.ValidateRc()
msg = '%s object initialized'%self.Identifier ; logging.info(msg)
msg = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg)
def CreateRunRC(rc_da_shell,rc_tm5):
"""
Create the tm5-runtime.rc file which is read by initexit.F90 to control time loop and restart from save files
We replace values for start and end date, as well as save and output folders, with values from the da shell.
"""
rc_runtm5={}
def __str__(self):
""" String representation of the TM5 object """
rc_runtm5['year1'] = rc_da_shell['startdate'].year
rc_runtm5['month1'] = rc_da_shell['startdate'].month
rc_runtm5['day1'] = rc_da_shell['startdate'].day
rc_runtm5['hour1'] = rc_da_shell['startdate'].hour
rc_runtm5['minu1'] = 0
rc_runtm5['sec1'] = 0
return "This is a %s object, version %s"%(self.Identifier,self.Version)
rc_runtm5['year2'] = rc_da_shell['enddate'].year
rc_runtm5['month2'] = rc_da_shell['enddate'].month
rc_runtm5['day2'] = rc_da_shell['enddate'].day
rc_runtm5['hour2'] = rc_da_shell['enddate'].hour
rc_runtm5['minu2'] = 0
rc_runtm5['sec2'] = 0
def LoadRc(self,RcFileName):
""" This method loads a TM5 rc-file with settings for this simulation """
if rc_da_shell['time.restart'] == True:
rc_runtm5['istart'] = 3
else:
rc_runtm5['istart'] = rc_tm5['istart']
self.tm_settings = rc.read(RcFileName)
self.RcFileName = RcFileName
self.Tm5RcLoaded = True
rc_runtm5['savedir'] = rc_da_shell['dir.save']
rc_runtm5['outputdir'] = rc_da_shell['dir.output']
msg = 'TM5 rc-file loaded successfully' ; logging.info(msg)
tm5rcfilename = os.path.join(rc_tm5['rundir'],'tm5_runtime.rc')
rc.write(tm5rcfilename,rc_runtm5)
return True
def PrepareExe(rc_da_shell):
"""
Prepare a forward model TM5 run, this consists of:
def ValidateRc(self):
"""
Validate the contents of the tm_settings dictionary and add extra values
"""
import datetime
for k,v in self.tm_settings.iteritems():
if v == 'True' : self.tm_settings[k] = True
if v == 'False': self.tm_settings[k] = False
if 'date' in k : self.tm_settings[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S')
if 'time.start' in k :
self.tm_settings[k] = datetime.datetime.strptime(v,'%Y%m%d%H')
if 'time.final' in k :
self.tm_settings[k] = datetime.datetime.strptime(v,'%Y%m%d%H')
for key in needed_rc_items:
if not self.tm_settings.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
status,msg = ( True,'rc-file has been validated succesfully' ) ; logging.debug(msg)
def ModifyRC(self,NewValues):
""" modify parts of the tm5 settings to give control of file locations to the DA shell
instead of to the tm5.rc script. Note that most parameters remain unchanged because they do
not pertain to the actual run in progress, but to the TM5 model as a whole.
Currently, only the savedir and outputdir variables are replaced so that TM5 writes and reads from
the DA system directories.
Also, we add a new rc-variable which describes the inputdir for the da system
Note that we replace these values in all {key,value} pairs of the tm5.rc file!
"""
- reading the TM5 rc-file,
- validating it,
- modifying the values,
- Creating a tm5_runtime.rc file
- Removing the existing tm5.ok file if present
"""
for k,v in NewValues.iteritems():
if self.tm_settings.has_key(k):
self.tm_settings[k] = v
logging.debug('Replaced tm5 rc-item %s ' % k)
else:
logging.debug('Skipped new rc-item %s ' % k)
from tools_da import ValidateRC
# Get tm5.rc parameters defined for this forecast model run, only needed for location, etc.
def WriteRc(self):
""" Write the rc-file settings to a tm5.rc file in the rundir"""
rc_tm5 = rc.read(rc_da_shell['forecast.model.rc'],silent = True)
tm5rcfilename = os.path.join(self.tm_settings['rundir'],'tm5.rc')
ValidateRC(rc_tm5,needed_rc_items)
dummy = rc.write(tm5rcfilename,self.tm_settings)
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
msg = "Modified tm5.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
ModifyRC(rc_da_shell,rc_tm5)
def WriteRunRc(self):
"""
Create the tm5-runtime.rc file which is read by initexit.F90 to control time loop and restart from save files
"""
# Create a TM5 runtime rc-file in which run/break times are defined by our da system
tm5rcfilename = os.path.join(self.tm_settings['rundir'],'tm5_runtime.rc')
CreateRunRC(rc_da_shell,rc_tm5)
dummy = rc.write(tm5rcfilename,self.tm_settings)
# Copy the pre-compiled MPI wrapper to the execution directory
rc_runtm5={}
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
rc_runtm5['year1'] = self.tm_settings['time.start'].year
rc_runtm5['month1'] = self.tm_settings['time.start'].month
rc_runtm5['day1'] = self.tm_settings['time.start'].day
rc_runtm5['hour1'] = self.tm_settings['time.start'].hour
rc_runtm5['minu1'] = 0
rc_runtm5['sec1'] = 0
if not os.path.exists(mpi_shell_file):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s)"% mpi_shell_file ; logging.error(msg)
msg = "Please see the 'readme_wrapper.txt' file for instructions to compile" ; logging.error(msg)
raise IOError
rc_runtm5['year2'] = self.tm_settings['time.final'].year
rc_runtm5['month2'] = self.tm_settings['time.final'].month
rc_runtm5['day2'] = self.tm_settings['time.final'].day
rc_runtm5['hour2'] = self.tm_settings['time.final'].hour
rc_runtm5['minu2'] = 0
rc_runtm5['sec2'] = 0
shutil.copy(mpi_shell_file,targetdir)
rc_runtm5['istart'] = self.tm_settings['istart']
rc_runtm5['savedir'] = self.tm_settings['savedir']
rc_runtm5['outputdir'] = self.tm_settings['outputdir']
# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed
rc.write(tm5rcfilename,rc_runtm5)
okfile = os.path.join(targetdir,'tm5.ok')
if os.path.exists(okfile): os.remove(okfile)
msg = "Modified tm5_runtime.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
return rc_tm5
def StartExe(rc_da_shell,rc_tm5):
"""
def Run(self):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
only if successfull on all processors will execution of the shell continue.
"""
cwd = os.getcwd()
targetdir = os.path.join(self.tm_settings['rundir'])
# Go to executable directory and start the subprocess, using a new logfile
os.chdir(targetdir)
logging.debug('Changing directory to %s ' % targetdir )
self.ModelLogFilename = 'tm5.log'
modellogfile = open(self.ModelLogFilename,'w')
logging.info('Logging model output to %s ' % self.ModelLogFilename)
# Next, make sure there is an actual model version compiled and ready to execute
self.Tm5Executable = os.path.join(targetdir,'tm5.x')
if not os.path.exists(self.Tm5Executable):
msg = "Required TM5 executable was not found %s"%self.Tm5Executable ; logging.error(msg)
msg = "Please compile the model with the specified rc-file and the regular TM5 scripts first" ; logging.error(msg)
raise IOError
# Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed
okfile = 'tm5.ok'
if os.path.exists(okfile): os.remove(okfile)
# Open logfile and spawn model, wait for finish and return code
logging.info('Starting model executable as subprocess ')
#cmd = ['openmpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x']
#cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x']
cmd = ['./tm5.x']
code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
modellogfile.close()
# Interpret/Handle exit code
if not os.path.exists(okfile): code = -1
if code == 0:
logging.info('Finished model executable succesfully (%s)'%code)
self.Status = 'Success'
else:
logging.error('Error in model executable return code: %s ' % code)
logging.info('Inspect [%s] to find output from model executable ' % modellogfilename)
self.Status = 'Failed'
raise OSError
# Return to working directory
os.chdir(cwd)
################### End Class TM5 ###################
def PrepareExe(rc_da_shell):
"""
Prepare a forward model TM5 run, this consists of:
- reading the TM5 rc-file,
- validating it,
- modifying the values,
- Creating a tm5_runtime.rc file
- Removing the existing tm5.ok file if present
"""
from tools_da import CreateLinks
from tools_da import CreateLinks, CreateDirs, ParseTimes
# Get tm5.rc parameters defined for this forecast model run, only needed for location, etc.
Tm5Model = TM5( rc_da_shell['forecast.model.rc'] )
# Create a link to the rundirectory of the TM5 model
sourcedir = rc_tm5['rundir']
targetdir = os.path.join(rc_da_shell['dir.exec'],'tm5')
sourcedir = Tm5Model.tm_settings['rundir']