From 93ffdd03c85d74b9519cc11e9131165264019566 Mon Sep 17 00:00:00 2001 From: Liesbeth Florentie <liesbeth.florentie@wur.nl> Date: Tue, 10 Dec 2019 15:38:20 +0000 Subject: [PATCH] read statevector uncertainties from file --- xco2/da/ctsf/statevector_ctsf_newcov.py | 580 ++++++++++++++++++++++++ 1 file changed, 580 insertions(+) create mode 100644 xco2/da/ctsf/statevector_ctsf_newcov.py diff --git a/xco2/da/ctsf/statevector_ctsf_newcov.py b/xco2/da/ctsf/statevector_ctsf_newcov.py new file mode 100644 index 00000000..af1b983b --- /dev/null +++ b/xco2/da/ctsf/statevector_ctsf_newcov.py @@ -0,0 +1,580 @@ +"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. +Users are recommended to contact the developers (wouter.peters@wur.nl) to receive +updates of the code. See also: http://www.carbontracker.eu. + +This program is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free Software Foundation, +version 3. This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with this +program. If not, see <http://www.gnu.org/licenses/>.""" +#!/usr/bin/env python + +from __future__ import division + +import os +import sys +sys.path.append(os.getcwd()) + +import logging +import numpy as np +import da.tools.io4 as io +from da.baseclasses.statevector import StateVector, EnsembleMember + +identifier = 'CarbonTracker Statistical Fit Statevector ' +version = '0.0' + + +# polynomial function of order len(c)-1, with c vector containing polynomial coefficients +def polyfunc(c,x): + p = len(c) + f = 0 + for o in range(p): + f += c[o]*np.power(x,o) + del o + return f + + + +################### Start Class SifTinversionStateVector ################### + +class ctsfStateVector(StateVector): + + """ + This is a StateVector object for CarbonTracker. It is created for use with the SIF inversion approach, + and contains statistical parameters and SIF sensitivity parameters per ecoregion to be assembled. + It has a private method to make new ensemble members. + """ + + def setup(self, dacycle): + """ + setup the object by specifying the dimensions. + There are two major requirements for each statvector that you want to build: + (1) is that the statevector can map itself onto a regular grid + (2) is that the statevector can map itself (mean+covariance) onto TransCom regions + An example is given below. + """ + + self.nlat = 180 + self.nlon = 360 + + self.nlag = int(dacycle['time.nlag']) + self.nmembers = int(dacycle['da.optimizer.nmembers']) # number of ensemble members + + # fit settings + self.nharmonics = int(dacycle.dasystem['nharmonics']) # number of harmonical terms in baseline fit + self.nbasefitparams = 3 + self.nharmonics*4 + + self.nobs = 0 + self.obs_to_assimilate = () # empty containter to hold observations to assimilate later on + + # initialization of fit parameters + self.unit_initialization = dacycle.dasystem['fit.unit_initialization'] # if True: initial fit will be constructed based on latitude, else: read from file + + if self.unit_initialization: + + # coefficients polynomial part: p0 + p1*t + p2*t^2 + polynomial = dacycle.dasystem['polynomial'].split(',') + self.polynomial = np.asarray([float(i) for i in polynomial]) + + # initialize amplitude of first harmonics + amplitude = dacycle.dasystem['amplitude'].split(',') + if amplitude[0] == 'None': + self.amplitude = [ None, float(amplitude[1])] + amplitude_polyfit = dacycle.dasystem['amplitude_polyfit'].split(',') + self.amplitude_polyfit = np.asarray([float(i) for i in amplitude_polyfit]) + logging.debug('Amplitude of first harmonic will be initialized as %s-th order polynomial function with coefficients (increasing order): %s' \ + % (len(self.amplitude_polyfit)-1,self.amplitude_polyfit)) + else: + self.amplitude = [float(amplitude[0]), float(amplitude[1])] + + else: + self.initparamsfile = dacycle.dasystem['params.init.file'] + self.initparamsvec = dacycle.dasystem['params.init.vec'] # True: directly read statevector, False: extract statevector from gridded field + self.initparamsname = dacycle.dasystem['params.init.name'] + + # anomaly term + self.ngamma = int(dacycle.dasystem['ngamma']) # number of sensitivity parameters per ecoregion: default = 12, 1 per calendar month + self.gamma0 = dacycle.dasystem['params.init.gamma'].split(',') # enter single value, or comma-separated list of ngamma (default: one for each month) + + # parameters for temperature dependent respiration parametrization + self.add_resp = dacycle.dasystem['add.respiration'] + if self.add_resp: + # self.nresp = 2 + self.nresp = 13 # one r0, monthly s0 + self.r0 = float(dacycle.dasystem['params.init.R0']) + self.s0 = float(dacycle.dasystem['params.init.S0']) # initial value for S0 = R0*dk, k = k0 + dk + logging.info('Respiration parametrization added to statevector') + else: + self.nresp = 0 + + self.nfitparams = self.nbasefitparams + self.ngamma + self.nresp + + # These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist + # of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread. + self.ensemble_members = range(self.nlag) + for n in range(self.nlag): + self.ensemble_members[n] = [] + del n + + # This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the ecoregion it belongs to. + # From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid. + mapfile = os.path.join(dacycle.dasystem['regionsfile']) + ncf = io.ct_read(mapfile, 'read') + self.gridmap = ncf.get_variable('regions') # read ecoregions map (nlat x nlon) + self.tcmap = ncf.get_variable('transcom_regions') # read transcom regions map (nlat x nlon) + olsonmap = ncf.get_variable('land_ecosystems') + if self.unit_initialization: + if self.amplitude[0] is None: + areas = ncf.get_variable('grid_cell_area') + latvec = ncf.get_variable('lat') + latgrid = np.ones(areas.shape) * latvec[:,np.newaxis] + ncf.close() + + logging.debug("A TransCom map on 1x1 degree was read from file %s" % dacycle.dasystem['regionsfile']) + logging.debug("An ecoregion parameter map on 1x1 degree was read from file %s" % dacycle.dasystem['regionsfile']) + + # Create a dictionary for state <-> gridded map conversions (keys = regionnumbers) + # And calculate mean latitude per ecoregion (only if self.amplitude[0] = None) + regnums = [] + self.griddict = {} + lat_regions = [] + + for i in range(209): + sel = np.where(self.gridmap == i+1) + if len(sel[0]) > 0 and np.mean(olsonmap[sel] < 19): + regnums.append(i+1) + self.griddict[i+1] = sel + if (self.unit_initialization) and (self.amplitude[0] is None): + lat_regions.append( np.average(latgrid[sel], weights=areas[sel]) ) + del i + + if self.unit_initialization: + if self.amplitude[0] is None and len(lat_regions) != len(regnums): + logging.error('Inconsistent number of elements in lists with regional latitudes (%s) and with regionnumbers (%s)' % (len(lat_regions), len(regnums))) + raise IOError + elif self.amplitude[0] is not None and len(lat_regions) != 0: + logging.error('average latitudes per ecoregion calculated, but amplitudes will not be initialized with latitude function...') + raise IOError + + self.regionnumbers = np.asarray(regnums) + self.nregions = len(self.regionnumbers) + self.nparams = self.nregions * self.nfitparams # number of elements in statevector + if self.unit_initialization: + if self.amplitude[0] is None: + self.lat_regions = np.asarray(lat_regions) # average latitude per ecoregion + + logging.debug('There are %s non-empty ecoregions: %s' %(self.nregions, self.regionnumbers)) + logging.debug('A dictionary to map grids to states and vice versa was created') + logging.debug('Average latitude per (nonempty) ecoregion was calculated.') + + + # Create a matrix for state <-> TransCom conversions + self.tcmatrix = np.zeros((self.nparams, 23), 'float') + + for r in range(self.nregions): + er = self.regionnumbers[r] + sel = (self.gridmap.flat == er).nonzero() + if len(sel[0]) < 1: + continue + else: + n_tc = set(self.tcmap.flatten().take(sel[0])) + if len(n_tc) > 1: + logging.error("Parametergroup %d seems to map to multiple TransCom regions (%s), I do not know how to handle this" % (er, n_tc)) + raise ValueError + self.tcmatrix[r*self.nfitparams:(r+1)*self.nfitparams, n_tc.pop() - 1] = 1.0 + del r + + logging.debug("A matrix to map states to TransCom regions and vice versa was created") + + # Create a mask for species/unknowns + self.make_species_mask() + + + + def state_to_grid(self, fluxvector=None, lag=1): + # used to write statevector in grid format for coupling with TM5 (where actual fluxes are calculated) + # implemented such that variables with dimensions nfitparams x nlat x nlon are created + + """ + Transforms the StateVector information (mean + covariance) to a 1x1 degree grid. + + :param: fluxvector: a vector of length (nparams,) that holds the fluxes associated with each parameter in the StateVector + :param: lag: the lag at which to evaluate the StateVector + :rtype: a tuple of two arrays (gridmean,gridvariance) with dimensions (nfitparams,180,360) + + If the attribute `fluxvector` is not passed, the function will return the mean parameter value and its variance on a 1x1 map. + + ..note:: Although we can return the variance information for each gridbox, the covariance information contained in the original ensemble is lost when mapping to 1x1 degree! + """ + + if fluxvector == None: + fluxvector = np.ones(self.nparams) + else: + logging.error("ctsfStateVector cannot handle state_to_grid with fluxvector input") + raise ValueError + + ensemble = self.ensemble_members[lag - 1] + ensemblemean = ensemble[0].param_values + + # First transform the mean + gridmean = self.vector2grid(vectordata=ensemblemean * fluxvector) + + # And now the covariance, first create covariance matrix (!), and then multiply + deviations = np.array([mem.param_values * fluxvector - ensemblemean for mem in ensemble]) + ensemble = [] + for mem in deviations: + ensemble.append(self.vector2grid(mem)) + del mem + + return (gridmean, np.array(ensemble)) + + + + def vector2grid(self, vectordata=None): + """ + Map vector elements to a map or vice cersa + + :param vectordata: a vector dataset to use in case `reverse = False`. This dataset is mapped onto a 1x1 grid and must be of length `nparams` + :rtype: ndarray: an array of size (nfitparams,360,180) + + This method makes use of a dictionary that links every parameter number [1,...,nparams] to a series of gridindices. These + indices specify a location on a 360x180 array, stretched into a vector using `array.flat`. There are multiple ways of calling + this method:: + + griddedarray = self.vector2grid(vectordata=param_values) # simply puts the param_values onto a (180,360,) array + + .. note:: This method uses a DaSystem object that must be initialzied with a proper parameter map. See :class:`~da.baseclasses.dasystem` for details + """ + + result = np.zeros((self.nfitparams,self.gridmap.shape[0],self.gridmap.shape[1]), float) + + for k in range(len(vectordata)): + + r = int(np.floor(k / self.nfitparams)) + i = k % self.nfitparams + + regnum = self.regionnumbers[r] + + if regnum in list(self.griddict.keys()): + result[i,self.griddict[regnum][0],self.griddict[regnum][1]] = vectordata[k] + + del k, r, i, regnum + + return result + + + + def grid2vector(self, griddata=None, method='avg'): + """ + Map gridded data onto a vector of length (nparams,) + + :param griddata: a gridded dataset to use. This dataset is mapped onto a vector of length `nparams` + :param method: a string that specifies the method to combine grid boxes in case reverse=True. Must be either ['avg','sum','minval'] + :rtype: ndarray: size (nparameters,) + + This method makes use of a dictionary that links every parameter number [1,...,nparams] to a series of gridindices. These + indices specify a location on a 360x180 array, stretched into a vector using `array.flat`. There are multiple ways of calling + this method:: + + values = self.grid2vector(griddata=mygriddeddata,method='minval') # + using the minimum value of all datapoints covered by that parameter index + + values = self.grid2vector(griddata=mygriddeddata,method='avg') # + using the average value of all datapoints covered by that parameter index + + values = self.grid2vector(griddata=mygriddeddata,method='sum') # + using the sum of values of all datapoints covered by that parameter index + + .. note:: This method uses a DaSystem object that must be initialized with a proper parameter map. See :class:`~da.baseclasses.dasystem` for details + + """ + + logging.debug('grid2vector called') + + methods = ['avg', 'sum', 'minval'] + if method not in methods: + logging.error("To put data from a map into the statevector, please specify the method to use (%s)" % methods) + raise ValueError + + result = np.zeros((self.nparams,), float) + for i in range(self.nfitparams): + for regnum, v in self.griddict.iteritems(): + k = (regnum-1) * self.nfitparams + i + if method == "avg": + result[k] = griddata[i,:,:].take(v).mean() + elif method == "sum" : + result[k] = griddata[i,:,:].take(v).sum() + elif method == "minval" : + result[k] = griddata[i,:,:].take(v).min() + del r, v, k + del i + + return result # Note that the result is returned, but not yet placed in the member.param_values attrtibute! + + + + def get_covariance(self, date, dacycle): + """ + Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector. + Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. + The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] + """ + + # read vector with uncertainties for basefit from file + logging.info('Reading vector with uncertainty values from file %s' % self.initparamsfile) + f_in = io.ct_read(self.initparamsfile,method='read') + unc_fit = f_in.get_variable('uncertainty') + f_in.close() + if len(unc_fit) != self.nbasefitparams: + logging.error('Size uncertainty vector (%s) not consistent with number of fitparameters (%s)' % (len(unc_fit),self.nbasefitparams)) + raise IOError + + # read uncertainties for NEE (or GPP and TER) vs proxy correlation coefficients from rc file + unc_gamma = float(dacycle.dasystem['bio.cov.gamma']) + if self.add_resp: + unc_resp = np.array([float(dacycle.dasystem['bio.cov.R0']), float(dacycle.dasystem['bio.cov.S0'])]) + + # construct covariance matrix + cov_vec = np.zeros(self.nparams) + for i in range(self.nregions): + i0 = i*self.nfitparams + i1 = i0 + self.nbasefitparams + i2 = i1 + self.ngamma + i3 = i2 + self.nresp + + cov_vec[i0 : i1] = unc_fit[i*self.nbasefitparams : (i+1)*self.basefitparams] * unc_fit[i*self.nbasefitparams : (i+1)*self.basefitparams] + cov_vec[i1 : i2] = np.ones(self.ngamma) * unc_gamma * unc_gamma + + if self.add_resp: + cov_vec[i2] = unc_resp[0]**2 + cov_vec[i2+1 : i3] = np.ones(self.nresp-1) * unc_resp[1]**2 + del i + + cov_mat = np.diag(cov_vec) + + logging.info("Created diagonal covariance matrix.") + + return cov_mat + + + + def make_new_ensemble(self, lag, covariancematrix=None): + """ + :param lag: an integer indicating the time step in the lag order + :param covariancematrix: a matrix to draw random values from + :rtype: None + + Make a new ensemble, the attribute lag refers to the position in the state vector. + Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. + The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] + + The optional covariance object to be passed holds a matrix of dimensions [nparams, nparams] which is + used to draw ensemblemembers from. If this argument is not passed it will ne substituted with an + identity matrix of the same dimensions. + + """ + + if covariancematrix.any() == None: + covariancematrix = np.identity(self.nparams) + + # Make a cholesky decomposition of the covariance matrix + _, s, _ = np.linalg.svd(covariancematrix) + dof = (np.sum(s) ** 2) / np.sum(s ** 2) + try: + C = np.linalg.cholesky(covariancematrix) + except np.linalg.linalg.LinAlgError, err: + logging.error('Cholesky decomposition has failed ') + logging.error('For a matrix of dimensions: %d' % matrix.shape[0]) + logging.debug(err) + raise np.linalg.linalg.LinAlgError + + logging.debug('Cholesky decomposition has succeeded') + logging.info('Appr. degrees of freedom in covariance matrix is %i' % int(dof)) + + # Create mean values (= initialize statevector) + if self.unit_initialization: + newmean = np.zeros(self.nparams, float) + + for r in range(self.nregions): + + i = r * self.nfitparams + newmean[i:i+3] = self.polynomial + + # Calculate initial amplitude per ecoregion as polynomial function of latitude + if self.amplitude[0] is None: + newmean[i+3] = polyfunc(self.amplitude_polyfit,self.lat_regions[r]) + else: + newmean[i+3] = self.amplitude[0] + newmean[i+4] = self.amplitude[1] + + # initialize gamma + newmean[r*self.nfitparams + self.nbasefitparams : (r+1)*self.nfitparams - self.nresp] = self.gamma0 + + # initialize R0 and S0 + if self.add_resp: + newmean[(r+1)*self.nfitparams - self.nresp] = self.r0 + newmean[(r+1)*self.nfitparams - self.nresp + 1 : (r+1)*self.nfitparams] = self.s0 + + del i, r + + elif self.initparamsvec: + logging.info('Reading vector with initial parameter values from file %s' % self.initparamsfile) + # read initial statevector from file + f_in = io.ct_read(self.initparamsfile,method='read') + newmean = f_in.get_variable(self.initparamsname) + f_in.close() + if len(newmean) != self.nparams: + logging.error('Size initial parameter vector (%s) not consistent with number of fitparameters (%s)' % (len(newmean),self.nparams)) + raise IOError + + else: + logging.info('Reading map with initial parameter values from file %s' % self.initparamsfile) + # read initial statevector from file + f_in = io.ct_read(self.initparamsfile,method='read') + paramsmap = f_in.get_variable(self.initparamsname) + f_in.close() + if paramsmap.shape != (self.nfitparams,self.nlat,self.nlon): + logging.error('Size of initial parameter map (%s,%s,%s) does not match required shape (%s)' % (paramsmap.shape[0],paramsmap.shape[1],paramsmap.shape[2],\ + (self.nfitparams,self.nlat,self.nlon) )) + raise IOError + + # extract parameter values per ecoregion + newmean = [] + for r in self.regionnumbers: + for i in range(self.nfitparams): + newmean.append(np.nanmean(np.where(self.gridmap==r,paramsmap[i,:,:],np.nan))) + del i + del r + newmean = np.array(newmean) + + logging.info('Initial statevector created') + + # If this is not the start of the filter, average previous two optimized steps into the mix + if lag == self.nlag - 1 and self.nlag >= 3: + newmean += self.ensemble_members[lag - 1][0].param_values + self.ensemble_members[lag - 2][0].param_values + newmean = newmean / 3.0 + + # Create the first ensemble member with a deviation of 0.0 and add to list + newmember = EnsembleMember(0) + newmember.param_values = newmean.flatten() # no deviations + self.ensemble_members[lag].append(newmember) + + # Create members 1:nmembers and add to ensemble_members list + for member in range(1, self.nmembers): + rands = np.random.randn(self.nparams) + newmember = EnsembleMember(member) + newmember.param_values = np.squeeze((np.dot(C, rands) + newmean)[np.newaxis,:]) + self.ensemble_members[lag].append(newmember) + + logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, (lag + 1))) + + + + def write_members_to_file(self, lag, outdir, endswith='.nc', member=None): + """ + :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag] + :param: outdir: Directory where to write files + :param: endswith: Optional label to add to the filename, default is simply .nc + :rtype: None + + Write ensemble member information to a NetCDF file for later use. The standard output filename is + *parameters.DDD.nc* where *DDD* is the number of the ensemble member. Standard output file location + is the `dir.input` of the dacycle object. In principle the output file will have only two datasets inside + called `parametervalues` which is of dimensions `nparameters` and `parametermap` which is of dimensions (180,360). + This dataset can be read and used by a :class:`~da.baseclasses.observationoperator.ObservationOperator` object. + + .. note:: if more, or other information is needed to complete the sampling of the ObservationOperator you + can simply inherit from the StateVector baseclass and overwrite this write_members_to_file function. + + """ + + # These import statements caused a crash in netCDF4 on MacOSX. No problems on Jet though. Solution was + # to do the import already at the start of the module, not just in this method. + + #import da.tools.io as io + #import da.tools.io4 as io + + if member is None: + members = self.ensemble_members[lag] + else: + members = [self.ensemble_members[lag][member]] + + # MAKE PARALLEL? + for mem in members: + + filename = os.path.join(outdir, 'fluxmodel.%03d%s' % (mem.membernumber, endswith)) + ncf = io.CT_CDF(filename, method='create') + dimgrid = ncf.add_latlon_dim() + dimparams = ncf.add_params_dim(self.nparams) + dimfitparams = ncf.add_dim('nfitparameters',self.nfitparams) + + data = mem.param_values + + savedict = io.std_savedict.copy() + savedict['name'] = "parametervalues" + savedict['long_name'] = "parameter_values_for_member_%d" % mem.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimparams + savedict['values'] = data + savedict['comment'] = 'These are parameter values to use for member %d' % mem.membernumber + ncf.add_data(savedict) + + griddata = self.vector2grid(vectordata=data) + + savedict = io.std_savedict.copy() + savedict['name'] = "parametermap" + savedict['long_name'] = "parametermap_for_member_%d" % mem.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimfitparams + dimgrid + savedict['values'] = griddata.tolist() + savedict['comment'] = 'These are gridded parameter values to use for member %d' % mem.membernumber + ncf.add_data(savedict) + + ncf.close() + + logging.debug('Successfully wrote data from ensemble member %d to file (%s) ' % (mem.membernumber, filename)) + + + def propagate(self, dacycle): + """ + :rtype: None + + Propagate the parameter values in the StateVector to the next cycle. This means a shift by one cycle + step for all states that will + be optimized once more, and the creation of a new ensemble for the time step that just + comes in for the first time (step=nlag). + In the future, this routine can incorporate a formal propagation of the statevector. + + """ + + import da.analysis.tools_time as at + + # Remove State Vector n=1 by simply "popping" it from the list and appending a new empty list at the front. This empty list will + # hold the new ensemble for the new cycle + + self.ensemble_members.pop(0) + self.ensemble_members.append([]) + + # And now create a new time step of mean + members for n=nlag + + date = dacycle['time.start'] + at.timedelta(days=(self.nlag - 0.5) * int(dacycle['time.cycle'])) + + logging.info("STATEVECTOR PROPAGATE ENDTIME = %s, date = %s" % (dacycle['time.end'],date)) + + cov = self.get_covariance(date, dacycle) + self.make_new_ensemble(self.nlag - 1, cov) + + logging.info('The state vector has been propagated by one cycle') + + + + +################### End Class SifTinversionStateVector ################### + + +if __name__ == "__main__": + pass +�������������������������� -- GitLab