Commit c3b3a2fb authored by Peters, Wouter's avatar Peters, Wouter
Browse files

copies of CT specific classes to be converted into general classes next

parent 407eb114
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'CarbonTracker CO2'
################### Begin Class MixingRatioSample ###################
class MixingRatioSample(object):
"""
Holds the data that defines a Mixing Ratio Sample in the data assimilation framework. Sor far, this includes all
attributes listed below in the __init__ method. One can additionally make more types of data, or make new
objects for specific projects.
"""
def __init__(self,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',sdev=0.0):
self.code = code.strip() # Site code
self.xdate = xdate # Date of obs
self.obs = obs # Value observed
self.simulated = simulated # Value simulated by model
self.resid = resid # Mixing ratio residuals
self.hphr = hphr # Mixing ratio prior uncertainty from fluxes and (HPH) and model data mismatch (R)
self.mdm = mdm # Model data mismatch
self.flag = flag # Flag
self.height = height # Sample height
self.lat = lat # Sample lat
self.lon = lon # Sample lon
self.id = evn # Event number
self.sdev = sdev # standard deviation of ensemble
self.masl = True # Sample is in Meters Above Sea Level
self.mag = not self.masl # Sample is in Meters Above Ground
def __str__(self):
day=self.xdate.strftime('%Y-%m-%d %H:%M:%S')
return ' '.join(map(str,[self.code,day,self.obs,self.flag,self.id]))
################### End Class MixingRatioSample ###################
################### Begin Class MixingRatioList ###################
class MixingRatioList(list):
""" This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """
from numpy import array, ndarray
def getvalues(self,name,constructor=array):
from numpy import ndarray
result = constructor([getattr(o,name) for o in self])
if isinstance(result,ndarray):
return result.squeeze()
else:
return result
def unflagged(self):
return MixingRatioSample([o for o in self if o.flag == 0])
def flagged(self):
return MixingRatioSample([o for o in self if o.flag != 0])
def selectsite(self,site='mlo'):
l=[o for o in self if o.code == site]
return MixingRatioSample(l)
def selecthours(self,hours=range(1,24)):
l=[o for o in self if o.xdate.hour in hours]
return MixingRatioSample(l)
################### End Class MixingRatioList ###################
################### Begin Class CtObservations ###################
class CtObservations(object):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def __init__(self,DaSystem,sampledate):
sfname = DaSystem.da_settings['obs.input.fname']+'.%s'%(sampledate.strftime('%Y%m%d'),)+'.nc'
#msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg)
filename = os.path.join(DaSystem.da_settings['obs.input.dir'],sfname)
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
self.ObsFilename = filename
self.MrData = MixingRatioList([])
def __str__(self):
""" Prints a list of MixingRatioSample objects"""
return self.ObsFilename
def AddObs(self):
""" Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file
Note that this routine was not yet ported to Nio because it cannot read string arrays !!!
"""
import pyhdf.SD as HDF
import datetime as dtm
from string import strip, join
ncf = HDF.SD(self.ObsFilename)
idates = ncf.select('date_components').get()
ids = ncf.select('id').get()
sites = ncf.select('site').get()
sites = [s.tostring().lower() for s in sites]
sites = map(strip,sites)
lats = ncf.select('lat').get()
lons = ncf.select('lon').get()
alts = ncf.select('alt').get()
obs = ncf.select('obs').get()
species = ncf.select('species').get()
date = ncf.select('date').get()
window = ncf.select('sampling_strategy').get()
flags = ncf.select('NOAA_QC_flags').get()
flags = [s.tostring().lower() for s in flags]
flags = map(strip,flags)
dummy = ncf.end()
msg = "Successfully read data from obs file (%s)"%self.ObsFilename ; logging.debug(msg)
dates = [dtm.datetime(*d) for d in idates]
if self.MrData == None:
self.MrData = MixingRatioList([])
for n in range(len(dates)):
self.MrData.append( MixingRatioSample(dates[n],sites[n],obs[n],0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],ids[n],0.0) )
msg = "Added %d observations to the MrData list" % len(dates) ; logging.debug(msg)
def AddSimulations(self, filename,silent=True):
""" Adds model simulated values to the mixing ratio objects """
import pyhdf.SD as HDF
import Nio
if not os.path.exists(filename):
msg = "Could not find required model sample file (%s)"%filename ; logging.debug(msg)
raise IOError,msg
ncf = Nio.open_file(filename,format='hdf')
ids = ncf.variables['id'].get_value()
simulated = ncf.variables['sampled_mole_frac'].get_value()*1e6
dummy = ncf.close()
msg = "Successfully read data from model sample file (%s)"%filename ; logging.info(msg)
obs_ids = self.MrData.getvalues('id')
obs_ids = obs_ids.tolist()
ids = map(int,ids)
missing_samples = []
for id,val in zip(ids,simulated):
if id in obs_ids:
index = obs_ids.index(id)
dummy = self.MrData[index].simulated = val
else:
missing_samples.append(id)
if not silent and missing_samples != []:
msg = 'Model samples were found that did not match any ID in the observation list. Skipping them...' ; logging.warning(msg)
msg = '%s'%missing_samples ; logging.warning(msg)
msg = "Added %d simulated values to the MrData list" % (len(ids)-len(missing_samples) ) ; logging.debug(msg)
################### End Class CtObservations ###################
################# Stand alone methods that manipulate the objects above ##############
def PrepareObs(CycleInfo,lag):
""" PrepareObs """
import shutil
from da.tools.general import AdvanceTime
# First, we make a CarbonTracker Observation object which holds all the information related to the mixing ratios going in
# and coming out of the model. This object also holds the observations themselves, and the simulated value + statistics
sampledate = CycleInfo.da_settings['time.start']
sampledate = AdvanceTime(sampledate,lag*int(CycleInfo.da_settings['cyclelength']))
samples = CtObservations(CycleInfo.DaSystem, sampledate)
dummy = samples.AddObs()
#
# Next should follow a section where the observations that are read are further processes, and written to a NetCDF file for
# the transport model to use for x,y,z,t smapling.
#
# For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply copy
# the observations.nc file to the pre-cooked obs files
#
filename= samples.ObsFilename
saveas = os.path.join(CycleInfo.da_settings['dir.input'],'observations.nc')
dummy = shutil.copy2(filename,saveas)
msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg)
return samples
if __name__ == "__main__":
sys.path.append('../../')
import os
import sys
from da.tools.general import StartLogger
from da.tools.initexit import CycleControl
import numpy as np
import da.tools.rc as rc
opts = ['-v']
args = {'rc':'../../da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
DaCycle.Initialize()
print DaCycle
samples = CtObservations(DaCycle.DaSystem,datetime.datetime(2005,3,5))
dummy = samples.AddObs()
dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/samples.000.nc')
print samples.MrData.getvalues('code')
print samples.MrData.getvalues('obs')
mlo=samples.MrData.selectsite('mlo_01d0')
#!/usr/bin/env python
# optimizer.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'CarbonTracker CO2'
################### Begin Class CtOptimizer ###################
class CtOptimizer(object):
"""
This creates an instance of a CarbonTracker optimization object. It handles the minimum least squares optimization
of the CT state vector given a set of CT sample objects. Two routines will be implemented: one where the optimization
is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed
and efficiency.
"""
def __init__(self, dims):
self.nlag = dims[0]
self.nmembers = dims[1]
self.nparams = dims[2]
self.nobs = dims[3]
self.localization = False
self.localization = False
self.CreateMatrices()
return None
def CreateMatrices(self):
""" Create Matrix space needed in optimization routine """
import numpy as np
# mean state [X]
self.x = np.zeros( (self.nlag*self.nparams,), float)
# deviations from mean state [X']
self.X_prime = np.zeros( (self.nlag*self.nparams,self.nmembers,), float)
# mean state, transported to observation space [ H(X) ]
self.Hx = np.zeros( (self.nobs,), float)
# deviations from mean state, transported to observation space [ H(X') ]
self.HX_prime = np.zeros( (self.nobs,self.nmembers), float)
# observations
self.obs = np.zeros( (self.nobs,), float)
# covariance of observations
self.R = np.zeros( (self.nobs,self.nobs,), float)
# Total covariance of fluxes and obs in units of obs [H P H^t + R]
self.HPHR = np.zeros( (self.nobs,self.nobs,), float)
# Kalman Gain matrix
self.KG = np.zeros( (self.nlag*self.nparams,self.nobs,), float)
# flags of observations
self.flags = np.zeros( (self.nobs,), int)
def StateToMatrix(self,StateVector):
import numpy as np
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
self.x[n*self.nparams:(n+1)*self.nparams] = members[0].ParameterValues[n]
self.X_prime[n*self.nparams:(n+1)*self.nparams,:] = np.transpose(np.array([m.ParameterValues for m in members]))
self.X_prime = self.X_prime - self.x[:,np.newaxis] # make into a deviation matrix
self.obs[:] = StateVector.EnsembleMembers[-1][0].ModelSample.MrData.getvalues('obs')
self.Hx[:] = StateVector.EnsembleMembers[-1][0].ModelSample.MrData.getvalues('simulated')
for m,mem in enumerate(StateVector.EnsembleMembers[-1]):
#self.HX_prime[:,m] = mem.ModelSample.MrData.getvalues('simulated')
self.HX_prime[:,m] = self.Hx + np.random.randn(self.nobs)*3.0
self.HX_prime = self.HX_prime - self.Hx[:,np.newaxis] # make a deviation matrix
self.R[:,:] = np.identity(self.nobs)
return None
def MatrixToState(self,StateVector):
import numpy as np
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
for m,mem in enumerate(members):
members[m].ParameterValues[:] = self.X_prime[n*self.nparams:(n+1)*self.nparams,m] + self.x[n*self.nparams:(n+1)*self.nparams]
return None
def SerialMinimumLeastSquares(self):
""" Make minimum least squares solution by looping over obs"""
import numpy as np
import numpy .linalg as la
tvalue=1.97591
for n in range(self.nobs):
if self.flags[n] == 1: continue
PHt = 1./(self.nmembers-1)*np.dot(self.X_prime,self.HX_prime[n,:])
self.HPHR[n,n] = 1./(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[n,:]).sum()+self.R[n,n]
self.KG[:,n] = PHt/self.HPHR[n,n]
dummy = self.Localize(n)
alpha = np.double(1.0)/(np.double(1.0)+np.sqrt( (self.R[n,n])/self.HPHR[n,n] ) )
res = self.obs[n]-self.Hx[n]
self.x[:] = self.x + self.KG[:,n]*res
for r in range(self.nmembers):
self.X_prime[:,r] = self.X_prime[:,r]-alpha*self.KG[:,n]*(self.HX_prime[n,r])
#WP !!!! Very important to first do all obervations from n=1 through the end, and only then update 1,...,n. The current observation
#WP should always be updated last because it features in the loop of the adjustments !!!!
for m in range(n+1,self.nobs):
res = self.obs[n]-self.Hx[n]
fac = 1.0/(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[m,:]).sum()/self.HPHR[n,n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m,:] = self.HX_prime[m,:] - alpha*fac*self.HX_prime[n,:]
for m in range(1,n+1):
res = self.obs[n]-self.Hx[n]
fac = 1.0/(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[m,:]).sum()/self.HPHR[n,n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m,:] = self.HX_prime[m,:] - alpha*fac*self.HX_prime[n,:]
def BulkMinimumLeastSquares(self):
""" Make minimum least squares solution by solving matrix equations"""
import numpy as np
import numpy.linalg as la
# Create full solution, first calculate the mean of the posterior analysis
HPH = np.dot(self.HX_prime,np.transpose(self.HX_prime))/(self.nmembers-1) # HPH = 1/N * HX' * (HX')^T
self.HPHR[:,:] = HPH+self.R # HPHR = HPH + R
HPb = np.dot(self.X_prime,np.transpose(self.HX_prime))/(self.nmembers-1) # HP = 1/N X' * (HX')^T
self.KG[:,:] = np.dot(HPb,la.inv(self.HPHR)) # K = HP/(HPH+R)
for n in range(self.nobs):
dummy = self.Localize(n)
self.x[:] = self.x + np.dot(self.KG,self.obs-self.Hx) # xa = xp + K (y-Hx)
# And next make the updated ensemble deviations. Note that we calculate P by using the full equation (10) at once, and
# not in a serial update fashion as described in Whitaker and Hamill.
# For the current problem with limited N_obs this is easier, or at least more straightforward to do.
I = np.identity(self.nlag*self.nparams)
sHPHR = la.cholesky(self.HPHR) # square root of HPH+R
part1 = np.dot(HPb,np.transpose(la.inv(sHPHR))) # HP(sqrt(HPH+R))^-1
part2 = la.inv(sHPHR+np.sqrt(self.R)) # (sqrt(HPH+R)+sqrt(R))^-1
Kw = np.dot(part1,part2) # K~
self.X_prime[:,:] = np.dot(I,self.X_prime)-np.dot(Kw,self.HX_prime) # HX' = I - K~ * HX'
P_opt = np.dot(self.X_prime,np.transpose(self.X_prime))/(self.nmembers-1)
# Now do the adjustments of the modeled mixing ratios using the linearized ensemble. These are not strictly needed but can be used
# for diagnosis.
part3 = np.dot(HPH,np.transpose(la.inv(sHPHR))) # HPH(sqrt(HPH+R))^-1
Kw = np.dot(part3,part2) # K~
self.Hx[:] = self.Hx + np.dot(np.dot(HPH,la.inv(self.HPHR)),self.obs-self.Hx) # Hx = Hx+ HPH/HPH+R (y-Hx)
self.HX_prime[:,:] = self.HX_prime-np.dot(Kw,self.HX_prime) # HX' = HX'- K~ * HX'
msg = 'Minimum Least Squares solution was calculated, returning' ; logging.info(msg)
return None
def SetLocalization(self,type='None'):
""" determine which localization to use """
if type == 'CT2007':
self.localization = True
self.localizetype = 'CT2007'
else:
self.localization = False
self.localizetype = 'None'
msg = "Current localization option is set to %s"%self.localizetype ; logging.info(msg)
def Localize(self,n):
""" localize the Kalman Gain matrix """
import numpy as np
if not self.localization: return
if self.localizetype == 'CT2007':
tvalue=1.97591
if np.sqrt(self.R[n,n]) >= 1.5:
for r in range(self.nlag*self.nparams):
corr=np.corrcoef(self.HX_prime[n,:],self.X_prime[r,:].squeeze())[0,1]
prob=corr/np.sqrt((1.0-corr**2)/(self.nmembers-2))
if abs(prob) < tvalue:
self.KG[r,n]=0.0
################### End Class CtOptimizer ###################
if __name__ == "__main__":
sys.path.append('../../')
import os
import sys
from da.tools.general import StartLogger
from da.tools.initexit import CycleControl
from da.ct.statevector import CtStateVector, PrepareState
from da.ct.obs import CtObservations
import numpy as np
import datetime
import da.tools.rc as rc
opts = ['-v']
args = {'rc':'../../da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
DaCycle.Initialize()
print DaCycle
StateVector = PrepareState(DaCycle)
samples = CtObservations(DaCycle.DaSystem,datetime.datetime(2005,3,5))
dummy = samples.AddObs()
dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/20050305/samples.000.nc')
nobs = len(samples.MrData)
dims = ( int(DaCycle.da_settings['time.nlag']),
int(DaCycle.da_settings['forecast.nmembers']),
int(DaCycle.DaSystem.da_settings['nparameters']),
nobs, )
opt = CtOptimizer(dims)
opt.StateToMatrix(StateVector)
opt.MinimumLeastSquares()
opt.MatrixToState(StateVector)
#!/usr/bin/env python
# ct_statevector_tools.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class CtMember ###################
class CtMember(object):
"""
An ensemble member for CarbonTracker. This consists of
* parameter values
* file names to write/read the data for this member
"""
def __init__(self, membernumber):
self.membernumber = membernumber # the member number
self.ParameterValues = None # Parameter values of this member
self.ModelSample = None # Model Sampled Parameter values of this member
def __str__(self):
return "%03d"%self.membernumber
def WriteToFile(self, outdir):
""" Write the information needed by an external model to a netcdf file for future use """
from da.tools.io import CT_CDF, std_savedict
import numpy as np
filename = os.path.join(outdir,'parameters.%03d.nc'% self.membernumber)
f = CT_CDF(filename,'create')
dimparams = f.AddParamsDim(len(self.ParameterValues))
data = self.ParameterValues
savedict = std_savedict.copy()
savedict['name'] = "parametervalues"
savedict['long_name'] = "parameter_values_for_member_%d"%self.membernumber
savedict['units'] = "unitless"
savedict['dims'] = dimparams
savedict['values'] = data.tolist()
savedict['comment'] = 'These are parameter values to use for member %d'%self.membernumber
dummy = f.AddData(savedict)
dummy = f.close()
msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (self.membernumber,filename,) ; logging.debug(msg)
################### End Class CtMember ###################
################### Begin Class CtStateVector ###################
class CtStateVector(object):
""" an object that holds data + methods and attributes needed to manipulate state vector values """
def __init__(self,CycleInfo):
self.CycleInfo = CycleInfo
self.DaSystem = CycleInfo.DaSystem
self.nlag = int(self.CycleInfo.da_settings['time.nlag'])
self.nmembers = int(self.CycleInfo.da_settings['forecast.nmembers'])
self.nparams = int(self.DaSystem.da_settings['nparameters'])
self.isOptimized = False
# These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist
# of lists of CtMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
self.EnsembleMembers = range(self.nlag)
for n in range(self.nlag):
self.EnsembleMembers[n] = []
def __str__(self):
return "This is a CarbonTracker state vector object"
def MakeNewEnsemble(self,lag):
""" Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector.
Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.