Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • nearrealtimectdas/CTDAS
  • tsurata_a/CTDAS
  • peter050/CTDAS
  • woude033/CTDAS
  • flore019/CTDAS
  • laan035/CTDAS
  • koren007/CTDAS
  • smith036/CTDAS
  • ctdas/CTDAS
9 results
Select Git revision
Show changes
Commits on Source (166)
Showing
with 1836 additions and 0 deletions
.DEFAULT_GOAL: clean
.PHONY: help clean
help:
@echo " "
@echo " Usage:"
@echo " make clean # remove temporary (log) files"
@echo " "
clean:
# -------------------
/bin/rm -f jb.[0-9]*.jb
/bin/rm -f jb.[0-9]*.rc
/bin/rm -f jb.[0-9]*.log
/bin/rm -f *.pyc
/bin/rm -f */*/jb.[0-9]*.rc
/bin/rm -f */*/jb.[0-9]*.jb
/bin/rm -f */*/jb.[0-9]*.log
/bin/rm -f */*/*.pyc
/bin/rm -f das.[0-9]*.log
/bin/rm -f *.o*
/bin/rm -f *.po*
/bin/rm -f *.e*
/bin/rm -f *.pe*
!!! Info for the CarbonTracker data assimilation system
!datadir : /lfs0/projects/co2/input/
datadir : /data/CO2/carbontracker/ct08/
obs.input.dir : ${datadir}/obsnc/with_fillvalue/
obs.input.fname : obs_final.nc
ocn.covariance : ${datadir}/oif_p3_era40.dpco2.2000.01.hdf
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 240
random.seed : 4385
regionsfile : transcom_olson19_oif30.hdf
!!! Info for the CarbonTracker data assimilation system
datadir : /lfs0/projects/co2/input/ct_new_2010
obs.input.dir : ${datadir}/obsnc/with_fillvalue
obs.input.fname : obs_final
ocn.covariance : ${datadir}/oif_p3_era40.dpco2.2000.01.hdf
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 240
random.seed : 4385
regionsfile : transcom_olson19_oif30.hdf
! Info on the data assimilation cycle
time.restart : False
time.start : 2000-01-01 00:00:00
time.finish : 2000-01-08 00:00:00
time.cycle : 7
time.nlag : 2
dir.da_run : ${HOME}/tmp/test_da
! Info on the DA system used
da.system : CarbonTracker
da.system.rc : carbontracker.rc
da.platform : maunaloa
! Info on the forward model to be used
forecast.model : TM5
forecast.model.rc : ${HOME}/Modeling/TM5/ct_new.rc
forecast.nmembers : 2
#!/usr/bin/env python
# control.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class DaSystem ###################
class DaSystem(object):
""" Information on the data assimilation system used. This is normally an rc-file with settings.
"""
def __init__(self,rcfilename):
"""
Initialization occurs from passed rc-file name
"""
self.LoadRc(rcfilename)
def __str__(self):
"""
String representation of a DaInfo object
"""
msg = "===============================================================" ; print msg
msg = "DA System Info rc-file is %s" % self.RcFileName ; print msg
msg = "===============================================================" ; print msg
return ""
def LoadRc(self,RcFileName):
"""
This method loads a DA System Info rc-file with settings for this simulation
"""
import da.tools.rc as rc
self.da_settings = rc.read(RcFileName)
self.RcFileName = RcFileName
self.DaRcLoaded = True
msg = 'DA System Info rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg)
return True
def Validate(self ):
"""
Validate the contents of the rc-file given a dictionary of required keys
"""
needed_rc_items={}
for k,v in self.da_settings.iteritems():
if v == 'True' : self.da_settings[k] = True
if v == 'False': self.da_settings[k] = False
for key in needed_rc_items:
if not self.da_settings.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
status,msg = ( True,'DA System Info settings have been validated succesfully' ) ; logging.debug(msg)
return None
################### End Class DaSystem ###################
if __name__ == "__main__":
pass
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'Observation baseclass'
version = '0.0'
################### Begin Class Observation ###################
class Observation(object):
""" an object that holds data + methods and attributes needed to manipulate observations for a DA cycle """
def __init__(self):
self.Identifier = self.getid()
self.Version = self.getversion()
self.Data = ObservationList([]) # Initialize with an empty list of obs
msg = 'Observation object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return identifier
def __str__(self):
""" Prints a list of Observation objects"""
return "This is a %s object, version %s"%(self.Identifier,self.Version)
def Initialize(self):
""" Perform all steps needed to start working with observational data, this can include moving data, concatenating files,
selecting datasets, etc.
"""
def Validate(self):
""" Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
are present.
"""
def AddObs(self):
""" Add the observation data to the Observation object. This is in a form of a list that goes under the name Data. The
list has as only requirement that it can return the observed+simulated values through a method "getvalues"
"""
def AddSimulations(self):
""" Add the simulation data to the Observation object.
"""
def WriteSampleInfo(self, DaCycle):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
################### End Class Observations ###################
################### Begin Class ObservationList ###################
class ObservationList(list):
""" This is a special type of list that holds observed sample objects. It has methods to extract data from such a list easily """
from numpy import array, ndarray
def getvalues(self,name,constructor=array):
from numpy import ndarray
result = constructor([getattr(o,name) for o in self])
if isinstance(result,ndarray):
return result.squeeze()
else:
return result
################### End Class MixingRatioList ###################
#!/usr/bin/env python
# model.py
"""
Author : peters
Revision History:
File created on 30 Aug 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'GeneralObservationOperator'
version = '0.0'
################### Begin Class ObservationOperator ###################
class ObservationOperator(object):
"""
This is a class that defines an ObervationOperator. This object is used to control the sampling of
a statevector in the ensemble Kalman filter framework. The methods of this class specify which (external) code
is called to perform the sampling, and which files should be read for input and are written for output.
The baseclasses consist mainly of empty mehtods that require an application specific application
"""
def __init__(self):
""" The instance of an ObservationOperator is application dependent """
self.Identifier = self.getid()
self.Version = self.getversion()
msg = 'Observation Operator object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return version
def __str__(self):
return "This is a %s object, version %s"%(self.Identifier,self.Version)
def Initialize(self,DaCycle):
""" Perform all steps necessary to start the observation operator through a simple Run() call """
def ValidateInput(self,DaCycle):
""" Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
are present.
"""
def SaveData(self):
""" Write the data that is needed for a restart or recovery of the Observation Operator to the save directory """
################### End Class ObservationOperator ###################
if __name__ == "__main__":
pass
#!/usr/bin/env python
# optimizer.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'Optimizer baseclass'
version = '0.0'
################### Begin Class Optimizer ###################
class Optimizer(object):
"""
This creates an instance of an optimization object. It handles the minimum least squares optimization
of the state vector given a set of sample objects. Two routines will be implemented: one where the optimization
is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed
and efficiency.
"""
def __init__(self):
self.Identifier = self.getid()
self.Version = self.getversion()
msg = 'Optimizer object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return version
def Initialize(self, dims):
self.nlag = dims[0]
self.nmembers = dims[1]
self.nparams = dims[2]
self.nobs = dims[3]
self.localization = False
self.localization = False
self.CreateMatrices()
return None
def CreateMatrices(self):
""" Create Matrix space needed in optimization routine """
import numpy as np
# mean state [X]
self.x = np.zeros( (self.nlag*self.nparams,), float)
# deviations from mean state [X']
self.X_prime = np.zeros( (self.nlag*self.nparams,self.nmembers,), float)
# mean state, transported to observation space [ H(X) ]
self.Hx = np.zeros( (self.nobs,), float)
# deviations from mean state, transported to observation space [ H(X') ]
self.HX_prime = np.zeros( (self.nobs,self.nmembers), float)
# observations
self.obs = np.zeros( (self.nobs,), float)
# covariance of observations
self.R = np.zeros( (self.nobs,self.nobs,), float)
# Total covariance of fluxes and obs in units of obs [H P H^t + R]
self.HPHR = np.zeros( (self.nobs,self.nobs,), float)
# Kalman Gain matrix
self.KG = np.zeros( (self.nlag*self.nparams,self.nobs,), float)
# flags of observations
self.flags = np.zeros( (self.nobs,), int)
def StateToMatrix(self,StateVector):
import numpy as np
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
self.x[n*self.nparams:(n+1)*self.nparams] = members[0].ParameterValues[n]
self.X_prime[n*self.nparams:(n+1)*self.nparams,:] = np.transpose(np.array([m.ParameterValues for m in members]))
self.X_prime = self.X_prime - self.x[:,np.newaxis] # make into a deviation matrix
self.obs[:] = StateVector.EnsembleMembers[-1][0].ModelSample.Data.getvalues('obs')
self.Hx[:] = StateVector.EnsembleMembers[-1][0].ModelSample.Data.getvalues('simulated')
for m,mem in enumerate(StateVector.EnsembleMembers[-1]):
self.HX_prime[:,m] = mem.ModelSample.Data.getvalues('simulated')
self.HX_prime = self.HX_prime - self.Hx[:,np.newaxis] # make a deviation matrix
self.R[:,:] = np.identity(self.nobs)
return None
def MatrixToState(self,StateVector):
import numpy as np
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
for m,mem in enumerate(members):
members[m].ParameterValues[:] = self.X_prime[n*self.nparams:(n+1)*self.nparams,m] + self.x[n*self.nparams:(n+1)*self.nparams]
return None
def SerialMinimumLeastSquares(self):
""" Make minimum least squares solution by looping over obs"""
import numpy as np
import numpy .linalg as la
tvalue=1.97591
for n in range(self.nobs):
if self.flags[n] == 1: continue
PHt = 1./(self.nmembers-1)*np.dot(self.X_prime,self.HX_prime[n,:])
self.HPHR[n,n] = 1./(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[n,:]).sum()+self.R[n,n]
self.KG[:,n] = PHt/self.HPHR[n,n]
dummy = self.Localize(n)
alpha = np.double(1.0)/(np.double(1.0)+np.sqrt( (self.R[n,n])/self.HPHR[n,n] ) )
res = self.obs[n]-self.Hx[n]
self.x[:] = self.x + self.KG[:,n]*res
for r in range(self.nmembers):
self.X_prime[:,r] = self.X_prime[:,r]-alpha*self.KG[:,n]*(self.HX_prime[n,r])
#WP !!!! Very important to first do all obervations from n=1 through the end, and only then update 1,...,n. The current observation
#WP should always be updated last because it features in the loop of the adjustments !!!!
for m in range(n+1,self.nobs):
res = self.obs[n]-self.Hx[n]
fac = 1.0/(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[m,:]).sum()/self.HPHR[n,n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m,:] = self.HX_prime[m,:] - alpha*fac*self.HX_prime[n,:]
for m in range(1,n+1):
res = self.obs[n]-self.Hx[n]
fac = 1.0/(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[m,:]).sum()/self.HPHR[n,n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m,:] = self.HX_prime[m,:] - alpha*fac*self.HX_prime[n,:]
def BulkMinimumLeastSquares(self):
""" Make minimum least squares solution by solving matrix equations"""
import numpy as np
import numpy.linalg as la
# Create full solution, first calculate the mean of the posterior analysis
HPH = np.dot(self.HX_prime,np.transpose(self.HX_prime))/(self.nmembers-1) # HPH = 1/N * HX' * (HX')^T
self.HPHR[:,:] = HPH+self.R # HPHR = HPH + R
HPb = np.dot(self.X_prime,np.transpose(self.HX_prime))/(self.nmembers-1) # HP = 1/N X' * (HX')^T
self.KG[:,:] = np.dot(HPb,la.inv(self.HPHR)) # K = HP/(HPH+R)
for n in range(self.nobs):
dummy = self.Localize(n)
self.x[:] = self.x + np.dot(self.KG,self.obs-self.Hx) # xa = xp + K (y-Hx)
# And next make the updated ensemble deviations. Note that we calculate P by using the full equation (10) at once, and
# not in a serial update fashion as described in Whitaker and Hamill.
# For the current problem with limited N_obs this is easier, or at least more straightforward to do.
I = np.identity(self.nlag*self.nparams)
sHPHR = la.cholesky(self.HPHR) # square root of HPH+R
part1 = np.dot(HPb,np.transpose(la.inv(sHPHR))) # HP(sqrt(HPH+R))^-1
part2 = la.inv(sHPHR+np.sqrt(self.R)) # (sqrt(HPH+R)+sqrt(R))^-1
Kw = np.dot(part1,part2) # K~
self.X_prime[:,:] = np.dot(I,self.X_prime)-np.dot(Kw,self.HX_prime) # HX' = I - K~ * HX'
P_opt = np.dot(self.X_prime,np.transpose(self.X_prime))/(self.nmembers-1)
# Now do the adjustments of the modeled mixing ratios using the linearized ensemble. These are not strictly needed but can be used
# for diagnosis.
part3 = np.dot(HPH,np.transpose(la.inv(sHPHR))) # HPH(sqrt(HPH+R))^-1
Kw = np.dot(part3,part2) # K~
self.Hx[:] = self.Hx + np.dot(np.dot(HPH,la.inv(self.HPHR)),self.obs-self.Hx) # Hx = Hx+ HPH/HPH+R (y-Hx)
self.HX_prime[:,:] = self.HX_prime-np.dot(Kw,self.HX_prime) # HX' = HX'- K~ * HX'
msg = 'Minimum Least Squares solution was calculated, returning' ; logging.info(msg)
return None
def SetLocalization(self):
""" determine which localization to use """
self.localization = True
self.localizetype = "None"
msg = "Current localization option is set to %s"%self.localizetype ; logging.info(msg)
def Localize(self,n):
""" localize the Kalman Gain matrix """
import numpy as np
if not self.localization: return
return
################### End Class Optimizer ###################
if __name__ == "__main__":
sys.path.append('../../')
import os
import sys
from da.tools.general import StartLogger
from da.tools.initexit import CycleControl
from da.ct.statevector import CtStateVector, PrepareState
from da.ct.obs import CtObservations
import numpy as np
import datetime
import da.tools.rc as rc
opts = ['-v']
args = {'rc':'../../da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
DaCycle.Initialize()
print DaCycle
StateVector = PrepareState(DaCycle)
samples = CtObservations(DaCycle.DaSystem,datetime.datetime(2005,3,5))
dummy = samples.AddObs()
dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/20050305/samples.000.nc')
nobs = len(samples.Data)
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
nobs, )
opt = CtOptimizer(dims)
opt.StateToMatrix(StateVector)
opt.MinimumLeastSquares()
opt.MatrixToState(StateVector)
#!/usr/bin/env python
# jobcontrol.py
"""
Author : peters
Revision History:
File created on 06 Sep 2010.
"""
import sys
import os
import logging
import subprocess
std_joboptions={'jobname':'test','jobaccount':'co2','jobnodes':'nserial 1','jobshell':'/bin/sh','depends':'','jobtime':'00:30:00'}
class PlatForm(object):
""" This specifies platform dependent options under generic object calls. A platform object is used to control and submit jobs"""
def __init__(self):
self.Identifier = 'iPad' # the identifier gives the plaform name
self.Version = '1.0' # the platform version used
msg1 = '%s object initialized'%self.Identifier ; logging.debug(msg1)
msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2)
def __str__(self):
return None
def GetJobTemplate(self,joboptions={}):
""" Return the job template for a given computing system, and fill it with options from the dictionary provided as argument"""
template = """## \n"""+ \
"""## This is a set of dummy names, to be replaced by values from the dictionary \n"""+ \
"""## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """+ \
"""## \n"""+ \
""" \n"""+ \
"""#$ jobname \n"""+ \
"""#$ jobaccount \n"""+ \
"""#$ jobnodes \n"""+ \
"""#$ jobtime \n"""+ \
"""#$ jobshell \n """
if 'depends' in joboptions:
template += """#$ -hold_jid depends \n"""
# First replace from passed dictionary
for k,v in joboptions.iteritems():
while k in template:
template = template.replace(k,v)
# Fill remaining values with std_options
for k,v in std_joboptions.iteritems():
while k in template:
template = template.replace(k,v)
return template
def GetMyID(self):
return os.getpid()
def WriteJob(self,DaCycle,template, jobid):
""" This method writes a jobfile to the exec dir"""
#
# Done, write jobfile
#
targetdir = os.path.join(DaCycle.da_settings['dir.exec'])
jobfile = os.path.join(targetdir,'jb.%s.jb'%jobid)
f = open(jobfile,'w')
dummy = f.write(template)
dummy = f.close()
dummy = os.chmod(jobfile,477)
msg = "A job file was created (%s)"%jobfile ; logging.debug(msg)
return jobfile
def SubmitJob(self,jobfile):
""" This method submits a jobfile to the queue, and returns the queue ID """
cmd = ['sh',jobfile]
msg = "A new task will be started (%s)\n\n\n"%cmd ; logging.debug(msg)
process = subprocess.Popen(cmd)
jobid = subprocess.Popen(cmd).pid
dummy = subprocess.Popen(cmd).wait()
return 0
def KillJob(self,jobid):
""" This method kills a running job """
return None
def StatJob(self,jobid):
""" This method gets the status of a running job """
import subprocess
output = subprocess.Popen(['qstat',jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output)
return output
if __name__ == "__main__":
pass
#!/usr/bin/env python
# ct_statevector_tools.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'Baseclass Statevector '
version = '0.0'
################### Begin Class EnsembleMember ###################
class EnsembleMember(object):
"""
An ensemble member. This consists of
* a member number
* parameter values
* a ModelSample object to hold sampled values for this member
"""
def __init__(self, membernumber):
self.membernumber = membernumber # the member number
self.ParameterValues = None # Parameter values of this member
self.ModelSample = None # Model Sampled Parameter values of this member
def __str__(self):
return "%03d"%self.membernumber
def WriteToFile(self, outdir):
""" Write the information needed by an external model to a netcdf file for future use """
from da.tools.io import CT_CDF, std_savedict
import numpy as np
filename = os.path.join(outdir,'parameters.%03d.nc'% self.membernumber)
f = CT_CDF(filename,'create')
dimparams = f.AddParamsDim(len(self.ParameterValues))
data = self.ParameterValues
savedict = std_savedict.copy()
savedict['name'] = "parametervalues"
savedict['long_name'] = "parameter_values_for_member_%d"%self.membernumber
savedict['units'] = "unitless"
savedict['dims'] = dimparams
savedict['values'] = data.tolist()
savedict['comment'] = 'These are parameter values to use for member %d'%self.membernumber
dummy = f.AddData(savedict)
dummy = f.close()
msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (self.membernumber,filename,) ; logging.info(msg)
################### End Class EnsembleMember ###################
################### Begin Class StateVector ###################
import numpy as np
class StateVector(object):
""" an object that holds data + methods and attributes needed to manipulate state vector values """
def __init__(self):
self.Identifier = self.getid()
self.Version = self.getversion()
msg = 'Statevector object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return version
def Initialize(self,dims):
self.nlag = dims[0]
self.nmembers = dims[1]
self.nparams = dims[2]
self.isOptimized = False
# These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist
# of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
self.EnsembleMembers = range(self.nlag)
for n in range(self.nlag):
self.EnsembleMembers[n] = []
def __str__(self):
return "This is a base class derived state vector object"
def MakeNewEnsemble(self,lag, covariancematrix = None):
""" Make a new ensemble, the attribute lag refers to the position in the state vector.
Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.
The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag]
The optional DaCycle object to be passed holds info on the data assimilation system settings needed
for instance to read covariance data from file. This is not part of the base class object here, hence, this
mehtod will almost always be overwritten in the derived class for the specific application.
The code below is just an example of what could be used
"""
if covariancematrix == None:
covariancematrix=np.identity(self.nparams)
# Make a cholesky decomposition of the covariance matrix
U,s,Vh = np.linalg.svd(covariancematrix)
dof = np.sum(s)**2/sum(s**2)
C = np.linalg.cholesky(covariancematrix)
msg = 'Cholesky decomposition has succeeded offline' ; logging.debug(msg)
msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg)
# Create mean values
NewMean = np.ones(self.nparams,float)
# Create the first ensemble member with a deviation of 0.0 and add to list
NewMember = EnsembleMember(0)
NewMember.ParameterValues = np.zeros((self.nparams),float) + NewMean
dummy = self.EnsembleMembers[lag-1].append(NewMember)
# Create members 1:nmembers and add to EnsembleMembers list
for member in range(1,self.nmembers):
randstate = np.random.get_state()
rands = np.random.randn(self.nparams)
NewMember = EnsembleMember(member)
NewMember.ParameterValues = np.dot(C,rands) + NewMean
dummy = self.EnsembleMembers[lag-1].append(NewMember)
msg = '%d new ensemble members were added to the state vector # %d'%(self.nmembers,lag) ; logging.debug(msg)
def Propagate(self):
"""
Propagate the parameter values in the StateVector to the next cycle. This means a shift by one cycle step for all states that will
be optimized once more, and the creation of a new ensemble for the time step that just comes in for the first time (step=nlag).
In the future, this routine can incorporate a formal propagation of the statevector.
"""
# Remove State Vector n=1 by simply "popping" it from the list and appending a new empty list at the front. This empty list will
# hold the new ensemble for the new cycle
dummy = self.EnsembleMembers.pop(0)
dummy = self.EnsembleMembers.append([])
# And now create a new week of mean + members for n=nlag
dummy = self.MakeNewEnsemble(self.nlag)
msg = 'The state vector has been propagated by one cycle ' ; logging.info(msg)
return None
def WriteToFile(self,filename):
""" Write the StateVector object to a netcdf file for future use """
from da.tools.io import CT_CDF
import numpy as np
f = CT_CDF(filename,'create')
dimparams = f.AddParamsDim(self.nparams)
dimmembers = f.AddMembersDim(self.nmembers)
dimlag = f.AddLagDim(self.nlag,unlimited=True)
for n in range(self.nlag):
members = self.EnsembleMembers[n]
MeanState = members[0].ParameterValues
savedict = f.StandardVar(varname='meanstate')
savedict['dims'] = dimlag+dimparams
savedict['values'] = MeanState
savedict['count'] = n
savedict['comment'] = 'this represents the mean of the ensemble'
dummy = f.AddData(savedict)
members = self.EnsembleMembers[n]
data = np.array([m.ParameterValues for m in members])- MeanState
savedict = f.StandardVar(varname='ensemblestate')
savedict['dims'] = dimlag+dimmembers+dimparams
savedict['values'] = data
savedict['count'] = n
savedict['comment'] = 'this represents deviations from the mean of the ensemble'
dummy = f.AddData(savedict)
dummy = f.close()
msg = 'Successfully wrote the State Vector to file (%s) ' % (filename,) ; logging.info(msg)
def ReadFromFile(self,filename):
""" Read the StateVector object from a netcdf file for future use """
import Nio
import numpy as np
f = Nio.open_file(filename,'r')
MeanState = f.variables['statevectormean'].get_value()
EnsembleMembers = f.variables['statevectorensemble'].get_value()
dummy = f.close()
for n in range(self.nlag):
for m in range(self.nmembers):
NewMember = EnsembleMember(m)
NewMember.ParameterValues = EnsembleMembers[n,m,:] + MeanState[n] # add the mean to the deviations to hold the full parameter values
dummy = self.EnsembleMembers[n].append(NewMember)
msg = 'Successfully read the State Vector from file (%s) ' % (filename,) ; logging.info(msg)
################### End Class StateVector ###################
if __name__ == "__main__":
sys.path.append('../../')
import os
import sys
from da.tools.general import StartLogger
from da.tools.initexit import CycleControl
import numpy as np
import da.tools.rc as rc
opts = ['-v']
args = {'rc':'../../da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
dummy = EnsembleMember(0)
print dummy
DaCycle.Initialize()
print DaCycle
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
)
StateVector = StateVector(dims)
StateVector.MakeNewEnsemble(lag=1)
StateVector.MakeNewEnsemble(lag=2)
members = StateVector.EnsembleMembers[1]
members[0].WriteToFile(DaCycle.da_settings['dir.input'])
StateVector.Propagate()
savedir = DaCycle.da_settings['dir.output']
filename = os.path.join(savedir,'savestate.nc')
StateVector.WriteToFile(filename)
savedir = DaCycle.da_settings['dir.output']
filename = os.path.join(savedir,'savestate.nc')
StateVector.ReadFromFile(filename)
mpicc tm5_mpi_wrapper.c -o tm5_mpi_wrapper
#include <stdio.h>
#include <errno.h>
#include "mpi.h"
#define CMDLENGTH 200
int safe_system (const char *command);
int main(int argc, char *argv[]) {
int ierr;
int myrank;
char cmd[CMDLENGTH];
ierr = MPI_Init(&argc, &argv);
if (argc != 2) {
fprintf(stderr, "[tm5_mpi_wrapper] Expecting 1 argument, got %d.\n",argc);
exit(-1);
}
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
snprintf(cmd,CMDLENGTH,"%s %03d",argv[1],myrank);
//snprintf(cmd,CMDLENGTH,"%s %03d >> tm5.%03d.log",argv[1],myrank,myrank);//
printf( "MPI rank %d about to execute command \"%s\".\n",myrank,cmd );
ierr = safe_system(cmd);
if(ierr != 0) {
MPI_Abort(MPI_COMM_WORLD,ierr);
exit(ierr);
}
ierr = MPI_Finalize( );
exit(ierr);
}
int safe_system (const char *command) {
int pid, status;
if (command == 0)
return 1;
pid = fork();
if (pid == -1) // fork failed
return -1;
if (pid == 0) { // then this is the child
char *argv[4];
argv[0] = "sh";
argv[1] = "-c";
argv[2] = (char*)command;
argv[3] = 0;
execv("/bin/sh", argv);
_exit(127);
}
do {
if (waitpid(pid, &status, 0) == -1) {
if (errno != EINTR)
return -1;
} else
return status;
} while(1);
}
#!/usr/bin/env python
# control.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class CtDaSystem ###################
from da.baseclasses.dasystem import DaSystem
class CtDaSystem(DaSystem):
""" Information on the data assimilation system used. This is normally an rc-file with settings.
"""
def Validate(self):
"""
Validate the contents of the rc-file given a dictionary of required keys
"""
needed_rc_items = ['obs.input.dir',
'obs.input.fname',
'ocn.covariance',
'nparameters',
'bio.covariance',
'deltaco2.prefix',
'regtype']
for k,v in self.da_settings.iteritems():
if v == 'True' : self.da_settings[k] = True
if v == 'False': self.da_settings[k] = False
for key in needed_rc_items:
if not self.da_settings.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
status,msg = ( True,'DA System Info settings have been validated succesfully' ) ; logging.debug(msg)
return None
################### End Class CtDaSystem ###################
if __name__ == "__main__":
pass
#!/usr/bin/env python
# io.py
"""
Author : peters
Revision History:
File created on 15 Oct 2008.
File modified for CT data assimilation system in July 2010, Wouter Peters
"""
import standardvariables
import pycdf as CDF
import datetime as dt
from numpy import array
import os
disclaimer = "This data belongs to the CarbonTracker project"
email = "wouter.peters@wur.nl"
url = "http://carbontracker.wur.nl"
institution = "Wageningen University and Research Center"
source = "CarbonTracker release 2.0"
conventions = "CF-1.1"
historytext = 'Created on '+dt.datetime.now().strftime('%B %d, %Y')+' by %s'%os.environ['USER']
std_savedict={'name':'unknown','values':[],'dims':(0,0,),'units':'','long_name':'','_FillValue':float(-999.),'comment':''}
class CT_CDF(CDF.CDF):
""" function opens a NetCDF/HDF/GRIB file for writing of output"""
def __init__(self,filename, method='read'):
if method not in ['read','write','create']:
raise ValueError, 'Method %s is not defined for a CarbonTracker NetCDF file object' % method
if method == 'read':
print 'Reading from file'
super(CDF.CDF,self).__init__(filename, CDF.NC.NOWRITE)
elif method == 'write':
#print 'Adding to existing file'
super(CT_CDF,self).__init__(filename, CDF.NC.WRITE|CDF.NC.CREATE)
self.AddCTHeader()
elif method == 'create':
#print 'Creating new file'
super(CT_CDF,self).__init__(filename, CDF.NC.WRITE|CDF.NC.TRUNC|CDF.NC.CREATE)
self.AddCTHeader()
def AddCTHeader(self):
self.automode()
#
setattr(self,'Institution',institution)
setattr(self,'Contact',email)
setattr(self,'URL',url)
setattr(self,'Source',source)
setattr(self,'Convention',conventions)
setattr(self,'Disclaimer',disclaimer)
setattr(self,'History',historytext)
def AddParamsDim(self,nparams):
self.automode()
dimparams=self.def_dim('nparameters',nparams)
return (dimparams,)
def AddMembersDim(self,nmembers):
self.automode()
dimmembers=self.def_dim('nmembers',nmembers)
return (dimmembers,)
def AddLagDim(self,nlag,unlimited=True):
self.automode()
if unlimited:
dimlag =self.def_dim('nlag',CDF.NC.UNLIMITED)
else:
dimlag=self.def_dim('nlag',nlag)
return (dimlag,)
def AddObsDim(self,nobs):
self.automode()
dimobs=self.def_dim('nobs',nobs)
return (dimobs,)
def AddLatLonDim(self,istart=0,iend=360,jstart=0,jend=180):
from numpy import arange, float64
if 'latitude' in self.dimensions(): return (self.dim('latitude'),self.dim('longitude'),) # already exists
lons=-180+arange(360)*1.0+0.5
lats=-90+arange(180)*1.0+0.5
#
lats=lats[jstart:jend]
lons=lons[istart:iend]
#
self.automode()
dimlon=self.def_dim('longitude',lons.shape[0])
dimlat=self.def_dim('latitude',lats.shape[0])
savedict=self.StandardVar(varname='latitude')
savedict['values']=lats.tolist()
savedict['actual_range']=(float(lats[0]),float(lats[-1]))
savedict['dims']=(dimlat,)
self.AddData(savedict)
savedict=self.StandardVar(varname='longitude')
savedict['values']=lons.tolist()
savedict['actual_range']=(float(lons[0]),float(lons[-1]))
savedict['dims']=(dimlon,)
self.AddData(savedict)
return (dimlat,dimlon,)
def AddDateDim(self):
self.automode()
if 'date' in self.dimensions(): return (self.dim('date'),)
return (self.def_dim('date',CDF.NC.UNLIMITED),)
def AddDateDimFormat(self):
self.automode()
if 'yyyymmddhhmmss' in self.dimensions(): return (self.dim('yyyymmddhhmmss'),) # already exists
return (self.def_dim('yyyymmddhhmmss',6),)
def has_date(self,dd):
if self.inq_unlimlen() > 0:
if dd in self.GetVariable('date').tolist():
return True
else:
return False
else:
return False
def GetVariable(self,varname):
""" get variable from ncf file"""
return array(self.var(varname).get())
def StandardVar(self,varname):
""" return properties of standard variables """
import standardvariables
if varname in standardvariables.standard_variables.keys():
return standardvariables.standard_variables[varname]
else:
return standardvariables.standard_variables['unknown']
def AddData(self,datadict,nsets=1,silent=True):
""" add fields to file, at end of unlimited dimension"""
existing_vars=self.variables()
try:
next = datadict['count']
except:
next=0
if existing_vars.has_key(datadict['name']):
var = self.var(datadict['name'])
var[next:next+nsets]=datadict['values']
else:
if not silent: print 'Creating new dataset: '+datadict['name']
if datadict.has_key('dtype'):
if datadict['dtype'] == 'int':
var = self.def_var(datadict['name'],CDF.NC.INT,datadict['dims'])
elif datadict['dtype'] == 'char':
var = self.def_var(datadict['name'],CDF.NC.CHAR,datadict['dims'])
elif datadict['dtype'] == 'double':
var = self.def_var(datadict['name'],CDF.NC.DOUBLE,datadict['dims'])
else:
var = self.def_var(datadict['name'],CDF.NC.FLOAT,datadict['dims'])
else:
var = self.def_var(datadict['name'],CDF.NC.FLOAT,datadict['dims'])
for k,v in datadict.iteritems():
if k not in ['name','dims','values','_FillValue','count']:
setattr(var,k,v)
if var.isrecord():
var[next:next+nsets]=datadict['values']
else:
var[:]=datadict['values']
def GetVariable(file,varname):
""" get variable from HDF file"""
return array(file.select(varname).get())
def CreateDirs(rundat,dirname):
dirname=os.path.join(rundat.outputdir,dirname)
if not os.path.exists(dirname):
print "Creating new output directory "+dirname
os.makedirs(dirname)
else:
print 'Writing files to directory: %s'%(dirname,)
return dirname
if __name__ == '__main__':
try:
os.remove('test.nc')
except:
pass
ncf=CT_CDF('test.nc','create')
dimgrid=ncf.AddLatLonDim()
dimdate=ncf.AddDateDim()
dimidate=ncf.AddDateDimFormat()
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
from da.baseclasses.obs import Observation
################### Begin Class CtObservations ###################
class CtObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def getid(self):
return identifier
def getversion(self):
return version
def Initialize(self,DaCycle):
self.startdate = DaCycle.da_settings['time.sample.start']
self.enddate = DaCycle.da_settings['time.sample.end']
DaSystem = DaCycle.DaSystem
sfname = DaSystem.da_settings['obs.input.fname']
filename = os.path.join(DaSystem.da_settings['obs.input.dir'],sfname)
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
self.ObsFilename = filename
self.Data = MixingRatioList([])
return None
def AddObs(self):
""" Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file
The CarbonTracker mixing ratio files are provided as one long list of obs for all possible dates. So we can
either:
(1) read all, and the subselect the data we will use in the rest of this cycle
(2) Use nco to make a subset of the data
For now, we will stick with option (1)
"""
import Nio
import datetime as dtm
from string import strip, join
from numpy import array, logical_and
ncf = Nio.open_file(self.ObsFilename)
idates = ncf.variables['date_components'].get_value()
dates = array([dtm.datetime(*d) for d in idates])
subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
dates = dates.take(subselect,axis=0)
ids = ncf.variables['id'].get_value().take(subselect,axis=0)
sites = ncf.variables['site'].get_value().take(subselect,axis=0)
sites = [s.tostring().lower() for s in sites]
sites = map(strip,sites)
lats = ncf.variables['lat'].get_value().take(subselect,axis=0)
lons = ncf.variables['lon'].get_value().take(subselect,axis=0)
alts = ncf.variables['alt'].get_value().take(subselect,axis=0)
obs = ncf.variables['obs'].get_value().take(subselect,axis=0)
species = ncf.variables['species'].get_value().take(subselect,axis=0)
date = ncf.variables['date'].get_value().take(subselect,axis=0)
window = ncf.variables['sampling_strategy'].get_value().take(subselect,axis=0)
flags = ncf.variables['NOAA_QC_flags'].get_value().take(subselect,axis=0)
flags = [s.tostring().lower() for s in flags]
flags = map(strip,flags)
dummy = ncf.close()
msg = "Successfully read data from obs file (%s)"%self.ObsFilename ; logging.debug(msg)
for n in range(len(dates)):
self.Data.append( MixingRatioSample(dates[n],sites[n],obs[n],0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],ids[n],0.0) )
msg = "Added %d observations to the Data list" % len(dates) ; logging.debug(msg)
def AddSimulations(self,filename,silent=True):
""" Adds model simulated values to the mixing ratio objects """
import Nio
ncf = Nio.open_file(filename,format='hdf')
ids = ncf.variables['id'].get_value()
simulated = ncf.variables['sampled_mole_frac'].get_value()*1e6
dummy = ncf.close()
msg = "Successfully read data from model sample file (%s)"%filename ; logging.info(msg)
obs_ids = self.Data.getvalues('id')
obs_ids = obs_ids.tolist()
ids = map(int,ids)
missing_samples = []
for id,val in zip(ids,simulated):
if id in obs_ids:
index = obs_ids.index(id)
dummy = self.Data[index].simulated = val
else:
missing_samples.append(id)
if not silent and missing_samples != []:
msg = 'Model samples were found that did not match any ID in the observation list. Skipping them...' ; logging.warning(msg)
msg = '%s'%missing_samples ; logging.warning(msg)
msg = "Added %d simulated values to the Data list" % (len(ids)-len(missing_samples) ) ; logging.debug(msg)
def WriteSampleInfo(self, DaCycle):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
For CarbonTracker, the input files are already in a format suitable for TM5 to read. All we need to do return the name of the file
that served as input for the Observation object.
"""
import shutil
obsinputfile = os.path.join(DaCycle.da_settings['dir.input'],'observations.nc')
dummy = shutil.copy(self.ObsFilename,obsinputfile)
msg = " [copying source] .... %s " % self.ObsFilename ; logging.debug(msg)
msg = " [to destination] .... %s " % obsinputfile ; logging.debug(msg)
msg = "Sample input file for obs operator now in place (%s)"%obsinputfile ; logging.info(msg)
return obsinputfile
################### End Class CtObservations ###################
################### Begin Class MixingRatioSample ###################
class MixingRatioSample(object):
"""
Holds the data that defines a Mixing Ratio Sample in the data assimilation framework. Sor far, this includes all
attributes listed below in the __init__ method. One can additionally make more types of data, or make new
objects for specific projects.
"""
def __init__(self,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',sdev=0.0):
self.code = code.strip() # Site code
self.xdate = xdate # Date of obs
self.obs = obs # Value observed
self.simulated = simulated # Value simulated by model
self.resid = resid # Mixing ratio residuals
self.hphr = hphr # Mixing ratio prior uncertainty from fluxes and (HPH) and model data mismatch (R)
self.mdm = mdm # Model data mismatch
self.flag = flag # Flag
self.height = height # Sample height
self.lat = lat # Sample lat
self.lon = lon # Sample lon
self.id = evn # Event number
self.sdev = sdev # standard deviation of ensemble
self.masl = True # Sample is in Meters Above Sea Level
self.mag = not self.masl # Sample is in Meters Above Ground
def __str__(self):
day=self.xdate.strftime('%Y-%m-%d %H:%M:%S')
return ' '.join(map(str,[self.code,day,self.obs,self.flag,self.id]))
################### End Class MixingRatioSample ###################
################### Begin Class MixingRatioList ###################
class MixingRatioList(list):
""" This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """
from numpy import array, ndarray
def getvalues(self,name,constructor=array):
from numpy import ndarray
result = constructor([getattr(o,name) for o in self])
if isinstance(result,ndarray):
return result.squeeze()
else:
return result
def unflagged(self):
return MixingRatioSample([o for o in self if o.flag == 0])
def flagged(self):
return MixingRatioSample([o for o in self if o.flag != 0])
def selectsite(self,site='mlo'):
l=[o for o in self if o.code == site]
return MixingRatioSample(l)
def selecthours(self,hours=range(1,24)):
l=[o for o in self if o.xdate.hour in hours]
return MixingRatioSample(l)
################### End Class MixingRatioList ###################
if __name__ == "__main__":
from da.tools.pipeline import JobStart
from datetime import datetime
import logging
obs = CtObservations()
DaCycle = JobStart([],{'rc':'da.rc'})
DaCycle.da_settings['time.sample.start'] = datetime(2000,1,1)
DaCycle.da_settings['time.sample.end'] = datetime(2000,1,2)
obs.Initialize(DaCycle)
obs.Validate()
obs.AddObs()
print(obs.Data.getvalues('obs'))
#!/usr/bin/env python
# optimizer.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
from da.baseclasses.optimizer import Optimizer
identifier = 'Ensemble Square Root Filter'
version = '0.0'
################### Begin Class CtOptimizer ###################
class CtOptimizer(Optimizer):
"""
This creates an instance of a CarbonTracker optimization object. The base class it derives from is the optimizer object.
Additionally, this CtOptimizer implements a special localization option following the CT2007 method.
All other methods are inherited from the base class Optimizer.
"""
def getid(self):
return identifier
def getversion(self):
return version
def SetLocalization(self,type='None'):
""" determine which localization to use """
if type == 'CT2007':
self.localization = True
self.localizetype = 'CT2007'
else:
self.localization = False
self.localizetype = 'None'
msg = "Current localization option is set to %s"%self.localizetype ; logging.info(msg)
def Localize(self,n):
""" localize the Kalman Gain matrix """
import numpy as np
if not self.localization: return
if self.localizetype == 'CT2007':
tvalue=1.97591
if np.sqrt(self.R[n,n]) >= 1.5:
for r in range(self.nlag*self.nparams):
corr=np.corrcoef(self.HX_prime[n,:],self.X_prime[r,:].squeeze())[0,1]
prob=corr/np.sqrt((1.0-corr**2)/(self.nmembers-2))
if abs(prob) < tvalue:
self.KG[r,n]=0.0
################### End Class CtOptimizer ###################
if __name__ == "__main__":
sys.path.append('../../')
import os
import sys
from da.tools.general import StartLogger
from da.tools.initexit import CycleControl
from da.ct.statevector import CtStateVector, PrepareState
from da.ct.obs import CtObservations
import numpy as np
import datetime
import da.tools.rc as rc
opts = ['-v']
args = {'rc':'../../da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
DaCycle.Initialize()
print DaCycle
StateVector = PrepareState(DaCycle)
samples = CtObservations(DaCycle.DaSystem,datetime.datetime(2005,3,5))
dummy = samples.AddObs()
dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/20050305/samples.000.nc')
nobs = len(samples.Data)
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
nobs, )
opt = CtOptimizer(dims)
opt.StateToMatrix(StateVector)
opt.MinimumLeastSquares()
opt.MatrixToState(StateVector)
standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\
'units' : 'mol m-2 s-1' ,\
'long_name' : 'Surface flux of carbon dioxide, terrestrial vegetation, not optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'surface_carbon_dioxide_mole_flux', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'bio_flux_opt' : {'name' : 'bio_flux_opt',\
'units' : 'mol m-2 s-1' ,\
'long_name' : 'Surface flux of carbon dioxide, terrestrial biosphere , optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'surface_carbon_dioxide_mole_flux', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'ocn_flux_prior' : {'name' : 'ocn_flux_prior',\
'units' : 'mol m-2 s-1' ,\
'long_name' : 'Surface flux of carbon dioxide, open ocean , not optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'surface_carbon_dioxide_mole_flux', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'ocn_flux_opt' : {'name' : 'ocn_flux_opt',\
'units' : 'mol m-2 s-1' ,\
'long_name' : 'Surface flux of carbon dioxide, open ocean , optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'surface_carbon_dioxide_mole_flux', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'fossil_flux_imp' : {'name' : 'fossil_flux_imp',\
'units' : 'mol m-2 s-1' ,\
'long_name' : 'Surface flux of carbon dioxide, fossil fuel burning , imposed ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'surface_carbon_dioxide_mole_flux', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'fire_flux_imp' : {'name' : 'fire_flux_imp',\
'units' : 'mol m-2 s-1' ,\
'long_name' : 'Surface flux of carbon dioxide, biomass burning , imposed ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'surface_carbon_dioxide_mole_flux', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'bio_flux_prior_cov' : {'name' : 'bio_flux_prior_cov',\
'units' : 'mol2 region-2 s-2' ,\
'long_name' : 'Covariance of surface flux of carbon dioxide, terrestrial vegetation , not optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : '', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'bio_flux_opt_cov' : {'name' : 'bio_flux_opt_cov',\
'units' : 'mol2 region-2 s-2' ,\
'long_name' : 'Covariance of surface flux of carbon dioxide, terrestrial vegetation , optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : '', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'ocn_flux_prior_cov' : {'name' : 'ocn_flux_prior_cov',\
'units' : 'mol2 region-2 s-2' ,\
'long_name' : 'Covariance of surface flux of carbon dioxide, open ocean , not optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : '', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'ocn_flux_opt_cov' : {'name' : 'ocn_flux_opt_cov',\
'units' : 'mol2 region-2 s-2' ,\
'long_name' : 'Covariance of surface flux of carbon dioxide, open ocean , optimized ', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : '', \
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'decimal_date' : {'name' : 'decimal_date',\
'units' : 'years' ,\
'long_name' : 'dates and times', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'date', \
'dims' : (), \
'dtype' : 'double', \
'values' : [], \
'count' : 0 \
} , \
'date' : {'name' : 'date',\
'units' : 'days since 2000-01-01 00:00:00 UTC' ,\
'long_name' : 'UTC dates and times', \
'comment' : 'time-interval average, centered on times in the date axis', \
'standard_name' : 'date', \
'dims' : (), \
'dtype' : 'double', \
'values' : [], \
'count' : 0 \
} , \
'idate' : {'name' : 'idate',\
'units' : 'yyyy MM dd hh mm ss ' ,\
'long_name' : 'integer components of date and time', \
'standard_name' : 'calendar_components', \
'comment' : 'time-interval average, centered on times in the date axis', \
'dims' : (), \
'dtype' : 'int', \
'values' : [], \
'count' : 0 \
} , \
'latitude' : {'name' : 'latitude',\
'units' : 'degrees_north ' ,\
'long_name' : 'latitude', \
'standard_name' : 'latitude', \
'comment' : 'center of interval',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'longitude' : {'name' : 'longitude',\
'units' : 'degrees_east ' ,\
'long_name' : 'longitude', \
'standard_name' : 'longitude', \
'comment' : 'center of interval',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'height' : {'name' : 'height',\
'units' : 'masl ' ,\
'long_name' : 'height_above_ground_level', \
'standard_name' : 'height_above_ground_level', \
'comment' : 'value is meters above sea level',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'co2' : {'name' : 'co2',\
'units' : 'micromol mol-1 ' ,\
'long_name' : 'mole_fraction_of_carbon_dioxide_in_air', \
'standard_name' : 'mole_fraction_of_carbon_dioxide_in_air', \
'comment' : '',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'meanstate' : {'name' : 'statevectormean',\
'units' : 'unitless' ,\
'long_name' : 'mean_value_of_state_vector', \
'standard_name' : 'mean_value_of_state_vector', \
'comment' : '',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'ensemblestate': {'name' : 'statevectorensemble',\
'units' : 'unitless' ,\
'long_name' : 'ensemble_value_of_state_vector', \
'standard_name' : 'ensemble_value_of_state_vector', \
'comment' : '',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
'unknown' : {'name' : '',\
'units' : '' ,\
'long_name' : '', \
'standard_name' : '', \
'comment' : '',\
'dims' : (), \
'values' : [], \
'count' : 0 \
} , \
}