Commit 5fa9dbb4 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

more structure to modules

parent 2b70cb26
#!/usr/bin/env python
# ct_observation_tools.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import rc
import logging
import datetime
identifier = 'CarbonTracker CO2'
################### Begin Class MixingRatioSample ###################
class MixingRatioSample():
"""
Holds the data that defines a Mixing Ratio Sample in the data assimilation framework. Sor far, this includes all
attributes listed below in the __init__ method. One can additionally make more types of data, or make new
objects for specific projects.
"""
def __init__(self,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',sdev=0.0):
self.code = code.strip() # Site code
self.xdate = xdate # Date of obs
self.obs = obs # Value observed
self.simulated = simulated # Value simulated by model
self.resid = resid # Mixing ratio residuals
self.hphr = hphr # Mixing ratio prior uncertainty from fluxes and (HPH) and model data mismatch (R)
self.mdm = mdm # Model data mismatch
self.flag = flag # Flag
self.height = height # Sample height
self.lat = lat # Sample lat
self.lon = lon # Sample lon
self.id = evn # Event number
self.sdev = sdev # standard deviation of ensemble
self.masl = True # Sample is in Meters Above Sea Level
self.mag = not self.masl # Sample is in Meters Above Ground
def __str__(self):
day=self.xdate.strftime('%Y-%m-%d %H:%M:%S')
return ' '.join(map(str,[self.code,day,self.obs,self.flag,self.id]))
################### End Class MixingRatioSample ###################
################### Begin Class MixingRatioList ###################
class MixingRatioList(list):
""" This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """
from numpy import array
def getvalues(self,name,constructor=array):
return constructor([getattr(o,name) for o in self])
def unflagged(self):
return MixingRatioSample([o for o in self if o.flag == 0])
def flagged(self):
return MixingRatioSample([o for o in self if o.flag != 0])
def selectsite(self,site='mlo'):
l=[o for o in self if o.code == site]
return MixingRatioSample(l)
def selecthours(self,hours=range(1,24)):
l=[o for o in self if o.xdate.hour in hours]
return MixingRatioSample(l)
################### End Class MixingRatioList ###################
################### Begin Class CtObservations ###################
class CtObservations():
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def __init__(self,DaInfo):
sfname = DaInfo.da_settings['obs.input.fname']+'.%s'%('20050305')+'.nc'
msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg)
filename = os.path.join(DaInfo.da_settings['obs.input.dir'],sfname)
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
self.ObsFilename = filename
self.MrData = None
def __str__(self):
""" Prints a list of MixingRatioSample objects"""
return self.ObsFilename
def AddObs(self):
""" Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file"""
import pyhdf.SD as HDF
import datetime as dtm
from string import strip, join
ncf = HDF.SD(self.ObsFilename)
idates = ncf.select('date_components').get()
ids = ncf.select('id').get()
sites = ncf.select('site').get()
sites = [join(s,'').lower() for s in sites]
sites = map(strip,sites)
lats = ncf.select('lat').get()
lons = ncf.select('lon').get()
alts = ncf.select('alt').get()
obs = ncf.select('obs').get()
species = ncf.select('species').get()
date = ncf.select('date').get()
window = ncf.select('sampling_strategy').get()
flags = ncf.select('NOAA_QC_flags').get()
flags = [join(s,'') for s in flags]
flags = map(strip,flags)
dummy = ncf.end()
msg = "Successfully read data from obs file (%s)"%self.ObsFilename ; logging.debug(msg)
dates = [dtm.datetime(*d) for d in idates]
if self.MrData == None:
self.MrData = MixingRatioList([])
for n in range(len(dates)):
self.MrData.append( MixingRatioSample(dates[n],sites[n],obs[n],0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],ids[n],0.0) )
msg = "Added %d observations to the MrData list" % len(dates) ; logging.info(msg)
def AddSimulations(self, filename):
""" Adds model simulated values to the mixing ratio objects """
import pyhdf.SD as HDF
ncf = HDF.SD(filename)
ids = ncf.select('id').get()
simulated = ncf.select('sampled_mole_frac').get()*1e6
dummy = ncf.end()
msg = "Successfully read data from model sample file (%s)"%filename ; logging.debug(msg)
obs_ids = self.MrData.getvalues('id')
for n in range(len(ids)):
try:
index = obs_ids.index(ids[n])
except:
msg = 'A model sample was found that did not match any ID in the existing observation list. Skipping...' ; logging.warning(msg)
continue
dummy = self.MrData[index].simulated = simulated[n]
msg = "Added %d simulated values to the MrData list" % len(ids) ; logging.info(msg)
################### End Class CtObservations ###################
################# Stand alone methods that manipulate the objects above ##############
def PrepareObs(CycleInfo,type='forecast'):
""" PrepareObs """
import shutil
from ct_tools import DaInfo
# First, we make a CarbonTracker Observation object which holds all the information related to the mixing ratios going in
# and coming out of the model. This object also holds the observations themselves, and the simulated value + statistics
DaSystem = DaInfo(CycleInfo.da_settings['da.system.rc'])
samples = CtObservations(DaSystem)
dummy = samples.AddObs()
#
# Next should follow a section where the observations that are read are further processes, and written to a NetCDF file for
# the transport model to use for x,y,z,t smapling.
#
# For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply copy
# the observations.nc file to the pre-cooked obs files
#
filename= samples.ObsFilename
saveas = os.path.join(CycleInfo.da_settings['dir.input'],'observations.nc')
dummy = shutil.copy2(filename,saveas)
msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg)
return samples
if __name__ == "__main__":
import os
import sys
from tools_da import StartLogger
from da_initexit import CycleControl
import numpy as np
from ct_tools import DaInfo
opts = ['-v']
args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
Da_Info = DaInfo('carbontracker.rc')
DaCycle.Initialize()
print DaCycle
samples = CtObservations(Da_Info)
dummy = samples.AddObs()
dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/samples.000.nc')
print samples.MrData.getvalues('code')
print samples.MrData.getvalues('obs')
mlo=samples.MrData.selectsite('mlo_01d0')
dummy = PrepareObs(DaCycle,'forecast')
#!/usr/bin/env python
# ct_optimizer_tools.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import rc
import logging
import datetime
identifier = 'CarbonTracker CO2'
################### Begin Class CtOptimizer ###################
class CtOptimizer():
"""
This creates an instance of a CarbonTracker optimization object. It handles the minimum least squares optimization
of the CT state vector given a set of CT sample objects. Two routines will be implemented: one where the optimization
is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed
and efficiency.
"""
def __init__(self, CycleInfo, DaInfo):
self.nlag = int(CycleInfo.da_settings['time.nlag'])
self.nmembers = int(CycleInfo.da_settings['forecast.nmembers'])
self.nparams = int(DaInfo.da_settings['nparameters'])
self.CreateMatrices()
return None
def CreateMatrices(self):
""" Create Matrix space needed in optimization routine """
import numpy as np
self.X_prior = np.zeros( (self.nlag*self.nparams,), float)
self.X_prior_prime = np.zeros( (self.nmembers,self.nlag*self.nparams,), float)
def FillMatrices(self,StateVector):
import numpy as np
for n in range(self.nlag):
self.X_prior[n*self.nparams:(n+1)*self.nparams] = StateVector.MeanValues[n]
members = StateVector.EnsembleMembers[n]
self.X_prior_prime[:,n*self.nparams:(n+1)*self.nparams] = np.array([m.MeanDeviations for m in members])
return None
################### End Class CtOptimizer ###################
if __name__ == "__main__":
import os
import sys
from tools_da import StartLogger
from da_initexit import CycleControl
import numpy as np
from ct_tools import DaInfo
from ct_statevector_tools import CtStateVector, PrepareState
opts = ['-v']
args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
Da_Info = DaInfo('carbontracker.rc')
DaCycle.Initialize()
print DaCycle
StateVector = PrepareState(DaCycle)
opt = CtOptimizer(DaCycle,Da_Info)
opt.FillMatrices(StateVector)
#!/usr/bin/env python
# ct_statevector_tools.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import rc
import logging
import datetime
################### Begin Class CtMember ###################
class CtMember():
"""
An ensemble member for CarbonTracker. This consists of
* parameter values
* a CtObservations object with sampled mixing ratios
* file names to write/read the data for this member
"""
def __init__(self, membernumber):
self.membernumber = membernumber # the member number
self.randomstate = (None,) # the state of the random number generator needed to reproduce the random sequence
self.ModelSamples = None # CtObservations object that holds model samples for this member
self.MeanDeviations = None # Parameter values of this member
self.ParameterOutputFile = None # File to which the member info should be written
self.SampleInputFile = None # File from which the sample info can be read
def __str__(self):
return "%03d"%self.membernumber
def WriteToFile(self, outdir):
""" Write the information needed by an external model to a netcdf file for future use """
from ct_netcdf import CT_CDF, std_savedict
import pycdf as cdf
import mysettings
import numpy as np
filename = os.path.join(outdir,'parameters.%03d.nc'% self.membernumber)
f = CT_CDF(filename,'create')
dimparams = f.AddParamsDim(len(self.MeanDeviations))
data = self.MeanDeviations
savedict = std_savedict.copy()
savedict['name'] = "parametervalues"
savedict['long_name'] = "parameter_values_for_member_%d"%self.membernumber
savedict['units'] = "unitless"
savedict['dims'] = dimparams
savedict['values'] = data.tolist()
savedict['comment'] = 'These are parameter values to use for member %d'%self.membernumber
dummy = f.AddData(savedict)
dummy = f.close()
msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (self.membernumber,filename,) ; logging.info(msg)
################### End Class CtMember ###################
################### Begin Class CtStateVector ###################
class CtStateVector():
""" an object that holds data + methods and attributes needed to manipulate state vector values """
def __init__(self,CycleInfo,DaInfo):
self.CycleInfo = CycleInfo
self.DaInfo = DaInfo
self.nlag = int(CycleInfo.da_settings['time.nlag'])
self.nmembers = int(CycleInfo.da_settings['forecast.nmembers'])
self.nparams = int(DaInfo.da_settings['nparameters'])
self.isOptimized = False
# These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist
# of lists of CtMember objects, whereas the MeanValues will simply be a list of array values.
self.MeanValues = range(self.nlag)
self.EnsembleMembers = range(self.nlag)
for n in range(self.nlag):
self.EnsembleMembers[n] = []
def __str__(self):
return "This is a CarbonTracker state vector object"
def MakeNewEnsemble(self,nlag):
""" Make a new ensemble from specified matrices """
import pyhdf.SD as hdf
import numpy as np
import matplotlib.pyplot as plt
# Get the needed matrices from the specified covariance files
file_ocn_cov = self.DaInfo.da_settings['ocn.covariance']
file_bio_cov = self.DaInfo.da_settings['bio.covariance']
for file in [file_ocn_cov,file_bio_cov]:
if not os.path.exists(file):
msg = "Cannot find the specified file %s" % file ; logging.error(msg)
raise IOError,msg
else:
msg = "Using covariance file: %s" % file ; logging.info(msg)
f_ocn = hdf.SD(file_ocn_cov)
f_bio = hdf.SD(file_bio_cov)
cov_ocn = f_ocn.select('qprior').get()
cov_bio = f_bio.select('qprior').get()
dummy = f_ocn.end()
dummy = f_bio.end()
dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices")
# Once we have the matrices, we can start to make the full covariance matrix, and then decompose it
fullcov = np.zeros((self.nparams,self.nparams),float)
nocn = cov_ocn.shape[0]
nbio = cov_bio.shape[0]
fullcov[0:nbio,0:nbio] = cov_bio
fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn
# Visually inspect full covariance matrix if verbose
if self.CycleInfo.da_settings['verbose']:
plt.imshow(fullcov)
plt.colorbar()
plt.savefig('fullcovariancematrix.png')
plt.close('all')
# Make a cholesky decomposition of the covariance matrix
U,s,Vh = np.linalg.svd(fullcov)
dof = np.sum(s)**2/sum(s**2)
C = np.linalg.cholesky(fullcov)
msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg)
msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg)
if self.CycleInfo.da_settings['verbose']:
plt.plot(s)
plt.savefig('eigenvaluespectrum.png')
plt.close('all')
# Now create a set of random instances of the given covariance structure
dummy = np.random.seed()
# Create mean values for the new time step (default = 1.0) and add to the MeanValues list
NewMean = np.ones(self.nparams,float)
self.MeanValues[nlag] = NewMean
# Create the first ensemble member with a deviation of 0.0 and add to list
NewMember = CtMember(0)
NewMember.MeanDeviations = np.zeros((self.nparams),float)
dummy = self.EnsembleMembers[nlag].append(NewMember)
# Create members 1:nmembers and add to EnsembleMembers list
for member in range(1,self.nmembers):
randstate = np.random.get_state()
rands = np.random.randn(self.nparams)
NewMember = CtMember(member)
NewMember.MeanDeviations = np.dot(C,rands)
NewMember.randomstate = randstate
dummy = self.EnsembleMembers[nlag].append(NewMember)
msg = '%d new ensemble members were added to the state vector'%(self.nmembers) ; logging.info(msg)
def Propagate(self,nlag):
return self.MakeNewEnsemble(nlag)
def WriteToFile(self):
""" Write the StateVector object to a netcdf file for future use """
from ct_netcdf import CT_CDF
import pycdf as cdf
import mysettings
import numpy as np
outdir = self.CycleInfo.da_settings['dir.output']
filtertime = self.CycleInfo.da_settings['startdate'].strftime('%Y%m%d')
filename = os.path.join(outdir,'savestate.%s.nc'% filtertime)
f = CT_CDF(filename,'create')
dimparams = f.AddParamsDim(self.nparams)
dimmembers = f.AddMembersDim(self.nmembers)
dimlag = f.AddLagDim(self.nlag,unlimited=True)
setattr(f,'date',filtertime)
setattr(f,'isOptimized',str(self.isOptimized))
for n in range(self.nlag):
data = StateVector.MeanValues[n]
savedict = f.StandardVar(varname='meanstate')
savedict['dims'] = dimlag+dimparams
savedict['values'] = data.tolist()
savedict['count'] = n
savedict['comment'] = 'this represents the mean of the ensemble'
dummy = f.AddData(savedict)
members = StateVector.EnsembleMembers[n]
data = np.array([m.MeanDeviations for m in members])
savedict = f.StandardVar(varname='ensemblestate')
savedict['dims'] = dimlag+dimmembers+dimparams
savedict['values'] = data.tolist()
savedict['count'] = n
savedict['comment'] = 'this represents deviations from the mean of the ensemble'
dummy = f.AddData(savedict)
dummy = f.close()
msg = 'Successfully wrote the State Vector to file (%s) ' % (filename,) ; logging.info(msg)
def ReadFromFile(self):
""" Read the StateVector object from a netcdf file for future use """
import pyhdf.SD as SD
import numpy as np
outdir = self.CycleInfo.da_settings['dir.output']
filtertime = self.CycleInfo.da_settings['startdate'].strftime('%Y%m%d')
filename = os.path.join(outdir,'savestate.%s.nc'% filtertime)
f = SD.SD(filename)
MeanState = f.select('statevectormean').get()
EnsembleMembers = f.select('statevectorensemble').get()
dummy = f.end()
for n in range(self.nlag):
self.MeanValues[n] = MeanState[n,:]
members = self.EnsembleMembers[n]
for i,m in enumerate(members):
m.MeanDeviations = EnsembleMembers[n,i,:]
msg = 'Successfully read the State Vector from file (%s) ' % (filename,) ; logging.info(msg)
################### End Class CtStateVector ###################
################# Stand alone methods that manipulate the objects above ##############
def PrepareState(CycleInfo):
"""
Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways:
(A) Construct a brand new ensemble from a covariance matrix
(B) Get an existing ensemble and propagate it
The steps for procedure A are:
(A1) Construct the covariance matrix
(A2) Make a Cholesky decomposition
(A3) Prepare a set of [nmembers] input parameters for each forecast member
The steps for procedure B are:
(B1) Get existing parameter values for each member
(B2) Run them through a forecast model
Both procedures finish with writing the parameter values to a NetCDF file
"""
from ct_tools import DaInfo
# Get the da system specific rc-items, note that they've been validated already by FillObs
DaSystem = DaInfo(CycleInfo.da_settings['da.system.rc'])
StateVector = CtStateVector(CycleInfo,DaSystem)
nlag = int(CycleInfo.da_settings['time.nlag'])
for n in range(1,nlag+1):