Commit 50d20ff8 authored by Peters, Wouter


Modified the method in which save/restart data is propagated. The observation operator now handles its own start/restart files in its private directory, and
passes the information of data that is needed for a restart to the DaCycle through a list. This list gets appended by all methods, and at the end all files of that
list are copied to the restart/current directory. The contents of that directory replace the restart/one-ago directory at the end of the cycle.
parent 000cf913
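
The propagation pattern described in the commit message can be summarized in a short, self-contained sketch. Only the names RestartFileList, CollectRestartData, MoveRestartData, dir.restart.current and dir.restart.oneago are taken from the diff below; the toy classes, the run-directory layout and the example file name are hypothetical stand-ins, not the actual CycleControl or TM5ObservationOperator implementations.

import os
import shutil


class ToyObservationOperator(object):
    """Stand-in for an ObservationOperator: it keeps its start/restart files in its
    own private directory and only reports their names through RestartFileList."""

    def __init__(self, private_dir):
        self.private_dir = private_dir
        self.RestartFileList = []

    def SaveData(self):
        # Pretend the model wrote a restart file in its private directory
        restart_file = os.path.join(self.private_dir, 'save_20100101.hdf')
        open(restart_file, 'w').close()
        self.RestartFileList.append(restart_file)


class ToyCycle(dict):
    """Stand-in for CycleControl: gathers the restart files reported by all components,
    copies them to restart/current, and promotes that directory to restart/one-ago
    at the end of the cycle."""

    def __init__(self, rundir):
        self['dir.restart.current'] = os.path.join(rundir, 'restart', 'current')
        self['dir.restart.oneago'] = os.path.join(rundir, 'restart', 'one-ago')
        self.RestartFileList = []

    def CollectRestartData(self):
        # Copy every file on the (appendable) list to restart/current
        for f in self.RestartFileList:
            shutil.copy(f, os.path.join(self['dir.restart.current'], os.path.basename(f)))

    def MoveRestartData(self, io_option='store'):
        if io_option == 'store':   # end of cycle: contents of current replace one-ago
            source, target = self['dir.restart.current'], self['dir.restart.oneago']
            shutil.rmtree(target)  # re-create the target so it starts out empty
            os.makedirs(target)
        else:                      # 'restore': recover after a crash from one-ago
            source, target = self['dir.restart.oneago'], self['dir.restart.current']
        for name in os.listdir(source):
            path = os.path.join(source, name)
            if not os.path.isdir(path):  # skip sub-directories
                shutil.copy(path, os.path.join(target, name))


if __name__ == '__main__':
    # Hypothetical run-directory layout; os.makedirs fails if these already exist
    for d in ('run/exec/tm5', 'run/restart/current', 'run/restart/one-ago'):
        os.makedirs(d)
    operator = ToyObservationOperator('run/exec/tm5')
    cycle = ToyCycle('run')
    operator.SaveData()                                      # operator stores its own restart data...
    cycle.RestartFileList.extend(operator.RestartFileList)   # ...and passes only the file names on
    cycle.CollectRestartData()                               # copy everything to restart/current
    cycle.MoveRestartData(io_option='store')                 # current replaces one-ago at cycle end

The key design choice, also stated in the diff below, is that components never write into the restart directory themselves: they only report file names, and the cycle object decides when to collect them into restart/current and when to promote that directory to restart/one-ago.
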
......@@ -26,14 +26,18 @@ class ObservationOperator(object):
a statevector in the ensemble Kalman filter framework. The methods of this class specify which (external) code
is called to perform the sampling, and which files should be read for input and are written for output.
The baseclasses consist mainly of empty mehtods that require an application specific application
The baseclasses consist mainly of empty methods that require an application-specific implementation
"""
def __init__(self):
def __init__(self, RcFileName):
""" The instance of an ObservationOperator is application dependent """
self.Identifier = self.getid()
self.Version = self.getversion()
self.RestartFileList = []
dummy = self.LoadRc(RcFileName) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
msg = 'Observation Operator object initialized: %s'%self.Identifier ; logging.info(msg)
......@@ -42,8 +46,11 @@ class ObservationOperator(object):
def getversion(self):
return version
def GetInitialData(self,DaCycle):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model """
return None
def __str__(self):
return "This is a %s object, version %s"%(self.Identifier,self.Version)
......
......@@ -63,12 +63,16 @@ class TM5ObservationOperator(ObservationOperator):
"""
def __init__(self):
def __init__(self,RcFileName):
""" The instance of an TMObservationOperator is application dependent """
self.Identifier = self.getid() # the identifier gives the model name
self.Version = self.getversion() # the model version used
self.RestartFileList = []
msg = 'Observation Operator object initialized: %s (%s)'%(self.Identifier, self.Version,) ; logging.info(msg)
dummy = self.LoadRc(RcFileName) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
msg = 'Observation Operator initialized: %s (%s)'%(self.Identifier, self.Version,) ; logging.info(msg)
def getid(self):
return identifier
......@@ -89,8 +93,9 @@ class TM5ObservationOperator(ObservationOperator):
"""
from da.tools.general import CreateLinks
RcFileName = DaCycle['forecast.model.rc']
dummy = self.LoadRc(RcFileName) # load the specified rc-file
# First reload the original tm5.rc file to get unmodified settings
dummy = self.LoadRc(self.RcFileName) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
# Create a link from TM5 to the rundirectory of the das system
......@@ -107,8 +112,6 @@ class TM5ObservationOperator(ObservationOperator):
'time.start' : DaCycle['time.sample.start'] ,
'time.final' : DaCycle['time.sample.end'] ,
'rundir' : DaCycle['dir.exec.tm5'] ,
'outputdir' : DaCycle['dir.output'] ,
'savedir' : DaCycle['dir.save'] ,
'das.input.dir' : DaCycle['dir.input']
}
......@@ -294,6 +297,49 @@ class TM5ObservationOperator(ObservationOperator):
return 0
def GetInitialData(self,DaCycle):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model.
For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial
concentrations for all tracers.
We get the input data from the restart.current directory at 2 times:
(1) When the model starts the forecast over nlag cycles
(2) When the model starts the advance step over 1 cycle
"""
msg = "Moving TM5 model restart data from the restart/current directory to the TM5 save dir" ; logging.debug(msg)
# First get the save.hdf* data for TM5 from the current restart dir of the filter
sourcedir = DaCycle['dir.restart.current']
targetdir = self.tm_settings['savedir']
for file in os.listdir(sourcedir):
file = os.path.join(sourcedir,file)
if os.path.isdir(file): # skip dirs
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
if not os.path.basename(file).startswith('save_'): # the full path was prepended above, so test the file name itself
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
# all okay, copy file
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
msg = "All restart data have been copied from the restart/current directory to the TM5 save dir" ; logging.debug(msg)
return None
def Run(self,DaCycle):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
......@@ -396,13 +442,15 @@ class TM5ObservationOperator(ObservationOperator):
modellogfile.close()
def SaveData(self):
""" Write the TM5 recovery data for the next cycle """
""" Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names
that is used by the DaCycle object to collect restart data for the filter.
"""
sourcedir = os.path.join(self.tm_settings['outputdir'])
targetdir = os.path.join(self.tm_settings['savedir'])
filter = ['save_%s'%self.tm_settings['time.final'].strftime('%Y%m%d')]
msg = "Performing a full backup of TM5 save data" ; logging.debug(msg)
msg = "Performing a backup of TM5 save data" ; logging.debug(msg)
msg = " from directory: %s " % sourcedir ; logging.debug(msg)
msg = " to directory: %s " % targetdir ; logging.debug(msg)
msg = " with filter: %s " % filter ; logging.debug(msg)
......@@ -428,6 +476,8 @@ class TM5ObservationOperator(ObservationOperator):
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
dummy = self.RestartFileList.append( file )
msg = " [added to restart list] .... %s " % file ; logging.debug(msg)
################### End Class TM5 ###################
......
......@@ -15,11 +15,7 @@ needed_da_items=[
'time.nlag',
'time.cycle',
'dir.da_run',
'forecast.model',
'forecast.nmembers',
'forecast.model.rc',
'da.platform',
'da.system']
'forecast.nmembers']
# only needed in an earlier implementation where each substep was a separate job
# validprocesses = ['start','done','samplestate','advance','invert']
......@@ -61,6 +57,7 @@ class CycleControl(dict):
self['da.crash.recover'] = '-r' in opts
self['verbose'] = '-v' in opts
self.DaSystem = None # to be filled later
self.RestartFileList = [] # List of files needed for restart, to be extended later
def __str__(self):
......@@ -174,7 +171,7 @@ class CycleControl(dict):
import cPickle
import numpy as np
filename = os.path.join(self['dir.save'],'randomseed.pickle')
filename = os.path.join(self['dir.restart.current'],'randomseed.pickle')
if action == 'write':
f = open(filename,'wb')
......@@ -202,7 +199,7 @@ class CycleControl(dict):
1. Fresh start : set up the required file structure for this simulation and start
2. Restart : use latest da_runtime variables from the exec dir and restart
3. Recover : restart after crash by getting data from save/one-ago folder
3. Recover : restart after crash by getting data from restart/one-ago folder
The choice that gets executed depends on the presence of
......@@ -221,7 +218,7 @@ class CycleControl(dict):
* dummy = :meth:`~da.tools.initexit.CycleControl.RandomSeed` <- Read the random seed from file
3.
* dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()`
* dummy = :meth:`~da.tools.initexit.CycleControl.RecoverRun()` <- Recover files from save/one-ago dir, reset ``time.start``
* dummy = :meth:`~da.tools.initexit.CycleControl.RecoverRun()` <- Recover files from restart/one-ago dir, reset ``time.start``
* dummy = :meth:`~da.tools.initexit.CycleControl.RandomSeed`
And is always followed by a call to
......@@ -285,9 +282,8 @@ class CycleControl(dict):
* ``${da_rundir}/diagnostics``
* ``${da_rundir}/analysis``
* ``${da_rundir}/jobs``
* ``${da_rundir}/save``
* ``${da_rundir}/save/one-ago``
* ``${da_rundir}/save/two-ago``
* ``${da_rundir}/restart/current``
* ``${da_rundir}/restart/one-ago``
Note that the exec dir will actually be a symlink to the directory where
the observation operator executable lives. This directory is passed through
......@@ -307,7 +303,9 @@ class CycleControl(dict):
self['dir.diagnostics'] = os.path.join(self['dir.da_run'],'diagnostics')
self['dir.analysis'] = os.path.join(self['dir.da_run'],'analysis')
self['dir.jobs'] = os.path.join(self['dir.da_run'],'jobs')
self['dir.save'] = os.path.join(self['dir.da_run'],'save')
self['dir.restart'] = os.path.join(self['dir.da_run'],'restart')
self['dir.restart.current'] = os.path.join(self['dir.restart'],'current')
self['dir.restart.oneago'] = os.path.join(self['dir.restart'],'one-ago')
CreateDirs(self['dir.da_run'])
CreateDirs(os.path.join(self['dir.exec']))
......@@ -316,9 +314,9 @@ class CycleControl(dict):
CreateDirs(os.path.join(self['dir.diagnostics']))
CreateDirs(os.path.join(self['dir.analysis']))
CreateDirs(os.path.join(self['dir.jobs']))
CreateDirs(os.path.join(self['dir.save']))
CreateDirs(os.path.join(self['dir.save'],'one-ago'))
CreateDirs(os.path.join(self['dir.save'],'two-ago'))
CreateDirs(os.path.join(self['dir.restart']))
CreateDirs(os.path.join(self['dir.restart.current']))
CreateDirs(os.path.join(self['dir.restart.oneago']))
msg = 'Successfully created the file structure for the assimilation job' ; logging.info(msg)
......@@ -327,19 +325,19 @@ class CycleControl(dict):
"""
Prepare a recovery from a crashed run. This consists of:
- copying all data from the save/one-ago folder (:meth:`~da.tools.initexit.CycleControl.MoveSaveData`),
- copying all data from the restart/one-ago folder (:meth:`~da.tools.initexit.CycleControl.MoveRestartData`),
- replacing all ``rc-file`` items with those from ``da_runtime.rc``
- resetting the seed of the random number generator to the value it had before the crash (:meth:`~da.tools.initexit.CycleControl.RandomSeed`)
"""
# Move all data from the save/one-ago directory to the save/current directory
# Move all data from the restart/one-ago directory to the restart/current directory
dummy = self.MoveSaveData(io_option='restore',save_option='full',filter=[])
dummy = self.MoveRestartData(io_option='restore')
# Replace rc-items with those from the crashed run last rc-file
# Replace rc-items with those from the crashed run's last rc-file (now in restart.current dir)
file_rc_rec = os.path.join(self['dir.save'],'da_runtime.rc')
file_rc_rec = os.path.join(self['dir.restart.current'],'da_runtime.rc')
rc_rec = rc.read(file_rc_rec)
for k,v in rc_rec.iteritems():
......@@ -347,7 +345,7 @@ class CycleControl(dict):
self.ValidateRC()
msg = "Replaced rc-items.... " ; logging.debug(msg)
msg = "Replaced rc-items.... " ; logging.debug(msg)
msg = "Next cycle start date is %s" % self['time.start'] ; logging.debug(msg)
return None
......@@ -367,110 +365,83 @@ class CycleControl(dict):
dummy = self.RandomSeed('write')
dummy = self.WriteNewRCfile()
dummy = self.CollectSaveData()
dummy = self.MoveSaveData(io_option='store',save_option='full')
dummy = self.CollectRestartData()
dummy = self.MoveRestartData(io_option='store')
dummy = self.SubmitNextCycle()
def CollectSaveData(self):
def CollectRestartData(self):
""" Collect files needed for the restart of this cycle in case of a crash, or for the continuation of the next cycle.
All files needded are written to the save/ directory.
All files needed are written to the restart/current directory. The list of files included is read from the
attribute "RestartFileList" which is a simple list of files that can be appended to by other objects/methods that
require restart data to be saved.
Currently, the following files are included
- The ``randomseed.pickle`` file, also already in the save directory
- The ``randomseed.pickle`` file
- The ``da_runtime.rc`` file
- The savestate.yyyymmdd.nc file
- The savestate.nc file
- ...
Note that we assume that the transport model restart files are directly written to the save/ directory and do not need to be collected
Note that we assume that the restart files for the ObservationOperator reside in a separate folder, i.e., the
ObservationOperator does *not* write directly to the restart dir!
"""
filtertime = self['time.start'].strftime('%Y%m%d')
file1 = os.path.join(self['dir.exec'],'da_runtime.rc')
file2 = os.path.join(self['dir.output'],'savestate.nc')
self.RestartFileList.append( os.path.join(self['dir.exec'],'da_runtime.rc') )
self.RestartFileList.append( os.path.join(self['dir.output'],'savestate.nc') )
#self.RestartFileList.append( os.path.join(self['dir.exec'],'randomseed.pickle') )
FilesToSave = [file1,file2]
targetdir = os.path.join(self['dir.restart.current'])
targetdir = os.path.join(self['dir.save'])
msg = "Collecting the required restart/save data" ; logging.info(msg)
msg = "Collecting the required restart data" ; logging.info(msg)
msg = " to directory: %s " % targetdir ; logging.debug(msg)
for file in FilesToSave:
for file in self.RestartFileList:
if os.path.isdir(file): # skip dirs
continue
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(os.path.split(file)[0],targetdir) )
def MoveSaveData(self, io_option='restore', save_option='partial',filter=[]):
def MoveRestartData(self, io_option='restore'):
"""
:
Store or restore model state to/from a temporary directory. Two save_option options are available:
Store or restore model state to/from a restart directory.
(1) save_option = 'partial' : Save the background concentration fields only, to use for a later advance of the background
(2) save_option = 'full' : Save all restart data, to use for a next cycle
Two IO options are available:
While also two IO options are available:
(1) io_option = restore : Get data from restart.oneago directory
(2) io_option = store : Save data to restart.oneago directory
(1) io_option = restore : Get data from a directory
(2) io_option = store : Save data to a directory
In case of a 'store' command the target folder is re-created so that the contents are empty to begin with.
There is currently room for the specific sub-models to also add save data to the mix. This is done through a
forecast.model dependend import statement, followed by a call to function:
model.MoveSaveData(**args)
In case of a 'store' command the restart.oneago folder is re-created so that the contents are empty to begin with.
"""
from da.tools.general import CreateDirs
if io_option not in ['store','restore']:
raise ValueError,'Invalid option specified for io_option (%s)' % io_option
if save_option not in ['partial','full']:
raise ValueError,'Invalid option specified for save_option (%s)' % save_option
savedir = os.path.join(self['dir.save'])
oneagodir = os.path.join(self['dir.save'],'one-ago')
tempdir = os.path.join(self['dir.save'],'tmp')
if save_option == 'partial': # save background data to tmp dir
if io_option == 'store':
targetdir = tempdir
sourcedir = savedir
elif io_option == 'restore':
sourcedir = tempdir
targetdir = savedir
elif save_option == 'full': # move all save data to one-ago dir
if io_option == 'store':
if io_option == 'store':
targetdir = oneagodir
sourcedir = savedir
targetdir = self['dir.restart.oneago']
sourcedir = self['dir.restart.current']
elif io_option == 'restore':
elif io_option == 'restore':
sourcedir = oneagodir
targetdir = savedir
sourcedir = self['dir.restart.oneago']
targetdir = self['dir.restart.current']
# If "store" is requested, recreate target dir, cleaning the contents
if io_option == 'store':
CreateDirs(os.path.join(targetdir),forceclean=True)
msg = "Performing a %s %s of data" % (save_option,io_option) ; logging.debug(msg)
msg = "Performing a %s of data" % (io_option) ; logging.debug(msg)
msg = " from directory: %s " % sourcedir ; logging.debug(msg)
msg = " to directory: %s " % targetdir ; logging.debug(msg)
msg = " with filter : %s " % filter ; logging.debug(msg)
for file in os.listdir(sourcedir):
......@@ -478,28 +449,11 @@ class CycleControl(dict):
file = os.path.join(sourcedir,file)
if os.path.isdir(file): # skip dirs
skip = True
elif filter == []: # copy all
skip= False
else: # check filter
skip = True # default skip
for f in filter:
if f in file:
skip = False # unless in filter
break
if skip:
msg = " [skip] .... %s " % file ; logging.debug(msg)
continue
else:
if io_option == 'store' and save_option == 'full':
#msg = " [move] .... %s " % file ; logging.debug(msg)
#dummy = shutil.move(file,file.replace(sourcedir,targetdir) )
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
else:
msg = " [copy] .... %s " % file ; logging.debug(msg)
dummy = shutil.copy(file,file.replace(sourcedir,targetdir) )
......
......@@ -87,6 +87,7 @@ def JobStart(opts,args, DaSystem, DaPlatForm):
DaCycle = CycleControl(opts,args)
# Next fill the DaSystem object that contains all the details related to the DA system
dummy = DaSystem.Validate()
dummy = DaSystem.Initialize()
......@@ -101,8 +102,6 @@ def JobStart(opts,args, DaSystem, DaPlatForm):
def JobInput(DaCycle, StateVector):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
print DaCycle.DaSystem
dims = ( int(DaCycle['time.nlag']),
int(DaCycle['forecast.nmembers']),
int(DaCycle.DaSystem['nparameters']),
......@@ -113,7 +112,7 @@ def JobInput(DaCycle, StateVector):
nlag = dims[0]
# We now have an empty StateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
# the previous StateVector values from a NetCDF file in the save/ directory. If this is the first cycle, we need to populate the StateVector
# the previous StateVector values from a NetCDF file in the restart/current directory. If this is the first cycle, we need to populate the StateVector
# with new values for each week. After we have constructed the StateVector, it will be propagated by one cycle length so it is ready to be used
# in the current cycle
......@@ -129,7 +128,7 @@ def JobInput(DaCycle, StateVector):
# Read the StateVector data from file
savedir = DaCycle['dir.save']
savedir = DaCycle['dir.restart.current']
filtertime = DaCycle['time.start'].strftime('%Y%m%d')
filename = os.path.join(savedir,'savestate.nc')
......@@ -150,8 +149,8 @@ def SampleState(DaCycle,Samples,StateVector, ObservationOperator):
# (i) The transport model restart data which holds the background mixing ratios. This is needed to run the model one cycle forward
# (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed
dummy = DaCycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
#dummy = DaCycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
#msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
nlag = int(DaCycle['time.nlag'])
msg = "Sampling model will be run over %d cycles"% nlag ; logging.info(msg)
......@@ -179,6 +178,8 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
if lag == 0:
startdate = DaCycle['time.start']
dummy = ObservationOperator.GetInitialData(DaCycle)
else:
startdate = DaCycle['time.sample.start']
......@@ -213,6 +214,11 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
dummy = RunForecastModel(DaCycle,ObservationOperator)
# Get the required data from the ObsOperator output dir
DaCycle.RestartFileList.extend( ObservationOperator.RestartFileList )
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
dummy = Samples.AddModelDataMismatch(DaCycle)
......@@ -288,10 +294,6 @@ def Advance( DaCycle, Samples, StateVector, ObservationOperator):
# Then, restore model state from the start of the filter
dummy = DaCycle.MoveSaveData(io_option='restore',save_option='partial',filter=[])
msg = "All restart data have been recovered from the save/tmp directory, this "+ \
"resets the filter to %s "%DaCycle['time.start'] ; logging.debug(msg)
msg = "Sampling model will be run over 1 cycle" ; logging.info(msg)
SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,0)
......@@ -329,7 +331,7 @@ def RunForecastModel(DaCycle,ObsOperator):
status = ObsOperator.ValidateInput(DaCycle)
status = ObsOperator.Run(DaCycle)
#status = ObsOperator.Run(DaCycle)
dummy = ObsOperator.SaveData()
......