Commit 8f88ead7 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

Slowly getting closer to the new design. Observations and StateVector are now...

Slowly getting closer to the new design. Observations and StateVector are now class based, but objects are getting more elaborate each time. One remaining problem is the relation between the StateVector, the ensemble members, and the observations. These are all linked but no object of one type can easily contain the other. 

For instance, does a model mixing ratio sample belong to an observation object because that is what it represents and is derived from? Or is it part of an ensemble member object because each member creates one sample set? And does a member object belong to the state vector and if so, how does it play together with the ohter time steps in the filter? Are these all separate StateVectoir objects or is there only one StateVector with nlag sets of data??

The issue with nlag is not nicely resolved now.

Next task is to build the optimizer such that it takes all info from the different objects and performs a LSQM.
parent cfc6bb67
This diff is collapsed.
...@@ -31,6 +31,7 @@ from tools_da import CreateDirs ...@@ -31,6 +31,7 @@ from tools_da import CreateDirs
from tools_da import AdvanceTime from tools_da import AdvanceTime
from tools_da import ParseOptions from tools_da import ParseOptions
class CycleControl(): class CycleControl():
""" """
This object controls a data assimilation cycle. It is created using information from a da rc-file with settings, and This object controls a data assimilation cycle. It is created using information from a da rc-file with settings, and
...@@ -62,11 +63,11 @@ class CycleControl(): ...@@ -62,11 +63,11 @@ class CycleControl():
""" """
msg = "===============================================================" ; print msg msg = "===============================================================" ; print msg
msg = "DAS rc-file is %s" % self.RcFileName ; print msg msg = "DA Cycle rc-file is %s" % self.RcFileName ; print msg
msg = "DAS log file is %s" % self.da_settings['log'] ; print msg msg = "DA Cycle log file is %s" % self.da_settings['log'] ; print msg
msg = "DAS run directory is %s" % self.da_settings['dir.da_run'] ; print msg msg = "DA Cycle run directory is %s" % self.da_settings['dir.da_run'] ; print msg
msg = "DAS inverse system is %s" % self.da_settings['da.system'] ; print msg msg = "DA Cycle inverse system is %s" % self.da_settings['da.system'] ; print msg
msg = "DAS forecast model is %s" % self.da_settings['forecast.model'] ; print msg msg = "DA Cycle forecast model is %s" % self.da_settings['forecast.model'] ; print msg
msg = "===============================================================" ; print msg msg = "===============================================================" ; print msg
return "" return ""
...@@ -74,14 +75,14 @@ class CycleControl(): ...@@ -74,14 +75,14 @@ class CycleControl():
def LoadRc(self,RcFileName): def LoadRc(self,RcFileName):
""" """
This method loads a DAS rc-file with settings for this simulation This method loads a DA Cycle rc-file with settings for this simulation
""" """
self.da_settings = rc.read(RcFileName) self.da_settings = rc.read(RcFileName)
self.RcFileName = RcFileName self.RcFileName = RcFileName
self.DaRcLoaded = True self.DaRcLoaded = True
msg = 'DAS rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg) msg = 'DA Cycle rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg)
return True return True
...@@ -107,7 +108,7 @@ class CycleControl(): ...@@ -107,7 +108,7 @@ class CycleControl():
logging.error(msg) logging.error(msg)
raise IOError,msg raise IOError,msg
status,msg = ( True,'DAS settings have been validated succesfully' ) ; logging.debug(msg) status,msg = ( True,'DA Cycle settings have been validated succesfully' ) ; logging.debug(msg)
return None return None
...@@ -144,11 +145,11 @@ class CycleControl(): ...@@ -144,11 +145,11 @@ class CycleControl():
self.da_settings['cyclelength'] = cyclelength self.da_settings['cyclelength'] = cyclelength
msg = "===============================================================" ; logging.info(msg) msg = "===============================================================" ; logging.info(msg)
msg = "DAS start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "DAS end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "DAS final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) msg = "DA Cycle final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "DAS cycle length is %s" % cyclelength ; logging.info(msg) msg = "DA Cycle cycle length is %s" % cyclelength ; logging.info(msg)
msg = "DAS restart is %s" % str(self.da_settings['time.restart']) ; logging.info(msg) msg = "DA Cycle restart is %s" % str(self.da_settings['time.restart']) ; logging.info(msg)
msg = "===============================================================" ; logging.info(msg) msg = "===============================================================" ; logging.info(msg)
return None return None
......
...@@ -28,30 +28,43 @@ def JobStart(opts,args): ...@@ -28,30 +28,43 @@ def JobStart(opts,args):
def JobInput(CycleInfo): def JobInput(CycleInfo):
""" Set up the input data for the forward model: obs and parameters/fluxes""" """ Set up the input data for the forward model: obs and parameters/fluxes"""
from tools_da import PrepareObs from tools_da import PrepareObs
from tools_da import PrepareEnsemble from tools_da import PrepareState
from tools_da import AddObsToState
dummy = PrepareEnsemble(CycleInfo) Samples = PrepareObs(CycleInfo,'forecast')
dummy = PrepareObs(CycleInfo,'forecast') StateVector = PrepareState(CycleInfo )
return None dummy = AddObsToState(CycleInfo, StateVector, Samples)
return StateVector
def Sample(CycleInfo): def Sample(CycleInfo, StateVector):
""" Sample the filter state for the inversion """ """ Sample the filter state for the inversion """
from tools_da import RunForecastModel from tools_da import RunForecastModel
from tools_da import ReadModelSamples
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in TM5/WRF/SIBCASA
# Run the forecast model
dummy = RunForecastModel(CycleInfo,'forecast') dummy = RunForecastModel(CycleInfo,'forecast')
# Optionally, post-processing of the model ouptu can be added that deals for instance with # Read forecast model samples that were written to NetCDF files
dummy = ReadModelSamples(StateVector)
# Optionally, post-processing of the model output can be added that deals for instance with
# sub-sampling of time series, vertical averaging, etc. # sub-sampling of time series, vertical averaging, etc.
return None return None
def Invert(CycleInfo): def Invert(CycleInfo, StateVector ):
""" Perform the inverse calculation """ """ Perform the inverse calculation """
import tools_da import tools_da
dummy = tools_da.Invert(CycleInfo) dummy = tools_da.Optimize(CycleInfo,StateVector)
return None return None
...@@ -104,14 +117,14 @@ if __name__ == "__main__": ...@@ -104,14 +117,14 @@ if __name__ == "__main__":
msg = header+"starting JobStart"+footer ; logging.info(msg) msg = header+"starting JobStart"+footer ; logging.info(msg)
CycleInfo = JobStart(opts,args) CycleInfo = JobStart(opts,args)
#msg = header+"starting JobInput"+footer ; logging.info(msg) msg = header+"starting JobInput"+footer ; logging.info(msg)
#dummy = JobInput(CycleInfo) StateVector = JobInput(CycleInfo)
msg = header+"starting Sample Taking"+footer ; logging.info(msg) msg = header+"starting Sample Taking"+footer ; logging.info(msg)
dummy = Sample(CycleInfo) dummy = Sample(CycleInfo, StateVector)
#msg = header+"starting Invert"+footer ; logging.info(msg) msg = header+"starting Invert"+footer ; logging.info(msg)
#dummy = Invert(CycleInfo) dummy = Invert(CycleInfo, StateVector )
msg = header+"starting Propagate"+footer ; logging.info(msg) msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(CycleInfo) dummy = Propagate(CycleInfo)
......
...@@ -199,7 +199,7 @@ class TM5(): ...@@ -199,7 +199,7 @@ class TM5():
def Run(self): def Run(self,nprocesses):
""" """
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
...@@ -234,8 +234,8 @@ class TM5(): ...@@ -234,8 +234,8 @@ class TM5():
# Open logfile and spawn model, wait for finish and return code # Open logfile and spawn model, wait for finish and return code
logging.info('Starting model executable as subprocess ') logging.info('Starting model executable as subprocess ')
cmd = ['openmpirun','-np', '10', mpi_shell_file,'./tm5.x'] #cmd = ['openmpirun','-np', '10', mpi_shell_file,'./tm5.x']
#cmd = ['mpirun','-np', rc_da_shell['forecast.nmembers'],mpi_shell_file,'./tm5.x'] cmd = ['mpirun','-np',str(nprocesses),mpi_shell_file,'./tm5.x']
#cmd = ['./tm5.x'] #cmd = ['./tm5.x']
code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile) code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile)
...@@ -332,8 +332,8 @@ def DaInitialize(rc_da_shell): ...@@ -332,8 +332,8 @@ def DaInitialize(rc_da_shell):
'time.final' : rc_da_shell['time.sample.end'] , 'time.final' : rc_da_shell['time.sample.end'] ,
'rundir' : rc_da_shell['dir.exec.tm5'] , 'rundir' : rc_da_shell['dir.exec.tm5'] ,
'outputdir' : rc_da_shell['dir.output'] , 'outputdir' : rc_da_shell['dir.output'] ,
'savedir' : rc_da_shell['dir.save'] #, 'savedir' : rc_da_shell['dir.save'] ,
#'das.input.dir' : '/data/CO2/carbontracker/ct08//obsnc/' 'das.input.dir' : rc_da_shell['dir.input']
} }
if rc_da_shell['time.restart'] == True: NewItems['istart'] = 3 if rc_da_shell['time.restart'] == True: NewItems['istart'] = 3
...@@ -370,7 +370,7 @@ if __name__ == "__main__": ...@@ -370,7 +370,7 @@ if __name__ == "__main__":
#tm=TM5('/Users/peters/Modeling/TM5/ct_new.rc') #tm=TM5('/Users/peters/Modeling/TM5/ct_new.rc')
#tm.WriteRc() #tm.WriteRc()
#tm.WriteRunRc() #tm.WriteRunRc()
#tm.Run() #tm.Run(1)
#tm.SaveData() #tm.SaveData()
#sys.exit(0) #sys.exit(0)
...@@ -385,7 +385,7 @@ if __name__ == "__main__": ...@@ -385,7 +385,7 @@ if __name__ == "__main__":
dasrc['dir.input'] = os.path.join(dasrc['dir.da_run'],'input') dasrc['dir.input'] = os.path.join(dasrc['dir.da_run'],'input')
tm = DaInitialize(dasrc) tm = DaInitialize(dasrc)
tm.Run() tm.Run(dasrc['forecast.nmembers'])
#tm.SaveData() #tm.SaveData()
......
...@@ -110,6 +110,24 @@ def ValidateOptsArgs(opts,args): ...@@ -110,6 +110,24 @@ def ValidateOptsArgs(opts,args):
return opts,args return opts,args
def ValidateRC(rcfile,needed_items):
""" validate the contents of an rc-file given a dictionary of required keys """
for k,v in rcfile.iteritems():
if v == 'True' : rcfile[k] = True
if v == 'False': rcfile[k] = False
if 'date' in k : rcfile[k] = datetime.datetime.strptime(v,'%Y-%m-%d %H:%M:%S')
for key in needed_items:
if not rcfile.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
status,msg = ( True,'rc-file has been validated succesfully' ) ; logging.debug(msg)
def CreateDirs(dirname,forceclean=False): def CreateDirs(dirname,forceclean=False):
""" Create a directory and report success, only if non-existent """ """ Create a directory and report success, only if non-existent """
...@@ -199,28 +217,28 @@ def AdvanceTime(time_in,interval): ...@@ -199,28 +217,28 @@ def AdvanceTime(time_in,interval):
return time_out return time_out
def PrepareObs(rc_da_shell,type='forecast'): def PrepareObs(CycleInfo,type='forecast'):
""" Prepare a set of observations to be co-sampled by the model. Although the collecting and parsing of """ Prepare a set of observations to be co-sampled by the model. Although the collecting and parsing of
the observations will depend on the specific application, the final result of this step is an output the observations will depend on the specific application, the final result of this step is an output
file called "observations.nc" which carries the x,y,z,t, dt information of each observation """ file called "observations.nc" which carries the x,y,z,t, dt information of each observation """
if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system if CycleInfo.da_settings['da.system'] == 'CarbonTracker': import ct_tools as da_system
msg = "Using %s as DA system" % da_system.identifier ; logging.debug(msg) msg = "Using %s as DA system" % da_system.identifier ; logging.debug(msg)
rc_da_system = rc.read(rc_da_shell['da.system.rc'],silent = True) rc_da_system = rc.read(CycleInfo.da_settings['da.system.rc'],silent = True)
dummy = ValidateRC(rc_da_system,da_system.needed_rc_items) dummy = ValidateRC(rc_da_system,da_system.needed_rc_items)
# Add da_system specific rc-file to the rc_da_shell dictionary # Add da_system specific rc-file to the rc_da_shell dictionary
rc_da_shell['da.system.info'] = rc_da_system CycleInfo.da_settings['da.system.info'] = rc_da_system
dummy = da_system.PrepareObs(rc_da_shell,rc_da_system,type=type) Samples = da_system.PrepareObs(CycleInfo,type=type)
return None return Samples
def PrepareEnsemble(rc_da_shell ): def PrepareState(CycleInfo):
""" """
Prepare the ensemble of parameters needed by the forecast model. This step will depend on the assimilation system type, Prepare the ensemble of parameters needed by the forecast model. This step will depend on the assimilation system type,
...@@ -229,11 +247,20 @@ def PrepareEnsemble(rc_da_shell ): ...@@ -229,11 +247,20 @@ def PrepareEnsemble(rc_da_shell ):
""" """
if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system if CycleInfo.da_settings['da.system'] == 'CarbonTracker': import ct_tools as da_system
dummy = da_system.PrepareEnsemble(rc_da_shell ) StateVector = da_system.PrepareState(CycleInfo )
return None return StateVector
def AddObsToState(CycleInfo, StateVector, Samples):
""" Add the observation objects to the ensemble members in the StateVector """
if CycleInfo.da_settings['da.system'] != 'CarbonTracker': raise Exception,"CarbonTracker specific code in this routine!"
for Member in StateVector.EnsembleMembers[-1]:
Member.ModelSample = Samples
Member.SampleInputFile = os.path.join(CycleInfo.da_settings['dir.output'],'samples.%03d.nc'%Member.membernumber)
def RunForecastModel(CycleInfo,step='forecast'): def RunForecastModel(CycleInfo,step='forecast'):
...@@ -285,18 +312,33 @@ def RunForecastModel(CycleInfo,step='forecast'): ...@@ -285,18 +312,33 @@ def RunForecastModel(CycleInfo,step='forecast'):
# Run the forward model # Run the forward model
status = executable.Run() #status = executable.Run(CycleInfo.da_settings['forecast.nmembers'])
status = 0
########################################### RETURN CONTROL TO DA SHELL ######################################### ########################################### RETURN CONTROL TO DA SHELL #########################################
return status return status
def Invert(rc_da_shell): def ReadModelSamples(StateVector):
""" Read forecast model samples for each ensemble member into the StateVector's ensemble members. """
# Add model written samples to the ensemble members CtObservations objects
for Member in StateVector.EnsembleMembers[-1]:
Member.ModelSample.AddSimulations(Member.SampleInputFile)
msg = "Added samples from the forecast model to the StateVector" ; logging.info(msg)
return None
def Optimize(CycleInfo, StateVector ):
""" Perform least-squares minimization""" """ Perform least-squares minimization"""
if rc_da_shell['da.system'] == 'CarbonTracker': import ct_tools as da_system if CycleInfo.da_settings['da.system'] == 'CarbonTracker': import ct_tools as da_system
DaSystem = da_system.DaInfo(CycleInfo.da_settings['da.system.rc'])
dummy = da_system.MakeResiduals(rc_da_shell) optimizer = da_system.CtOptimizer(CycleInfo, DaSystem)
def CleanUpCycle(CycleInfo): def CleanUpCycle(CycleInfo):
""" """
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment