Commit 2079c5a7 authored by Peters, Wouter

moved all calls into the pipeline, giving the user a simpler script

parent 40b00b65
@@ -22,6 +22,67 @@ validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','al
system_not_implemented_message = "Unable to import a system specific class or function, exiting..."
def Main(Samples,StateVector,ObsOperator,Optimizer):
""" The main point of entry for the pipeline """
# Append current working dir to path
sys.path.append(os.getcwd())
# Import methods and classes contained in this package
from da.tools.initexit import CleanUpCycle
from da.tools.initexit import ValidateOptsArgs
from da.tools.initexit import ParseOptions
from da.tools.pipeline import *
from da.tools.general import StoreData,RestoreData
# Parse options from the command line
opts, args = ParseOptions()
# Validate Options and arguments passed
opts,args = ValidateOptsArgs(opts,args)
# Create the Cycle Control object for this job
DaCycle = JobStart(opts,args)
msg = header+"Initializing Da Cycle"+footer ; logging.info(msg)
dummy = DaCycle.Initialize()
msg = header+"starting JobInput"+footer ; logging.info(msg)
StateVector = JobInput(DaCycle, StateVector)
msg = header+"starting SampleState"+footer ; logging.info(msg)
dummy = SampleState(DaCycle,Samples,StateVector,ObsOperator)
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(DaCycle, StateVector, Optimizer)
msg = header+"starting Advance"+footer ; logging.info(msg)
dummy = Advance(DaCycle, Samples, StateVector, ObsOperator)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(DaCycle, StateVector)
msg = "Cycle finished...exiting" ; logging.info(msg)
dummy = CleanUpCycle(DaCycle)
sys.exit(0)
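For reference, a minimal sketch of the simpler user script this refactor aims at: the caller only builds the four system-specific objects and hands them to Main(), which now owns every pipeline call. The CtObservations class name, the no-argument constructors, and the import path for Main() are assumptions; the other classes are the ones already imported elsewhere in this file.

from da.ct.statevector import CtStateVector
from da.ct.optimizer import CtOptimizer
from da.ct.obs import CtObservations                        # assumed class name for the Samples object
from da.tm5.observationoperator import TM5ObservationOperator

if __name__ == "__main__":
    StateVector = CtStateVector()                           # assumed no-argument constructors; Main()
    Optimizer   = CtOptimizer()                             # calls Initialize() on each object at the
    Samples     = CtObservations()                          # appropriate pipeline step
    ObsOperator = TM5ObservationOperator()
    Main(Samples, StateVector, ObsOperator, Optimizer)      # Main() as defined above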
####################################################################################################
def JobStart(opts,args):
""" Set up the job specific directory structure and create an expanded rc-file """
from da.tools.initexit import CycleControl
@@ -59,19 +120,15 @@ def JobStart(opts,args):
return DaCycle
def JobInput(DaCycle):
def JobInput(DaCycle, StateVector):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
if DaCycle.da_settings['da.system'] == 'CarbonTracker':
from da.ct.statevector import CtStateVector as StateVector
else:
from da.baseclasses.statevector import StateVector
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
)
StateVector = StateVector(dims)
dummy = StateVector.Initialize(dims)
nlag = dims[0]
@@ -104,14 +161,10 @@ def JobInput(DaCycle):
return StateVector
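As a quick illustration of the sizing above, with example numbers standing in for the rc-file entries (the real values come from the cycle and DA-system settings):

da_settings     = {'time.nlag': '3', 'forecast.nmembers': '150'}   # example values only
system_settings = {'nparameters': '240'}                           # example values only

dims = ( int(da_settings['time.nlag']),            # number of time lags in the filter
         int(da_settings['forecast.nmembers']),    # ensemble size
         int(system_settings['nparameters']), )    # state-vector length per member
# StateVector.Initialize(dims) then allocates nlag x nmembers ensemble members of nparameters each.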
def SampleState(DaCycle,StateVector ):
def SampleState(DaCycle,Samples,StateVector, ObservationOperator):
""" Sample the filter state for the inversion """
from da.tools.pipeline import WriteEnsembleData
from da.tools.pipeline import PrepareObs
from da.tools.pipeline import RunForecastModel
from da.tools.pipeline import AddSamplesToState
from da.tools.pipeline import AddObsToState
from da.tools.general import AdvanceTime
# Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step.
# This is especially important for:
@@ -134,39 +187,74 @@ def SampleState(DaCycle,StateVector ):
dummy = WriteEnsembleData(DaCycle, StateVector, lag)
# Get the observations corresponding to the current cycle
DaSystem = DaCycle.DaSystem
sampledate = DaCycle.da_settings['time.start']
sampledate = AdvanceTime(sampledate,lag*int(DaCycle.da_settings['cyclelength']))
dummy = Samples.Initialize(DaSystem,sampledate)
Samples = PrepareObs(DaCycle,lag)
dummy = Samples.Validate()
dummy = Samples.AddObs()
# Run the forecast model
dummy = RunForecastModel(DaCycle,lag)
dummy = RunForecastModel(DaCycle,ObservationOperator,lag)
# Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
dummy = AddObsToState(DaCycle, StateVector, Samples, lag)
silent = False
for Member in StateVector.EnsembleMembers[lag]:
filename = os.path.join(DaCycle.da_settings['dir.output'],'samples.%03d.nc'%Member.membernumber)
dummy = Samples.AddSimulations(filename,silent)
Member.ModelSample = Samples
silent=True
dummy = AddSamplesToState(StateVector, lag)
msg = "Added samples from the forecast model to each member of the StateVector" ; logging.debug(msg)
# Optionally, post-processing of the model output can be added that deals for instance with
# sub-sampling of time series, vertical averaging, etc.
return None
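A self-contained sketch of the sampling-date arithmetic used in the loop above, with datetime standing in for da.tools.general.AdvanceTime and the cycle length assumed to be given in days: at lag l, observations are collected l cycle lengths after time.start.

from datetime import datetime, timedelta

time_start  = datetime(2005, 1, 1)      # stands in for da_settings['time.start']
cyclelength = 7                         # stands in for da_settings['cyclelength'], assumed in days
lag         = 2

sampledate = time_start + timedelta(days=lag * cyclelength)
print sampledate                        # 2005-01-15 00:00:00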
def Invert(DaCycle, StateVector ):
def Invert(DaCycle, StateVector, Optimizer ):
""" Perform the inverse calculation """
from da.tools.pipeline import Optimize
dummy = Optimize(DaCycle, StateVector)
nobs = len(StateVector.EnsembleMembers[-1][0].ModelSample.MrData.getvalues('obs'))
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
nobs, )
dummy = Optimizer.Initialize(dims)
dummy = Optimizer.StateToMatrix(StateVector )
dummy = Optimizer.SetLocalization('None')
if not DaCycle.DaSystem.da_settings.has_key('opt.algorithm'):
msg = "There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)" ; logging.info(msg)
msg = "...using serial algorithm as default..." ; logging.info(msg)
dummy = Optimizer.SerialMinimumLeastSquares()
elif DaCycle.DaSystem.da_settings['opt.algorithm'] == 'serial':
msg = "Using the serial minimum least squares algorithm to solve ENKF equations" ; logging.info(msg)
dummy = Optimizer.SerialMinimumLeastSquares()
elif DaCycle.DaSystem.da_settings['opt.algorithm'] == 'bulk':
msg = "Using the bulk minimum least squares algorithm to solve ENKF equations" ; logging.info(msg)
dummy = Optimizer.BulkMinimumLeastSquares()
dummy = Optimizer.MatrixToState(StateVector )
StateVector.isOptimized = True
return None
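The branching on opt.algorithm above can be read as a small dispatch with serial as the documented default; the sketch below restates it as a standalone helper, where the dict stands in for DaCycle.DaSystem.da_settings. Note it simplifies the original slightly: any value other than 'bulk' falls back to the serial solver.

def solve_enkf(optimizer, da_settings):
    algorithm = da_settings.get('opt.algorithm', 'serial')    # serial is the default per the log messages
    if algorithm == 'bulk':
        return optimizer.BulkMinimumLeastSquares()
    else:
        return optimizer.SerialMinimumLeastSquares()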
def Advance( DaCycle, StateVector):
def Advance( DaCycle, Samples, StateVector, ObservationOperator):
""" Advance the filter state to the next step """
from da.tools.pipeline import WriteEnsembleData
from da.tools.pipeline import RunForecastModel
from da.tools.pipeline import AddSamplesToState
from da.tools.pipeline import PrepareObs
from da.tools.pipeline import AddObsToState
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
@@ -180,7 +268,15 @@ def Advance( DaCycle, StateVector):
# First, get new sampling dates to collect the final model samples
Samples = PrepareObs(DaCycle,0)
# Get the observations corresponding to the current cycle
DaSystem = DaCycle.DaSystem
sampledate = DaCycle.da_settings['time.start']
dummy = Samples.Initialize(DaSystem,sampledate)
dummy = Samples.Validate()
dummy = Samples.AddObs()
# Write the new ensembles from the statevector to disk, for the cycle that is now finished
@@ -188,13 +284,22 @@ def Advance( DaCycle, StateVector):
# Run the forecast model once more to advance the background CO2 state, and to sample the model once more
dummy = RunForecastModel(DaCycle,0)
dummy = RunForecastModel(DaCycle,ObservationOperator,0)
# Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
dummy = Samples.AddObs()
# Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
dummy = AddObsToState(DaCycle, StateVector, Samples, 0)
silent = False
for Member in StateVector.EnsembleMembers[0]:
filename = os.path.join(DaCycle.da_settings['dir.output'],'samples.%03d.nc'%Member.membernumber)
dummy = Samples.AddSimulations(filename,silent)
Member.ModelSample = Samples
silent=True
dummy = AddSamplesToState(StateVector, 0)
msg = "Added samples from the forecast model to each member of the StateVector" ; logging.debug(msg)
return None
@@ -209,27 +314,7 @@ def SaveAndSubmit( DaCycle, StateVector):
return None
def PrepareObs(DaCycle,lag):
""" prepare observations """
if DaCycle.da_settings['da.system'] == 'CarbonTracker':
import da.ct.obs as obs
else:
msg = system_not_implemented_message ; logging.error(msg)
raise ImportError
return obs.PrepareObs(DaCycle,lag)
def AddObsToState(DaCycle, StateVector, Samples, lag):
""" Add the observation objects to the ensemble members in the StateVector """
if DaCycle.da_settings['da.system'] != 'CarbonTracker': raise Exception,"CarbonTracker specific code in this routine!"
for Member in StateVector.EnsembleMembers[lag]:
Member.ModelSample = Samples
Member.SampleInputFile = os.path.join(DaCycle.da_settings['dir.output'],'samples.%03d.nc'%Member.membernumber)
def RunForecastModel(DaCycle,lag):
def RunForecastModel(DaCycle,ObsOperator,lag):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a
setting in your da.rc file. After that choice, the module written specifically for a particular
@@ -248,7 +333,6 @@ def RunForecastModel(DaCycle,lag):
# import modules, note that depending on the type of assimilation system, different submodules are imported!
from da.tools.general import AdvanceTime
if DaCycle.da_settings['forecast.model'] == 'TM5': from da.tm5.observationoperator import TM5ObservationOperator as ObservationOperator
cyclelen = DaCycle.da_settings['cyclelength']
@@ -270,15 +354,11 @@ def RunForecastModel(DaCycle,lag):
####### FROM HERE ON, PROCESSES ARE MODEL DEPENDENT AND NEED TO BE IMPLEMENTED ON A PER-MODEL BASIS ############
# Call a special method that creates a model instance, and overwrites the model settings with those from the DA cycle
ObsOperator = ObservationOperator()
status = ObsOperator.Initialize(DaCycle)
status = ObsOperator.ValidateInput(DaCycle)
status = ObsOperator.Run(DaCycle)
#status = ObsOperator.Run(DaCycle)
dummy = ObsOperator.SaveData()
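Judging from the calls above, any observation operator handed to RunForecastModel is expected to expose a small interface; the stub below sketches it, with comments inferred from the call sites only (the real TM5 behaviour lives in da.tm5.observationoperator).

class StubObservationOperator:
    """ Minimal stand-in showing the interface RunForecastModel relies on. """
    def Initialize(self, DaCycle):      # take over run settings from the current cycle
        return 0
    def ValidateInput(self, DaCycle):   # check that the required input files and settings exist
        return 0
    def Run(self, DaCycle):             # execute the forward model (the call above is commented out)
        return 0
    def SaveData(self):                 # store model output/restart data for later pipeline steps
        return 0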
@@ -302,68 +382,3 @@ def WriteEnsembleData(DaCycle, StateVector, lag):
return None
def AddSamplesToState(StateVector, lag):
""" Read forecast model samples for each ensemble member into the StateVector's ensemble members. """
# Add model written samples to the ensemble members Observations objects
silent=False
for Member in StateVector.EnsembleMembers[lag]:
print Member.ModelSample
Member.ModelSample.AddSimulations(Member.SampleInputFile,silent=silent)
silent=True
msg = "Added samples from the forecast model to the StateVector" ; logging.debug(msg)
return None
def Optimize(DaCycle, StateVector):
""" Perform least-squares minimization"""
if DaCycle.da_settings['da.system'] == 'CarbonTracker':
from da.ct.optimizer import CtOptimizer as Optimizer
else:
msg = system_not_implemented_message ; logging.error(msg)
raise ImportError
nobs = len(StateVector.EnsembleMembers[-1][0].ModelSample.MrData.getvalues('obs'))
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
nobs, )
optimizer = Optimizer(dims)
dummy = optimizer.StateToMatrix(StateVector )
dummy = optimizer.SetLocalization('None')
if not DaCycle.DaSystem.da_settings.has_key('opt.algorithm'):
msg = "There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)" ; logging.info(msg)
msg = "...using serial algorithm as default..." ; logging.info(msg)
dummy = optimizer.SerialMinimumLeastSquares()
elif DaCycle.DaSystem.da_settings['opt.algorithm'] == 'serial':
msg = "Using the serial minimum least squares algorithm to solve ENKF equations" ; logging.info(msg)
dummy = optimizer.SerialMinimumLeastSquares()
elif DaCycle.DaSystem.da_settings['opt.algorithm'] == 'bulk':
msg = "Using the bulk minimum least squares algorithm to solve ENKF equations" ; logging.info(msg)
dummy = optimizer.BulkMinimumLeastSquares()
dummy = optimizer.MatrixToState(StateVector )
StateVector.isOptimized = True
if __name__ == "__main__":
jobparams = {'jobname':'das.jobinut'}
template = DaPlatForm.GetJobTemplate(jobparams)
template += 'cd %s\n'%os.getcwd()
template += './das.py','rc=%s process=jobinput' % self.da_settings['da.restart.fname']
jobfile = DaPlatForm.WriteJob(self,template,'cycle.%s.das'%cd)
jobid = DaPlatForm.SubmitJob(jobfile)
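The removed __main__ block above would not run as written: 'self' and 'cd' are undefined at module level, 'das.jobinut' looks like a typo for 'das.jobinput', and the './das.py' line builds a tuple instead of concatenating strings. For reference, a hedged sketch of the intended submit-a-jobinput-cycle pattern with those problems repaired; the DaPlatForm method signatures are simplified assumptions.

import os

def submit_jobinput_cycle(DaPlatForm, da_settings, cycle_id):
    jobparams = {'jobname': 'das.jobinput'}
    template  = DaPlatForm.GetJobTemplate(jobparams)
    template += 'cd %s\n' % os.getcwd()
    template += './das.py rc=%s process=jobinput\n' % da_settings['da.restart.fname']
    jobfile   = DaPlatForm.WriteJob(template, 'cycle.%s.das' % cycle_id)
    return DaPlatForm.SubmitJob(jobfile)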