Commit 025049c2 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

all model flow is now moved to a da/tools/pipeline.py module

parent 0b044f8b
......@@ -4,10 +4,12 @@
datadir : /data/CO2/carbontracker/ct08/
obs.input.dir : ${datadir}/obsnc/
obs.input.fname : obs_final
ocn.covariance : ${datadir}/covariance_ocn_base.nc
ocn.covariance : ${datadir}/oif_p3_era40.dpco2.2000.01.hdf
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 220
nparameters : 240
random.seed : 4385
regionsfile : transcom_olson19_oif30.hdf
#!/usr/bin/env python
# control.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class DaSystem ###################
class DaSystem(object):
    """ Information on the data assimilation system used. This is normally an rc-file with settings. """

    def __init__(self, rcfilename):
        """
        Initialization occurs from passed rc-file name.

        rcfilename : path of the DA System Info rc-file to load.
        """
        self.LoadRc(rcfilename)

    def __str__(self):
        """
        String representation of a DaInfo object
        """
        # Preserves original behavior: prints a banner as a side effect and
        # returns an empty string.
        msg = "==============================================================="
        print(msg)
        msg = "DA System Info rc-file is %s" % self.RcFileName
        print(msg)
        msg = "==============================================================="
        print(msg)
        return ""

    def LoadRc(self, RcFileName):
        """
        This method loads a DA System Info rc-file with settings for this simulation.

        Sets self.da_settings (dict), self.RcFileName, and self.DaRcLoaded.
        Returns True on success.
        """
        import da.tools.rc as rc

        self.da_settings = rc.read(RcFileName)
        self.RcFileName = RcFileName
        self.DaRcLoaded = True

        logging.info('DA System Info rc-file (%s) loaded successfully', self.RcFileName)
        return True

    def Validate(self, needed_rc_items):
        """
        Validate the contents of the rc-file given a sequence of required keys.

        First converts the string values 'True'/'False' to booleans in place,
        then verifies each required key is present.

        Raises IOError (with the missing key in the message) on the first
        required key that is absent. Returns None on success.
        """
        # items() instead of iteritems(): works on both Python 2 and 3
        # (iteritems() was removed in Python 3).
        for k, v in self.da_settings.items():
            if v == 'True':
                self.da_settings[k] = True
            if v == 'False':
                self.da_settings[k] = False

        for key in needed_rc_items:
            # 'key in dict' instead of dict.has_key(): has_key() was removed in Python 3.
            if key not in self.da_settings:
                msg = 'Missing a required value in rc-file : %s' % key
                logging.error(msg)
                # raise IOError(msg) instead of 'raise IOError,msg' (Python-2-only syntax).
                raise IOError(msg)

        logging.debug('DA System Info settings have been validated succesfully')
        return None
################### End Class DaSystem ###################
if __name__ == "__main__":
    pass  # no standalone behavior; this module is imported by the pipeline
