Skip to content
Snippets Groups Projects
Commit 2b4980ed authored by Peters, Wouter's avatar Peters, Wouter
Browse files

changed some of the initialization logic. Now, the DaCycle object has become...

changed some of the initialization logic. Now, the DaCycle object has become internal to the obsoperator, sample, and statevector objects
parent 5971406a
No related branches found
No related tags found
No related merge requests found
......@@ -47,7 +47,7 @@ class Observation(object):
"""
def __init__(self):
def __init__(self, DaCycle = None):
"""
create an object with an identifier, version, and an empty ObservationList
"""
......@@ -55,6 +55,14 @@ class Observation(object):
self.Version = self.getversion()
self.Data = ObservationList([]) # Initialize with an empty list of obs
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
msg = 'Observation object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
......@@ -96,13 +104,13 @@ class Observation(object):
""" Add the simulation data to the Observation object.
"""
def AddModelDataMismatch(self, DaCycle):
def AddModelDataMismatch(self ):
"""
Get the model-data mismatch values for this cycle.
"""
def WriteSampleInfo(self, DaCycle):
def WriteSampleInfo(self ):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
......
......@@ -31,7 +31,7 @@ class ObservationOperator(object):
"""
def __init__(self, RcFileName):
def __init__(self, RcFileName,DaCycle = None):
""" The instance of an ObservationOperator is application dependent """
self.Identifier = self.getid()
self.Version = self.getversion()
......@@ -43,13 +43,21 @@ class ObservationOperator(object):
msg = 'Observation Operator object initialized: %s'%self.Identifier ; logging.info(msg)
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
def getid(self):
return identifier
def getversion(self):
return version
def GetInitialData(self,DaCycle):
def GetInitialData(self):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model """
return None
......@@ -57,10 +65,10 @@ class ObservationOperator(object):
def __str__(self):
return "This is a %s object, version %s"%(self.Identifier,self.Version)
def Initialize(self,DaCycle):
def Initialize(self):
""" Perform all steps necessary to start the observation operator through a simple Run() call """
def ValidateInput(self,DaCycle):
def ValidateInput(self):
""" Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
are present.
"""
......
......@@ -68,9 +68,8 @@ class EnsembleMember(object):
def __str__(self):
return "%03d"%self.membernumber
def WriteToFile(self,DaCycle):
def WriteToFile(self, DaCycle):
"""
:param DaCycle: a DaCycle object
:rtype: None
Write an EnsembleMember information to a NetCDF file for later use. The standard output filename is
......@@ -106,13 +105,13 @@ class EnsembleMember(object):
savedict['comment'] = 'These are parameter values to use for member %d'%self.membernumber
dummy = ncf.AddData(savedict)
dummy = self.AddCustomFields(ncf,DaCycle)
dummy = self.AddCustomFields(ncf, DaCycle)
dummy = ncf.close()
msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (self.membernumber,filename,) ; logging.debug(msg)
def AddCustomFields(self,filehandle,DaCycle):
def AddCustomFields(self,filehandle):
""" Placeholder to add more custom fields to the output of the ensemble member"""
return None
......@@ -164,10 +163,19 @@ class StateVector(object):
"""
def __init__(self):
def __init__(self, DaCycle = None):
self.Identifier = self.getid()
self.Version = self.getversion()
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
msg = 'Statevector object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
......@@ -176,14 +184,17 @@ class StateVector(object):
def getversion(self):
return version
def Initialize(self,dims):
def Initialize(self):
"""
:param dims: a tuple with 3 values: (nlag,nmembers,nparams)
:rtype: None
Initialize the object by specifying the dimensions
"""
dims = ( int(self.DaCycle['time.nlag']),
int(self.DaCycle['forecast.nmembers']),
int(self.DaCycle.DaSystem['nparameters']),
)
self.nlag = dims[0]
self.nmembers = dims[1]
self.nparams = dims[2]
......@@ -273,9 +284,8 @@ class StateVector(object):
return EnsembleMember(memberno)
def Propagate(self,DaCycle):
def Propagate(self):
"""
:param DaCycle: a DaCycle object
:rtype: None
Propagate the parameter values in the StateVector to the next cycle. This means a shift by one cycle
......@@ -295,8 +305,9 @@ class StateVector(object):
# And now create a new time step of mean + members for n=nlag
date = DaCycle['time.start']+timedelta(days = (self.nlag-0.5)* int(DaCycle['time.cycle']))
cov = self.GetCovariance(DaCycle.DaSystem,date)
date = self.DaCycle['time.start']+timedelta(days = (self.nlag-0.5)* int(self.DaCycle['time.cycle']))
cov = self.GetCovariance(date)
dummy = self.MakeNewEnsemble(self.nlag,cov)
msg = 'The state vector has been propagated by one cycle ' ; logging.info(msg)
......
......@@ -33,11 +33,11 @@ class CtObservations(Observation):
def getlength(self):
return len(self.Data)
def Initialize(self,DaCycle):
def Initialize(self):
self.startdate = DaCycle['time.sample.start']
self.enddate = DaCycle['time.sample.end']
DaSystem = DaCycle.DaSystem
self.startdate = self.DaCycle['time.sample.start']
self.enddate = self.DaCycle['time.sample.end']
DaSystem = self.DaCycle.DaSystem
sfname = DaSystem['obs.input.fname']
......@@ -154,7 +154,7 @@ class CtObservations(Observation):
msg = "Added %d simulated values to the Data list" % (len(ids)-len(missing_samples) ) ; logging.debug(msg)
def WriteSampleInfo(self, DaCycle):
def WriteSampleInfo(self):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
......@@ -164,7 +164,7 @@ class CtObservations(Observation):
import da.tools.io4 as io
from da.tools.general import ToDectime
obsinputfile = os.path.join(DaCycle['dir.input'],'observations.nc')
obsinputfile = os.path.join(self.DaCycle['dir.input'],'observations.nc')
f = io.CT_CDF(obsinputfile,method='create')
msg = 'Creating new observations file for ObservationOperator (%s)' % obsinputfile ; logging.debug(msg)
......@@ -307,7 +307,7 @@ class CtObservations(Observation):
return obsinputfile
def AddModelDataMismatch(self, DaCycle):
def AddModelDataMismatch(self ):
"""
Get the model-data mismatch values for this cycle.
......@@ -319,7 +319,7 @@ class CtObservations(Observation):
"""
import da.tools.rc as rc
filename = DaCycle.DaSystem['obs.sites.rc']
filename = self.DaCycle.DaSystem['obs.sites.rc']
if not os.path.exists(filename):
msg = 'Could not find the required sites.rc input file (%s) ' % filename ; logging.error(msg)
......@@ -469,7 +469,7 @@ if __name__ == "__main__":
DaCycle['time.sample.start'] = datetime(2000,1,1)
DaCycle['time.sample.end'] = datetime(2000,1,2)
obs.Initialize(DaCycle)
obs.Initialize()
obs.Validate()
obs.AddObs()
print(obs.Data.getvalues('obs'))
......
......@@ -25,7 +25,7 @@ version = '0.0'
class CTEnsembleMember(EnsembleMember):
def AddCustomFields(self,filehandle,DaCycle):
def AddCustomFields(self,filehandle, DaCycle):
""" Placeholder to add more custom fields to the output of the ensemble member"""
#import da.tools.io4 as io
import da.tools.io as io
......@@ -59,7 +59,7 @@ class CtStateVector(StateVector):
def getversion(self):
return version
def GetCovariance(self,DaSystem,date):
def GetCovariance(self,date):
""" Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector.
Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.
The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag]
......@@ -73,8 +73,8 @@ class CtStateVector(StateVector):
# Get the needed matrices from the specified covariance files
file_ocn_cov = DaSystem['ocn.covariance']
file_bio_cov = DaSystem['bio.covariance']
file_ocn_cov = self.DaCycle.DaSystem['ocn.covariance']
file_bio_cov = self.DaCycle.DaSystem['bio.covariance']
# replace YYYY.MM in the ocean covariance file string
......@@ -154,15 +154,10 @@ if __name__ == "__main__":
StateVector = CtStateVector()
dims = ( int(DaCycle['time.nlag']),
int(DaCycle['forecast.nmembers']),
int(DaCycle.DaSystem['nparameters']),
)
StateVector.Initialize(dims)
StateVector.Initialize()
for n in range(dims[0]):
cov = StateVector.GetCovariance(DaCycle)
cov = StateVector.GetCovariance()
dummy = StateVector.MakeNewEnsemble(n+1,cov)
StateVector.Propagate()
......@@ -170,7 +165,7 @@ if __name__ == "__main__":
savedir = DaCycle['dir.output']
filename = os.path.join(savedir,'savestate.nc')
dummy = StateVector.WriteToFile(DaCycle)
dummy = StateVector.WriteToFile()
StateVector.ReadFromFile(filename)
......@@ -57,7 +57,7 @@ class TM5ObservationOperator(ObservationOperator):
"""
def __init__(self,RcFileName):
def __init__(self,RcFileName, DaCycle=None):
""" The instance of an TMObservationOperator is application dependent """
self.Identifier = self.getid() # the identifier gives the model name
self.Version = self.getversion() # the model version used
......@@ -69,6 +69,14 @@ class TM5ObservationOperator(ObservationOperator):
dummy = self.LoadRc(RcFileName) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
msg = 'Observation Operator initialized: %s (%s)'%(self.Identifier, self.Version,) ; logging.info(msg)
def getid(self):
......@@ -77,7 +85,7 @@ class TM5ObservationOperator(ObservationOperator):
def getversion(self):
return version
def Initialize(self, DaCycle):
def Initialize(self ):
"""
Prepare a forward model TM5 run, this consists of:
......@@ -98,27 +106,27 @@ class TM5ObservationOperator(ObservationOperator):
# Create a link from TM5 to the rundirectory of the das system
sourcedir = self.tm_settings[self.rundirkey]
targetdir = os.path.join(DaCycle['dir.exec'],'tm5')
targetdir = os.path.join(self.DaCycle['dir.exec'],'tm5')
dummy = CreateLinks(sourcedir,targetdir)
self.outputdir = self.tm_settings[self.outputdirkey]
DaCycle['dir.exec.tm5'] = targetdir
self.DaCycle['dir.exec.tm5'] = targetdir
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
NewItems = {
self.timestartkey : DaCycle['time.sample.start'] ,
self.timefinalkey : DaCycle['time.sample.end'] ,
'jobstep.timerange.start' : DaCycle['time.sample.start'] ,
'jobstep.timerange.end' : DaCycle['time.sample.end'] ,
'ct.params.input.dir' : DaCycle['dir.input'] ,
'ct.params.input.file' : os.path.join(DaCycle['dir.input'],'parameters') ,
'output.flask.infile' : DaCycle['ObsOperator.inputfile']
self.timestartkey : self.DaCycle['time.sample.start'] ,
self.timefinalkey : self.DaCycle['time.sample.end'] ,
'jobstep.timerange.start' : self.DaCycle['time.sample.start'] ,
'jobstep.timerange.end' : self.DaCycle['time.sample.end'] ,
'ct.params.input.dir' : self.DaCycle['dir.input'] ,
'ct.params.input.file' : os.path.join(self.DaCycle['dir.input'],'parameters') ,
'output.flask.infile' : self.DaCycle['ObsOperator.inputfile']
}
if DaCycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
if self.DaCycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
NewItems[self.istartkey] = self.restartvalue
if DaCycle['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart
if self.DaCycle['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart
NewItems[self.istartkey] = self.restartvalue
# If neither one is true, simply take the istart value from the tm5.rc file that was read
......@@ -131,7 +139,7 @@ class TM5ObservationOperator(ObservationOperator):
# Copy the pre-compiled MPI wrapper to the execution directory
targetdir = os.path.join(DaCycle['dir.exec.tm5'])
targetdir = os.path.join(self.DaCycle['dir.exec.tm5'])
if not os.path.exists(os.path.join(mpi_shell_location,mpi_shell_filename) ):
msg = "Cannot find the mpi_shell wrapper needed for completion (%s) in (%s)"% (mpi_shell_filename,mpi_shell_location) ; logging.error(msg)
......@@ -306,7 +314,7 @@ class TM5ObservationOperator(ObservationOperator):
msg = "Modified tm5_runtime.rc written (%s)" % tm5rcfilename ;logging.debug(msg)
def ValidateInput(self,DaCycle):
def ValidateInput(self):
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
......@@ -318,14 +326,14 @@ class TM5ObservationOperator(ObservationOperator):
datafiles = os.listdir(datadir)
obsfile = DaCycle['ObsOperator.inputfile']
obsfile = self.DaCycle['ObsOperator.inputfile']
if not os.path.exists(obsfile):
msg = "The specified obs input file for the TM5 model to read from does not exist (%s), exiting..."%obsfile ; logging.error(msg)
raise IOError,msg
for n in range(int(DaCycle['forecast.nmembers'])):
for n in range(int(self.DaCycle['forecast.nmembers'])):
paramfile = 'parameters.%03d.nc'%n
if paramfile not in datafiles:
msg = "The specified parameter input file for the TM5 model to read from does not exist (%s), exiting..."%paramfile ; logging.error(msg)
......@@ -347,7 +355,7 @@ class TM5ObservationOperator(ObservationOperator):
return 0
def GetInitialData(self,DaCycle):
def GetInitialData(self):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model.
For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial
concentrations for all tracers.
......@@ -363,7 +371,7 @@ class TM5ObservationOperator(ObservationOperator):
# First get the restart data for TM5 from the current restart dir of the filter
sourcedir = DaCycle['dir.restart.current']
sourcedir = self.DaCycle['dir.restart.current']
targetdir = self.tm_settings[self.savedirkey]
for file in os.listdir(sourcedir):
......@@ -391,7 +399,7 @@ class TM5ObservationOperator(ObservationOperator):
return None
def Run(self,DaCycle):
def Run(self):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
......@@ -418,7 +426,7 @@ class TM5ObservationOperator(ObservationOperator):
# Code for Option (1)
code = self.TM5_With_N_tracers(DaCycle)
code = self.TM5_With_N_tracers()
if code == 0:
logging.info('Finished model executable succesfully (%s)'%code)
......@@ -434,12 +442,12 @@ class TM5ObservationOperator(ObservationOperator):
return code
def TM5_under_mpirun(self, DaCycle):
def TM5_under_mpirun(self ):
""" Method handles the case where a shell runs an MPI process that forks into N TM5 model instances """
from string import join
import datetime
DaPlatForm = DaCycle.DaPlatForm
DaPlatForm = self.DaCycle.DaPlatForm
targetdir = os.path.join(self.tm_settings[self.rundirkey])
......@@ -454,18 +462,18 @@ class TM5ObservationOperator(ObservationOperator):
if os.path.exists(okfile):
dummy = os.remove(okfile)
nprocesses = DaCycle['forecast.nmembers']
nprocesses = self.DaCycle['forecast.nmembers']
jobparams = {'jobname':'tm5',
'jobnodes':'ncomp %d'%int(nprocesses),
'jobtime':'00:30:00',
'joblog':os.path.join(DaCycle['dir.jobs'])
'joblog':os.path.join(self.DaCycle['dir.jobs'])
}
template = DaPlatForm.GetJobTemplate(jobparams,block=True)
template += 'cd %s\n'%targetdir
template += 'mpirun -np %d %s ./tm5.x\n'%(int(nprocesses),mpi_shell_filename,)
jobfile = DaPlatForm.WriteJob(DaCycle,template,'tm5')
jobfile = DaPlatForm.WriteJob(self.DaCycle,template,'tm5')
msg = 'Submitting job at %s'%datetime.datetime.now() ; logging.info(msg)
code = DaPlatForm.SubmitJob(jobfile)
......@@ -479,12 +487,12 @@ class TM5ObservationOperator(ObservationOperator):
return code
def TM5_With_N_tracers(self, DaCycle):
def TM5_With_N_tracers(self):
""" Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members"""
from string import join
import datetime
DaPlatForm = DaCycle.DaPlatForm
DaPlatForm = self.DaCycle.DaPlatForm
targetdir = os.path.join(self.tm_settings[self.rundirkey])
......@@ -499,18 +507,18 @@ class TM5ObservationOperator(ObservationOperator):
if os.path.exists(okfile):
dummy = os.remove(okfile)
nprocesses = int(DaCycle['forecast.nmembers'])/5 # Note that we assign 5 tracers to each processor, this seems good for TM5
nprocesses = int(self.DaCycle['forecast.nmembers'])/5 # Note that we assign 5 tracers to each processor, this seems good for TM5
jobparams = {'jobname':'tm5',
'jobnodes':'ncomp %d'%int(nprocesses),
'jobtime':'00:30:00',
'joblog':os.path.join(DaCycle['dir.jobs'])
'joblog':os.path.join(self.DaCycle['dir.jobs'])
}
template = DaPlatForm.GetJobTemplate(jobparams,block=True)
template += 'cd %s\n'%targetdir
template += '%s -np %d %s tm5.rc\n'%(self.tm_settings['mpirun.command'],int(nprocesses),self.Tm5Executable,)
jobfile = DaPlatForm.WriteJob(DaCycle,template,'tm5')
jobfile = DaPlatForm.WriteJob(self.DaCycle,template,'tm5')
msg = 'Submitting job at %s'%datetime.datetime.now() ; logging.info(msg)
code = DaPlatForm.SubmitJob(jobfile)
......@@ -625,8 +633,8 @@ if __name__ == "__main__":
DaCycle['time.sample.window'] = 0
tm=TM5ObservationOperator()
tm.Initialize(DaCycle)
tm.Run(DaCycle)
tm.Initialize()
tm.Run()
tm.SaveData()
......
......@@ -34,7 +34,7 @@ def Main(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer):
msg = header+"Initializing current cycle"+footer ; logging.info(msg)
dummy = JobStart(DaCycle, DaSystem, PlatForm, StateVector)
dummy = JobStart(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
msg = header+"starting JobInput"+footer ; logging.info(msg)
......@@ -65,7 +65,7 @@ def Main(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer):
####################################################################################################
def JobStart(DaCycle, DaSystem, DaPlatForm, StateVector):
def JobStart(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
""" Set up the job specific directory structure and create an expanded rc-file """
dummy = DaSystem.Initialize()
......@@ -78,15 +78,11 @@ def JobStart(DaCycle, DaSystem, DaPlatForm, StateVector):
dummy = DaCycle.Initialize()
dims = ( int(DaCycle['time.nlag']),
int(DaCycle['forecast.nmembers']),
int(DaCycle.DaSystem['nparameters']),
)
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc
dummy = StateVector.Initialize(dims)
StateVector.DaCycle = DaCycle
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc
return None
......@@ -99,6 +95,8 @@ def JobInput(DaCycle, StateVector):
# with new values for each week. After we have constructed the StateVector, it will be propagated by one cycle length so it is ready to be used
# in the current cycle
dummy = StateVector.Initialize()
if not DaCycle['time.restart']:
# Fill each week from n=1 to n=nlag with a new ensemble
......@@ -106,7 +104,7 @@ def JobInput(DaCycle, StateVector):
nlag = StateVector.nlag
for n in range(0,nlag):
date = DaCycle['time.start']+datetime.timedelta(days = (n+0.5)* int(DaCycle['time.cycle']))
cov = StateVector.GetCovariance(DaCycle.DaSystem,date)
cov = StateVector.GetCovariance(date)
dummy = StateVector.MakeNewEnsemble(n+1,cov)
else:
......@@ -121,7 +119,7 @@ def JobInput(DaCycle, StateVector):
# Now propagate the ensemble by one cycle to prepare for the current cycle
dummy = StateVector.Propagate(DaCycle)
dummy = StateVector.Propagate()
return None
......@@ -163,7 +161,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
if lag == 0:
startdate = DaCycle['time.start']
dummy = ObservationOperator.GetInitialData(DaCycle)
dummy = ObservationOperator.GetInitialData()
else:
startdate = DaCycle['time.sample.start']
......@@ -185,13 +183,13 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
dummy = WriteEnsembleData(DaCycle, StateVector, lag)
dummy = Samples.Initialize(DaCycle)
dummy = Samples.Initialize()
dummy = Samples.Validate()
dummy = Samples.AddObs()
filename = Samples.WriteSampleInfo(DaCycle)
filename = Samples.WriteSampleInfo()
DaCycle['ObsOperator.inputfile'] = filename
......@@ -207,7 +205,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
dummy = Samples.AddModelDataMismatch(DaCycle)
dummy = Samples.AddModelDataMismatch()
msg = "Added Model Data Mismatch to all samples " ; logging.debug(msg)
......@@ -328,11 +326,11 @@ def RunForecastModel(DaCycle,ObsOperator):
submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
"""
status = ObsOperator.Initialize(DaCycle)
status = ObsOperator.Initialize()
status = ObsOperator.ValidateInput(DaCycle)
status = ObsOperator.ValidateInput()
status = ObsOperator.Run(DaCycle)
status = ObsOperator.Run()
dummy = ObsOperator.SaveData()
......
......@@ -28,7 +28,7 @@ from da.tools.initexit import ParseOptions
from da.tools.pipeline import Main
#################################################################################################
# Parse and validate the command line options
# Parse and validate the command line options, start logging
#################################################################################################
dummy = StartLogger()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment