From 5fa9dbb48bd9d09c1da09c6ac370cdc3977a263a Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Wed, 28 Jul 2010 09:25:48 +0000 Subject: [PATCH] more structure to modules --- ct_observation_tools.py | 227 ++++++++++++++++++++++++++ ct_optimizer_tools.py | 88 ++++++++++ ct_statevector_tools.py | 350 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 665 insertions(+) create mode 100755 ct_observation_tools.py create mode 100755 ct_optimizer_tools.py create mode 100755 ct_statevector_tools.py diff --git a/ct_observation_tools.py b/ct_observation_tools.py new file mode 100755 index 00000000..56c7d667 --- /dev/null +++ b/ct_observation_tools.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python +# ct_observation_tools.py + +""" +Author : peters + +Revision History: +File created on 28 Jul 2010. + +""" +import os +import sys +import rc +import logging +import datetime + +identifier = 'CarbonTracker CO2' + + + +################### Begin Class MixingRatioSample ################### + +class MixingRatioSample(): + """ + Holds the data that defines a Mixing Ratio Sample in the data assimilation framework. Sor far, this includes all + attributes listed below in the __init__ method. One can additionally make more types of data, or make new + objects for specific projects. + + """ + + def __init__(self,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',sdev=0.0): + + self.code = code.strip() # Site code + self.xdate = xdate # Date of obs + self.obs = obs # Value observed + self.simulated = simulated # Value simulated by model + self.resid = resid # Mixing ratio residuals + self.hphr = hphr # Mixing ratio prior uncertainty from fluxes and (HPH) and model data mismatch (R) + self.mdm = mdm # Model data mismatch + self.flag = flag # Flag + self.height = height # Sample height + self.lat = lat # Sample lat + self.lon = lon # Sample lon + self.id = evn # Event number + self.sdev = sdev # standard deviation of ensemble + self.masl = True # Sample is in Meters Above Sea Level + self.mag = not self.masl # Sample is in Meters Above Ground + + def __str__(self): + day=self.xdate.strftime('%Y-%m-%d %H:%M:%S') + return ' '.join(map(str,[self.code,day,self.obs,self.flag,self.id])) + +################### End Class MixingRatioSample ################### + +################### Begin Class MixingRatioList ################### + +class MixingRatioList(list): + """ This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """ + from numpy import array + + def getvalues(self,name,constructor=array): + return constructor([getattr(o,name) for o in self]) + def unflagged(self): + return MixingRatioSample([o for o in self if o.flag == 0]) + def flagged(self): + return MixingRatioSample([o for o in self if o.flag != 0]) + def selectsite(self,site='mlo'): + l=[o for o in self if o.code == site] + return MixingRatioSample(l) + def selecthours(self,hours=range(1,24)): + l=[o for o in self if o.xdate.hour in hours] + return MixingRatioSample(l) + +################### End Class MixingRatioList ################### + + +################### Begin Class CtObservations ################### + +class CtObservations(): + """ an object that holds data + methods and attributes needed to manipulate mixing ratio values """ + + def __init__(self,DaInfo): + + sfname = DaInfo.da_settings['obs.input.fname']+'.%s'%('20050305')+'.nc' + msg = 'This filename is hardcoded but needs replacement' ; logging.warning(msg) + filename = os.path.join(DaInfo.da_settings['obs.input.dir'],sfname) + + if not os.path.exists(filename): + msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg) + raise IOError,msg + else: + self.ObsFilename = filename + + self.MrData = None + + def __str__(self): + """ Prints a list of MixingRatioSample objects""" + return self.ObsFilename + + def AddObs(self): + """ Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file""" + import pyhdf.SD as HDF + import datetime as dtm + from string import strip, join + + ncf = HDF.SD(self.ObsFilename) + idates = ncf.select('date_components').get() + ids = ncf.select('id').get() + sites = ncf.select('site').get() + sites = [join(s,'').lower() for s in sites] + sites = map(strip,sites) + lats = ncf.select('lat').get() + lons = ncf.select('lon').get() + alts = ncf.select('alt').get() + obs = ncf.select('obs').get() + species = ncf.select('species').get() + date = ncf.select('date').get() + window = ncf.select('sampling_strategy').get() + flags = ncf.select('NOAA_QC_flags').get() + flags = [join(s,'') for s in flags] + flags = map(strip,flags) + dummy = ncf.end() + msg = "Successfully read data from obs file (%s)"%self.ObsFilename ; logging.debug(msg) + + dates = [dtm.datetime(*d) for d in idates] + + if self.MrData == None: + self.MrData = MixingRatioList([]) + + for n in range(len(dates)): + self.MrData.append( MixingRatioSample(dates[n],sites[n],obs[n],0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],ids[n],0.0) ) + + msg = "Added %d observations to the MrData list" % len(dates) ; logging.info(msg) + + def AddSimulations(self, filename): + """ Adds model simulated values to the mixing ratio objects """ + + import pyhdf.SD as HDF + + ncf = HDF.SD(filename) + ids = ncf.select('id').get() + simulated = ncf.select('sampled_mole_frac').get()*1e6 + dummy = ncf.end() + msg = "Successfully read data from model sample file (%s)"%filename ; logging.debug(msg) + + obs_ids = self.MrData.getvalues('id') + + for n in range(len(ids)): + + try: + index = obs_ids.index(ids[n]) + except: + msg = 'A model sample was found that did not match any ID in the existing observation list. Skipping...' ; logging.warning(msg) + continue + + dummy = self.MrData[index].simulated = simulated[n] + + msg = "Added %d simulated values to the MrData list" % len(ids) ; logging.info(msg) + + +################### End Class CtObservations ################### + + +################# Stand alone methods that manipulate the objects above ############## + +def PrepareObs(CycleInfo,type='forecast'): + """ PrepareObs """ + + import shutil + from ct_tools import DaInfo + + # First, we make a CarbonTracker Observation object which holds all the information related to the mixing ratios going in + # and coming out of the model. This object also holds the observations themselves, and the simulated value + statistics + + DaSystem = DaInfo(CycleInfo.da_settings['da.system.rc']) + + samples = CtObservations(DaSystem) + + dummy = samples.AddObs() + + # + # Next should follow a section where the observations that are read are further processes, and written to a NetCDF file for + # the transport model to use for x,y,z,t smapling. + # + # For carbontracker the observations already come in netcdf format thanks to Andy and Ken. We will therefore simply copy + # the observations.nc file to the pre-cooked obs files + # + + filename= samples.ObsFilename + saveas = os.path.join(CycleInfo.da_settings['dir.input'],'observations.nc') + dummy = shutil.copy2(filename,saveas) + msg = 'Successfully copied file (%s) to (%s) ' % (filename,saveas) ; logging.info(msg) + + return samples + + + + +if __name__ == "__main__": + import os + import sys + from tools_da import StartLogger + from da_initexit import CycleControl + import numpy as np + from ct_tools import DaInfo + + opts = ['-v'] + args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'} + + StartLogger() + DaCycle = CycleControl(opts,args) + Da_Info = DaInfo('carbontracker.rc') + + DaCycle.Initialize() + print DaCycle + + samples = CtObservations(Da_Info) + + dummy = samples.AddObs() + dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/samples.000.nc') + + print samples.MrData.getvalues('code') + print samples.MrData.getvalues('obs') + + mlo=samples.MrData.selectsite('mlo_01d0') + + dummy = PrepareObs(DaCycle,'forecast') diff --git a/ct_optimizer_tools.py b/ct_optimizer_tools.py new file mode 100755 index 00000000..eba1b867 --- /dev/null +++ b/ct_optimizer_tools.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +# ct_optimizer_tools.py + +""" +Author : peters + +Revision History: +File created on 28 Jul 2010. + +""" + +import os +import sys +import rc +import logging +import datetime + +identifier = 'CarbonTracker CO2' +################### Begin Class CtOptimizer ################### + +class CtOptimizer(): + """ + This creates an instance of a CarbonTracker optimization object. It handles the minimum least squares optimization + of the CT state vector given a set of CT sample objects. Two routines will be implemented: one where the optimization + is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed + and efficiency. + """ + + def __init__(self, CycleInfo, DaInfo): + + self.nlag = int(CycleInfo.da_settings['time.nlag']) + self.nmembers = int(CycleInfo.da_settings['forecast.nmembers']) + self.nparams = int(DaInfo.da_settings['nparameters']) + + self.CreateMatrices() + + return None + + def CreateMatrices(self): + """ Create Matrix space needed in optimization routine """ + import numpy as np + + self.X_prior = np.zeros( (self.nlag*self.nparams,), float) + self.X_prior_prime = np.zeros( (self.nmembers,self.nlag*self.nparams,), float) + + def FillMatrices(self,StateVector): + import numpy as np + + for n in range(self.nlag): + + self.X_prior[n*self.nparams:(n+1)*self.nparams] = StateVector.MeanValues[n] + members = StateVector.EnsembleMembers[n] + self.X_prior_prime[:,n*self.nparams:(n+1)*self.nparams] = np.array([m.MeanDeviations for m in members]) + + return None + + +################### End Class CtOptimizer ################### + + + + + +if __name__ == "__main__": + + import os + import sys + from tools_da import StartLogger + from da_initexit import CycleControl + import numpy as np + from ct_tools import DaInfo + from ct_statevector_tools import CtStateVector, PrepareState + + opts = ['-v'] + args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'} + + StartLogger() + DaCycle = CycleControl(opts,args) + Da_Info = DaInfo('carbontracker.rc') + + DaCycle.Initialize() + print DaCycle + + StateVector = PrepareState(DaCycle) + + opt = CtOptimizer(DaCycle,Da_Info) + + opt.FillMatrices(StateVector) diff --git a/ct_statevector_tools.py b/ct_statevector_tools.py new file mode 100755 index 00000000..9f68ea75 --- /dev/null +++ b/ct_statevector_tools.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python +# ct_statevector_tools.py + +""" +Author : peters + +Revision History: +File created on 28 Jul 2010. + +""" + +import os +import sys +import rc +import logging +import datetime + + +################### Begin Class CtMember ################### + +class CtMember(): + """ + An ensemble member for CarbonTracker. This consists of + * parameter values + * a CtObservations object with sampled mixing ratios + * file names to write/read the data for this member + """ + + def __init__(self, membernumber): + self.membernumber = membernumber # the member number + self.randomstate = (None,) # the state of the random number generator needed to reproduce the random sequence + self.ModelSamples = None # CtObservations object that holds model samples for this member + self.MeanDeviations = None # Parameter values of this member + self.ParameterOutputFile = None # File to which the member info should be written + self.SampleInputFile = None # File from which the sample info can be read + + def __str__(self): + return "%03d"%self.membernumber + + def WriteToFile(self, outdir): + """ Write the information needed by an external model to a netcdf file for future use """ + from ct_netcdf import CT_CDF, std_savedict + import pycdf as cdf + import mysettings + import numpy as np + + filename = os.path.join(outdir,'parameters.%03d.nc'% self.membernumber) + f = CT_CDF(filename,'create') + dimparams = f.AddParamsDim(len(self.MeanDeviations)) + + data = self.MeanDeviations + + savedict = std_savedict.copy() + savedict['name'] = "parametervalues" + savedict['long_name'] = "parameter_values_for_member_%d"%self.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimparams + savedict['values'] = data.tolist() + savedict['comment'] = 'These are parameter values to use for member %d'%self.membernumber + dummy = f.AddData(savedict) + + dummy = f.close() + + msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (self.membernumber,filename,) ; logging.info(msg) + + + +################### End Class CtMember ################### + +################### Begin Class CtStateVector ################### + +class CtStateVector(): + """ an object that holds data + methods and attributes needed to manipulate state vector values """ + def __init__(self,CycleInfo,DaInfo): + + self.CycleInfo = CycleInfo + self.DaInfo = DaInfo + self.nlag = int(CycleInfo.da_settings['time.nlag']) + self.nmembers = int(CycleInfo.da_settings['forecast.nmembers']) + self.nparams = int(DaInfo.da_settings['nparameters']) + self.isOptimized = False + + # These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist + # of lists of CtMember objects, whereas the MeanValues will simply be a list of array values. + + self.MeanValues = range(self.nlag) + self.EnsembleMembers = range(self.nlag) + + for n in range(self.nlag): + self.EnsembleMembers[n] = [] + + def __str__(self): + return "This is a CarbonTracker state vector object" + + def MakeNewEnsemble(self,nlag): + """ Make a new ensemble from specified matrices """ + + import pyhdf.SD as hdf + import numpy as np + import matplotlib.pyplot as plt + + # Get the needed matrices from the specified covariance files + + file_ocn_cov = self.DaInfo.da_settings['ocn.covariance'] + file_bio_cov = self.DaInfo.da_settings['bio.covariance'] + + for file in [file_ocn_cov,file_bio_cov]: + + if not os.path.exists(file): + + msg = "Cannot find the specified file %s" % file ; logging.error(msg) + raise IOError,msg + else: + + msg = "Using covariance file: %s" % file ; logging.info(msg) + + f_ocn = hdf.SD(file_ocn_cov) + f_bio = hdf.SD(file_bio_cov) + + cov_ocn = f_ocn.select('qprior').get() + cov_bio = f_bio.select('qprior').get() + + dummy = f_ocn.end() + dummy = f_bio.end() + + dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices") + + # Once we have the matrices, we can start to make the full covariance matrix, and then decompose it + + fullcov = np.zeros((self.nparams,self.nparams),float) + + nocn = cov_ocn.shape[0] + nbio = cov_bio.shape[0] + + fullcov[0:nbio,0:nbio] = cov_bio + fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn + + # Visually inspect full covariance matrix if verbose + + if self.CycleInfo.da_settings['verbose']: + + plt.imshow(fullcov) + plt.colorbar() + plt.savefig('fullcovariancematrix.png') + plt.close('all') + + # Make a cholesky decomposition of the covariance matrix + + U,s,Vh = np.linalg.svd(fullcov) + dof = np.sum(s)**2/sum(s**2) + + C = np.linalg.cholesky(fullcov) + + msg = 'Cholesky decomposition has succeeded offline' ; logging.info(msg) + msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg) + + if self.CycleInfo.da_settings['verbose']: + + plt.plot(s) + plt.savefig('eigenvaluespectrum.png') + plt.close('all') + + # Now create a set of random instances of the given covariance structure + + dummy = np.random.seed() + + # Create mean values for the new time step (default = 1.0) and add to the MeanValues list + + NewMean = np.ones(self.nparams,float) + self.MeanValues[nlag] = NewMean + + # Create the first ensemble member with a deviation of 0.0 and add to list + + NewMember = CtMember(0) + NewMember.MeanDeviations = np.zeros((self.nparams),float) + dummy = self.EnsembleMembers[nlag].append(NewMember) + + # Create members 1:nmembers and add to EnsembleMembers list + + for member in range(1,self.nmembers): + randstate = np.random.get_state() + rands = np.random.randn(self.nparams) + + NewMember = CtMember(member) + NewMember.MeanDeviations = np.dot(C,rands) + NewMember.randomstate = randstate + dummy = self.EnsembleMembers[nlag].append(NewMember) + + msg = '%d new ensemble members were added to the state vector'%(self.nmembers) ; logging.info(msg) + + def Propagate(self,nlag): + + return self.MakeNewEnsemble(nlag) + + def WriteToFile(self): + """ Write the StateVector object to a netcdf file for future use """ + from ct_netcdf import CT_CDF + import pycdf as cdf + import mysettings + import numpy as np + + outdir = self.CycleInfo.da_settings['dir.output'] + filtertime = self.CycleInfo.da_settings['startdate'].strftime('%Y%m%d') + filename = os.path.join(outdir,'savestate.%s.nc'% filtertime) + f = CT_CDF(filename,'create') + dimparams = f.AddParamsDim(self.nparams) + dimmembers = f.AddMembersDim(self.nmembers) + dimlag = f.AddLagDim(self.nlag,unlimited=True) + + setattr(f,'date',filtertime) + setattr(f,'isOptimized',str(self.isOptimized)) + + for n in range(self.nlag): + data = StateVector.MeanValues[n] + + savedict = f.StandardVar(varname='meanstate') + savedict['dims'] = dimlag+dimparams + savedict['values'] = data.tolist() + savedict['count'] = n + savedict['comment'] = 'this represents the mean of the ensemble' + dummy = f.AddData(savedict) + + + members = StateVector.EnsembleMembers[n] + data = np.array([m.MeanDeviations for m in members]) + + savedict = f.StandardVar(varname='ensemblestate') + savedict['dims'] = dimlag+dimmembers+dimparams + savedict['values'] = data.tolist() + savedict['count'] = n + savedict['comment'] = 'this represents deviations from the mean of the ensemble' + dummy = f.AddData(savedict) + + dummy = f.close() + + msg = 'Successfully wrote the State Vector to file (%s) ' % (filename,) ; logging.info(msg) + + def ReadFromFile(self): + """ Read the StateVector object from a netcdf file for future use """ + import pyhdf.SD as SD + import numpy as np + + outdir = self.CycleInfo.da_settings['dir.output'] + filtertime = self.CycleInfo.da_settings['startdate'].strftime('%Y%m%d') + filename = os.path.join(outdir,'savestate.%s.nc'% filtertime) + + f = SD.SD(filename) + MeanState = f.select('statevectormean').get() + EnsembleMembers = f.select('statevectorensemble').get() + dummy = f.end() + + for n in range(self.nlag): + self.MeanValues[n] = MeanState[n,:] + members = self.EnsembleMembers[n] + for i,m in enumerate(members): + m.MeanDeviations = EnsembleMembers[n,i,:] + + msg = 'Successfully read the State Vector from file (%s) ' % (filename,) ; logging.info(msg) + +################### End Class CtStateVector ################### + +################# Stand alone methods that manipulate the objects above ############## + +def PrepareState(CycleInfo): + """ + + Prepare the ensemble of parameters needed by the forecast model. There are two possible pathways: + + (A) Construct a brand new ensemble from a covariance matrix + + (B) Get an existing ensemble and propagate it + + The steps for procedure A are: + + (A1) Construct the covariance matrix + (A2) Make a Cholesky decomposition + (A3) Prepare a set of [nmembers] input parameters for each forecast member + + The steps for procedure B are: + + (B1) Get existing parameter values for each member + (B2) Run them through a forecast model + + + Both procedures finish with writing the parameter values to a NetCDF file + + + """ + from ct_tools import DaInfo + + # Get the da system specific rc-items, note that they've been validated already by FillObs + + DaSystem = DaInfo(CycleInfo.da_settings['da.system.rc']) + + StateVector = CtStateVector(CycleInfo,DaSystem) + + nlag = int(CycleInfo.da_settings['time.nlag']) + + for n in range(1,nlag+1): + + if n == nlag: + + # Route A: make a new ensemble + + dummy = StateVector.MakeNewEnsemble(n-1) + + else: + + # Route B: Propagate existing ensemble + + dummy = StateVector.Propagate(n-1) + + return StateVector + + + + +if __name__ == "__main__": + import os + import sys + from tools_da import StartLogger + from da_initexit import CycleControl + import numpy as np + from ct_tools import DaInfo + + opts = ['-v'] + args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'} + + StartLogger() + DaCycle = CycleControl(opts,args) + Da_Info = DaInfo('carbontracker.rc') + + dummy = CtMember(0) + print dummy + + DaCycle.Initialize() + print DaCycle + + StateVector = PrepareState(DaCycle) + + members = StateVector.EnsembleMembers[1] + + members[0].WriteToFile(DaCycle.da_settings['dir.input']) + + data = np.array([m.MeanDeviations for m in members]) + + StateVector.WriteToFile() + + StateVector.ReadFromFile() + -- GitLab