#!/usr/bin/env python
# pipeline.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
"""
import logging
import os
import sys
import shutil
import datetime
import da.tools.rc as rc
header = '\n\n *************************************** '
footer = ' *************************************** \n '
validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','all']
system_not_implemented_message = "Unable to import a system specific class or function, exiting..."
def JobStart(opts, args):
    """
    Set up the job specific directory structure and create an expanded rc-file.

    opts, args : parsed command-line options/arguments, forwarded to CycleControl.
    Returns the initialized CycleControl object with its DaSystem attached.
    """
    from da.tools.initexit import CycleControl
    from da.baseclasses.control import DaSystem
    from da.ct.tools import needed_rc_items

    # First create a CycleControl object that handles all the details of the da cycle
    DaCycle = CycleControl(opts, args)

    # Next create a DaSystem object that contains all the details related to the DA system.
    # Use a name distinct from the imported class: the original assigned the instance to
    # 'DaSystem', shadowing the class itself.
    da_system = DaSystem(DaCycle.da_settings['da.system.rc'])
    da_system.Validate(needed_rc_items)

    # And add the DaSystem info to the DaCycle object
    DaCycle.DaSystem = da_system

    # Proceed with the initialization of this cycle
    DaCycle.Initialize()

    return DaCycle
def JobInput(DaCycle):
    """
    Set up the input data for the forward model: obs and parameters/fluxes.

    Builds a CtStateVector sized from the cycle settings, fills it (fresh
    ensembles on a cold start, a restored save file on restart), propagates
    it by one cycle, and returns it.
    """
    # Import under its real name: the original 'import ... as StateVector' plus
    # 'StateVector = StateVector(dims)' shadowed the class with its own instance.
    from da.ct.statevector import CtStateVector

    dims = (int(DaCycle.da_settings['time.nlag']),
            int(DaCycle.da_settings['forecast.nmembers']),
            int(DaCycle.DaSystem.da_settings['nparameters']),
            )

    StateVector = CtStateVector(dims)

    nlag = dims[0]

    # We now have an empty StateVector object that we need to populate with data. If this is a
    # continuation from a previous cycle, we can read the previous StateVector values from a
    # NetCDF file in the save/ directory. If this is the first cycle, we need to populate the
    # StateVector with new values for each week. After we have constructed the StateVector, it
    # will be propagated by one cycle length so it is ready to be used in the current cycle.
    if not DaCycle.da_settings['time.restart']:
        # Fill each week from n=1 to n=nlag with a new ensemble
        for n in range(nlag):
            cov = StateVector.GetCovariance(DaCycle.DaSystem)
            StateVector.MakeNewEnsemble(n + 1, cov)
    else:
        # Read the StateVector data from the previous cycle's save file.
        # (The unused 'filtertime' local from the original was dropped.)
        savedir = DaCycle.da_settings['dir.save']
        filename = os.path.join(savedir, 'savestate.nc')
        StateVector.ReadFromFile(filename)

    # Now propagate the ensemble by one cycle to prepare for the current cycle
    StateVector.Propagate()

    return StateVector
def SampleState(DaCycle, StateVector):
    """ Sample the filter state for the inversion """
    from da.tools.pipeline import WriteEnsembleData
    from da.tools.pipeline import PrepareObs
    from da.tools.pipeline import RunForecastModel
    from da.tools.pipeline import AddSamplesToState
    from da.tools.pipeline import AddObsToState

    # Store all restart data in save/tmp so it can be recovered before the
    # propagation step. This matters most for:
    #   (i)  the transport model restart data holding the background mixing ratios,
    #        needed to run the model one cycle forward
    #   (ii) the random numbers (or RNG seed) so the ensembles can be recreated
    DaCycle.MoveSaveData(io_option='store', save_option='partial', filter=[])
    logging.debug("All restart data have been copied to the save/tmp directory for future use")

    nlag = int(DaCycle.da_settings['time.nlag'])
    logging.info("Sampling model will be run over %d cycles" % nlag)

    for lag in range(nlag):
        logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (int(lag) + 1,))

        # Write the ensemble member parameter info to file (or manipulate it further
        # into the type of info needed by the transport model)
        WriteEnsembleData(DaCycle, StateVector, lag)

        # Get the observations corresponding to the current cycle
        Samples = PrepareObs(DaCycle, lag)

        # Run the forecast model
        RunForecastModel(DaCycle, lag)

        # Read forecast model samples written to NetCDF by each member, and add the
        # obs to the statevector
        AddObsToState(DaCycle, StateVector, Samples, lag)
        AddSamplesToState(StateVector, lag)

        # Optionally, post-processing of the model output could be added here
        # (sub-sampling of time series, vertical averaging, etc.)

    return None
def Invert(DaCycle, StateVector):
    """ Perform the inverse calculation """
    from da.tools.pipeline import Optimize

    # Delegate the actual least-squares minimization to the Optimize step
    Optimize(DaCycle, StateVector)
    return None
def Advance(DaCycle, StateVector):
    """ Advance the filter state to the next step """
    from da.tools.pipeline import WriteEnsembleData
    from da.tools.pipeline import RunForecastModel
    from da.tools.pipeline import AddSamplesToState
    from da.tools.pipeline import PrepareObs
    from da.tools.pipeline import AddObsToState

    # This advances the modeled CO2 state. Optionally, routines could be added
    # here to advance the state vector (mean+covariance) as well.
    # First, restore the model state from the start of the filter.
    DaCycle.MoveSaveData(io_option='restore', save_option='partial', filter=[])
    logging.debug("All restart data have been recovered from the save/tmp directory, this " +
                  "resets the filter to %s " % DaCycle.da_settings['time.start'])
    logging.info("Sampling model will be run over 1 cycle")

    # Get new sampling dates to collect the final model samples
    Samples = PrepareObs(DaCycle, 0)

    # Write the new ensembles from the statevector to disk, for the cycle now finished
    WriteEnsembleData(DaCycle, StateVector, 0)

    # Run the forecast model once more to advance the background CO2 state,
    # and to sample the model once more
    RunForecastModel(DaCycle, 0)

    # Read the forecast model samples written by each member, and add the obs
    # to the statevector
    AddObsToState(DaCycle, StateVector, Samples, 0)
    AddSamplesToState(StateVector, 0)

    return None
def SaveAndSubmit(DaCycle, StateVector):
    """ Save the model state and submit the next job """
    # Persist the state vector into the cycle's save directory, then let the
    # cycle controller finalize (which handles resubmission).
    filename = os.path.join(DaCycle.da_settings['dir.save'], 'savestate.nc')
    StateVector.WriteToFile(filename)
    DaCycle.Finalize()
    return None
def PrepareObs(DaCycle, lag):
    """
    Prepare the observations for cycle `lag`.

    Dispatches to the DA-system specific obs module; only 'CarbonTracker' is
    implemented here. Raises ImportError (with an explanatory message) for any
    other da.system value.
    """
    if DaCycle.da_settings['da.system'] == 'CarbonTracker':
        import da.ct.obs as obs
    else:
        msg = system_not_implemented_message
        logging.error(msg)
        # Include the reason in the exception instead of a bare 'raise ImportError'
        raise ImportError(msg)
    return obs.PrepareObs(DaCycle, lag)
def AddObsToState(DaCycle, StateVector, Samples, lag):
    """
    Attach the observation object to each ensemble member in the StateVector.

    Each member at lag `lag` gets a reference to `Samples` and the path of the
    NetCDF file that will hold its simulated samples (numbered per member).
    Raises Exception for non-CarbonTracker systems.
    """
    #WP !!!! CarbonTracker specific code in this routine; should become generic
    if DaCycle.da_settings['da.system'] != 'CarbonTracker':
        # raise Exception(msg) instead of 'raise Exception,msg' (Python-2-only syntax)
        raise Exception("CarbonTracker specific code in this routine!")

    for Member in StateVector.EnsembleMembers[lag]:
        Member.ModelSample = Samples
        Member.SampleInputFile = os.path.join(DaCycle.da_settings['dir.output'], 'samples.%03d.nc' % Member.membernumber)
#WP !!!! This function should become a generic call to a base class "model" from which other model classes can inherit and modify if needed
def RunForecastModel(DaCycle,lag):
    """Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
       used is determined in the very first line where the import statement of module "model" depends on a
       setting in your da.rc file. After that choice, the module written specifically for a particular
       application takes over. There are a few required variables and methods in such a module:

       version [variable, string]    : defines the version number of the module, e.g. "1.0"
       identifier [variable, string] : identifies the model used as a string variable, e.g. "TM5"

       PrepareExe (method) : A method that preps the model simulation. It can for instance create an input
                             parameter file for the model (tm5.rc), or execute some scripts needed before the
                             actual execution starts (get_meteo). After this step, a call to the model
                             executable should start a succesfull simulation
       Run        (method) : Start the executable. How this is done depends on your model and might involve
                             submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
    """

    # import modules, note that depending on the type of assimilation system, different submodules are imported!
    from da.tools.general import AdvanceTime

    if DaCycle.da_settings['forecast.model'] == 'TM5': import da.tm5.model as model
    elif DaCycle.da_settings['forecast.model'] == 'SIBCASA': import da.sibcasa.model as model
    elif DaCycle.da_settings['forecast.model'] == 'WRF': import da.wrf.model as model
    # NOTE(review): there is no else-branch here; an unrecognized 'forecast.model'
    # value leads to a NameError on 'model' below rather than a clear error message.

####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############

    msg = "Using %s as forecast model" % model.identifier ; logging.debug(msg)

    # 'cyclelength' is passed verbatim to AdvanceTime; its exact type (string or
    # duration-like) is defined by the rc-file and AdvanceTime -- not visible here.
    cyclelen = DaCycle.da_settings['cyclelength']

    # Lag 0 starts at the cycle start; later lags continue from the running
    # 'time.sample.start' bookmark maintained at the bottom of this function.
    if lag == 0:
        startdate = DaCycle.da_settings['time.start']
    else:
        startdate = DaCycle.da_settings['time.sample.start']

    enddate = AdvanceTime(startdate,cyclelen)

    # Publish the sampling interval for this lag so the model wrapper can read it
    DaCycle.da_settings['time.sample.start'] = startdate
    DaCycle.da_settings['time.sample.end'] = enddate
    DaCycle.da_settings['time.sample.window'] = lag

    msg = "New simulation interval set : " ; logging.info(msg)
    msg = " start date : %s " % startdate.strftime('%F %H:%M') ; logging.info(msg)
    msg = " end date : %s " % enddate.strftime('%F %H:%M') ; logging.info(msg)

    # Call a special method that creates a model instance, and overwrites the model settings with those from the DA cycle
    executable = model.DaInitialize(DaCycle.da_settings)

    # Run the forward model
    #status = executable.Run(DaCycle.da_settings['forecast.nmembers'])
    # NOTE(review): the actual Run() call is commented out and status is hard-coded
    # to 0, so the failure branch below can never trigger -- this looks like
    # work-in-progress scaffolding; confirm before relying on it.
    status = 0

    if status != 0 :
        msg = "Running the sampling model has failed, exiting..." ; logging.error(msg)
        raise Exception

    # Let the model wrapper save its own output; its return value replaces status
    status = executable.SaveData()

    # Advance the sample time: move 'time.sample.start' past the interval just
    # run so the next lag starts where this one ended
    startdate = AdvanceTime( startdate, cyclelen)
    DaCycle.da_settings['time.sample.start'] = startdate

    return status
def WriteEnsembleData(DaCycle, StateVector, lag):
    """ Write the data needed by each model ensemble member to disk """
    # Every member at this lag writes its parameter file into the cycle's input dir
    input_dir = DaCycle.da_settings['dir.input']
    for member in StateVector.EnsembleMembers[lag]:
        member.WriteToFile(input_dir)

    logging.info("Ensemble member parameter files were written to disk for lag = %d" % lag)
    return None
def AddSamplesToState(StateVector, lag):
    """
    Read forecast model samples for each ensemble member into the StateVector's
    ensemble members.

    Calls AddSimulations on each member's ModelSample with that member's sample
    file; the first member's read is verbose, subsequent ones are silent.
    """
    # (The leftover Python-2 debug statement 'print Member.ModelSample' was
    # removed: it polluted stdout for every member and breaks on Python 3.)
    silent = False
    for Member in StateVector.EnsembleMembers[lag]:
        Member.ModelSample.AddSimulations(Member.SampleInputFile, silent=silent)
        silent = True

    logging.debug("Added samples from the forecast model to the StateVector")
    return None
def Optimize(DaCycle, StateVector):
    """
    Perform the least-squares minimization (EnKF update) on the StateVector.

    Builds a system-specific Optimizer sized by (nlag, nmembers, nparameters,
    nobs), moves the state into matrix form, runs the configured minimization
    algorithm ('serial' by default, or 'bulk'), and moves the result back.
    Sets StateVector.isOptimized = True when done.
    Raises ImportError for non-CarbonTracker systems.
    """
    if DaCycle.da_settings['da.system'] == 'CarbonTracker':
        from da.ct.optimizer import CtOptimizer as Optimizer
    else:
        msg = system_not_implemented_message
        logging.error(msg)
        raise ImportError(msg)

    # Number of observations held in the last lag of the filter window
    nobs = len(StateVector.EnsembleMembers[-1][0].ModelSample.MrData.getvalues('obs'))

    dims = (int(DaCycle.da_settings['time.nlag']),
            int(DaCycle.da_settings['forecast.nmembers']),
            int(DaCycle.DaSystem.da_settings['nparameters']),
            nobs,)

    optimizer = Optimizer(dims)
    optimizer.StateToMatrix(StateVector)
    optimizer.SetLocalization('None')

    # dict.get() instead of has_key(): has_key() was removed in Python 3.
    algorithm = DaCycle.DaSystem.da_settings.get('opt.algorithm')
    if algorithm is None:
        logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
        logging.info("...using serial algorithm as default...")
        optimizer.SerialMinimumLeastSquares()
    elif algorithm == 'serial':
        logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
        optimizer.SerialMinimumLeastSquares()
    elif algorithm == 'bulk':
        logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
        optimizer.BulkMinimumLeastSquares()
    # NOTE(review): any other opt.algorithm value silently skips the minimization
    # step entirely (original behavior preserved); consider raising instead.

    optimizer.MatrixToState(StateVector)
    StateVector.isOptimized = True
if __name__ == "__main__":
    pass  # no standalone behavior; pipeline steps are called by the driver script
......@@ -24,169 +24,6 @@ File created on 29 Sep 2009.
#$ -v NETCDF
#$ -v LD_LIBRARY_PATH
header = '\n\n *************************************** '
footer = ' *************************************** \n '
validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','all']
def JobStart(opts,args):
    """ Set up the job specific directory structure and create an expanded rc-file """
    # NOTE(review): this is the removed (pre-refactor) version from the diff.
    # Unlike the pipeline variant, it does not build or validate a DaSystem
    # object before initializing the cycle.
    from da.tools.initexit import CycleControl

    DaCycle = CycleControl(opts,args)
    DaCycle.Initialize()
    return DaCycle
def JobInput():
    """ Set up the input data for the forward model: obs and parameters/fluxes"""
    # NOTE(review): removed (pre-refactor) version. It reads the module-level
    # global CycleInfo, which is not defined anywhere in this view -- superseded
    # by the parameterized pipeline variant that takes DaCycle as an argument.
    from da.ct.statevector import CtStateVector

    dims = ( int(CycleInfo.da_settings['time.nlag']),
             int(CycleInfo.da_settings['forecast.nmembers']),
             int(CycleInfo.DaSystem.da_settings['nparameters']),
           )

    StateVector = CtStateVector(dims)

    nlag = dims[0]

    # We now have an empty CtStateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
    # the previous StateVector values from a NetCDF file in the save/ directory. If this is the first cycle, we need to populate the CtStateVector
    # with new values for each week. After we have constructed the StateVector, it will be propagated by one cycle length so it is ready to be used
    # in the current cycle
    if not CycleInfo.da_settings['time.restart']:
        # Fill each week from n=1 to n=nlag with a new ensemble
        for n in range(0,nlag):
            cov = StateVector.GetCovariance(CycleInfo.DaSystem)
            dummy = StateVector.MakeNewEnsemble(n+1,cov)
    else:
        # Read the StateVector data from file
        savedir = CycleInfo.da_settings['dir.save']
        filtertime = CycleInfo.da_settings['time.start'].strftime('%Y%m%d')  # unused local
        filename = os.path.join(savedir,'savestate.nc')
        StateVector.ReadFromFile(filename)

    # Now propagate the ensemble by one cycle to prepare for the current cycle
    dummy = StateVector.Propagate()

    return StateVector
def SampleState( ):
    """ Sample the filter state for the inversion """
    # NOTE(review): removed (pre-refactor) version. It operates on module-level
    # globals CycleInfo and StateVector (neither defined in this view) and
    # imports its helpers from da.tools.general instead of da.tools.pipeline.
    from da.tools.general import WriteEnsembleData
    from da.tools.general import RunForecastModel
    from da.tools.general import AddSamplesToState
    from da.tools.general import PrepareObs
    from da.tools.general import AddObsToState

    # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step.
    # This is especially important for:
    # (i) The save*tm5.hdf data which holds the background CO2 concentrations. This is needed to run the model one cycle forward
    # (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed
    dummy = CycleInfo.MoveSaveData(io_option='store',save_option='partial',filter=[])
    msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)

    nlag = int(CycleInfo.da_settings['time.nlag'])
    msg = "Sampling model will be run over %d cycles"% nlag ; logging.info(msg)

    for lag in range(nlag):
        msg = header+".....Ensemble Kalman Filter at lag %d"% (int(lag)+1,) ; logging.info(msg)

        # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
        # type of info needed in TM5/WRF/SIBCASA
        dummy = WriteEnsembleData(CycleInfo, StateVector, lag)

        # Get the observations corresponding to the current cycle
        Samples = PrepareObs(CycleInfo,lag)

        # Run the forecast model
        dummy = RunForecastModel(CycleInfo,lag)

        # Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
        dummy = AddObsToState(CycleInfo, StateVector, Samples, lag)
        dummy = AddSamplesToState(StateVector, lag)

        # Optionally, post-processing of the model output can be added that deals for instance with
        # sub-sampling of time series, vertical averaging, etc.

    return None
def Invert( ):
    """ Perform the inverse calculation """
    # NOTE(review): removed (pre-refactor) version; relies on module-level
    # globals CycleInfo and StateVector, not defined in this view.
    from da.tools.general import Optimize

    dummy = Optimize(CycleInfo, StateVector)
    return None
def Advance( ):
    """ Advance the filter state to the next step """
    # NOTE(review): removed (pre-refactor) version; relies on module-level
    # globals CycleInfo and StateVector, not defined in this view.
    from da.tools.general import WriteEnsembleData
    from da.tools.general import RunForecastModel
    from da.tools.general import AddSamplesToState
    from da.tools.general import PrepareObs
    from da.tools.general import AddObsToState

    # This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
    # Then, restore model state from the start of the filter
    dummy = CycleInfo.MoveSaveData(io_option='restore',save_option='partial',filter=[])
    msg = "All restart data have been recovered from the save/tmp directory, this "+ \
          "resets the filter to %s "%CycleInfo.da_settings['time.start'] ; logging.debug(msg)
    msg = "Sampling model will be run over 1 cycle" ; logging.info(msg)

    # First, get new sampling dates to collect the final model samples
    Samples = PrepareObs(CycleInfo,0)

    # Write the new ensembles from the statevector to disk, for the cycle that is now finished
    dummy = WriteEnsembleData(CycleInfo, StateVector, 0)

    # Run the forecast model once more to advance the background CO2 state, and to sample the model once more
    dummy = RunForecastModel(CycleInfo,0)

    # Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
    dummy = AddObsToState(CycleInfo, StateVector, Samples, 0)
    dummy = AddSamplesToState(StateVector, 0)

    return None
def SaveAndSubmit( ):
    """ Save the model state and submit the next job """
    # NOTE(review): removed (pre-refactor) version; relies on module-level
    # globals CycleInfo and StateVector, not defined in this view.
    savedir = CycleInfo.da_settings['dir.save']
    filename = os.path.join(savedir,'savestate.nc')
    dummy = StateVector.WriteToFile(filename)
    dummy = CycleInfo.Finalize()
    return None
if __name__ == "__main__":
import sys
import os
......@@ -199,11 +36,11 @@ if __name__ == "__main__":
# Import methods and classes contained in this package
from da.tools.general import CleanUpCycle
from da.tools.general import ValidateOptsArgs
from da.tools.general import ParseOptions