# NOTE: removed web-UI paste residue (repository-viewer header and orphaned line numbers)
#!/usr/bin/env python
# ct_statevector_tools.py

"""
.. module:: statevector
.. moduleauthor:: Wouter Peters 

Revision History:
File created on 28 Jul 2010.
Adapted by super004 on 26 Jan 2017.

The module statevector implements the data structure and methods needed to work with state vectors (a set of unknown parameters to be optimized by a DA system) of different lengths, types, and configurations. Two baseclasses together form a generic framework:
    * :class:`~da.baseclasses.statevector.StateVector`
    * :class:`~da.baseclasses.statevector.EnsembleMember`

As usual, specific implementations of StateVector objects are done through inheritance form these baseclasses. An example of designing 
your own baseclass StateVector we refer to :ref:`tut_chapter5`.

.. autoclass:: da.baseclasses.statevector.StateVector 

.. autoclass:: da.baseclasses.statevector.EnsembleMember 

"""

import os
import sys
sys.path.append(os.getcwd())

import logging
import numpy as np
from da.baseclasses.statevector import StateVector, EnsembleMember
from da.tools.general import create_dirs, to_datetime
import datetime as dtm
import da.tools.io4 as io
import netCDF4 as nc
import pickle
identifier = 'CarbonTracker Statevector '
version = '0.0'

################### Begin Class CO2StateVector ###################

class CO2StateVector(StateVector):

    def __init__(self, dacycle=None):
        """Store the DA cycle object, or an empty dict when none is given."""
        # PEP 8: compare against None with `is not`, not `!=`.
        if dacycle is not None:
            self.dacycle = dacycle
        else:
            self.dacycle = {}
    
    def setup(self, dacycle):
        """
        Set up the object by specifying the dimensions.
        There are two major requirements for each statevector that you want to build:

            (1) the statevector can map itself onto a regular grid
            (2) the statevector can map itself (mean+covariance) onto TransCom regions

        Reads the filter dimensions (nlag, nmembers, nparameters) and the prior /
        covariance file names from the dacycle configuration, allocates the empty
        per-lag ensemble containers, builds a trivial one-to-one parameter<->grid
        map, and creates the species mask.
        """
        self.dacycle = dacycle
        self.nlag = int(self.dacycle['time.nlag'])
        self.nmembers = int(self.dacycle['da.optimizer.nmembers'])
        self.nparams = int(self.dacycle.dasystem['nparameters'])
        self.obsdir = self.dacycle.dasystem['datadir']
        self.pparam = self.dacycle.dasystem['emis.pparam']      # prior parameter values file
        self.covm = self.dacycle.dasystem['ff.covariance']      # fossil-fuel covariance file
        self.prop = int(self.dacycle.dasystem['run.propscheme'])
        self.nobs = 0

        self.obs_to_assimilate = ()  # empty container to hold observations to assimilate later on

        # These list objects hold the data for each time step of lag in the system. Note that the
        # ensembles for each time step consist of lists of EnsembleMember objects; we define
        # member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
        self.ensemble_members = [[] for _ in range(self.nlag)]

        # One grid element per parameter to optimise: a trivial 1-to-1 state <-> grid map.
        # (The gridded 1x1-degree mask and reverse-lookup dictionary of the baseclass are
        # not needed for this statevector.)
        self.gridmap = np.arange(1, self.nparams + 1, 1)

        # Create a mask for species/unknowns (inherited from the StateVector baseclass).
        self.make_species_mask()

    def nearestPD(self, matrix):
        """Return the nearest positive-definite matrix to *matrix*.

        A Python/Numpy port of John D'Errico's `nearestSPD` MATLAB code [1],
        which credits [2].

        [1] https://www.mathworks.com/matlabcentral/fileexchange/42885-nearestspd
        [2] N.J. Higham, "Computing a nearest symmetric positive semidefinite
            matrix" (1988): https://doi.org/10.1016/0024-3795(88)90223-6
        taken from https://stackoverflow.com/questions/43238173/python-convert-matrix-to-positive-semi-definite
        """
        # Symmetrize, then build the symmetric polar factor H from the SVD of B.
        B = (matrix + matrix.T) / 2
        _, singvals, V = np.linalg.svd(B)
        H = np.dot(V.T, np.dot(np.diag(singvals), V))

        # Average with H and re-symmetrize to kill round-off asymmetry.
        candidate = (B + H) / 2
        candidate = (candidate + candidate.T) / 2

        if self.isPD(candidate):
            return candidate

        # Unlike [1], use spacing of the norm rather than eps(mineig): numpy's
        # Cholesky rejects exact zero eigenvalues where MATLAB's chol accepts
        # them. This `spacing` is much larger than MATLAB's eps(mineig), but in
        # practice both variants converge.
        spacing = np.spacing(np.linalg.norm(matrix))
        identity = np.eye(matrix.shape[0])
        k = 1
        while not self.isPD(candidate):
            # Shift the spectrum up until Cholesky succeeds.
            mineig = np.min(np.real(np.linalg.eigvals(candidate)))
            candidate += identity * (-mineig * k**2 + spacing)
            k += 1

        return candidate
   
    def isPD(self, matrix):
        """Return True when *matrix* is positive-definite (Cholesky test)."""
        try:
            np.linalg.cholesky(matrix)
        except np.linalg.LinAlgError:
            return False
        return True
        
    def get_covariance(self, date, dacycle):
        """Read and return the prior covariance matrix for the parameters.

        Returns the leading [nparams, nparams] sub-block of the 'covariances'
        variable from the configured covariance file. *date* and *dacycle*
        are unused here but kept for the baseclass interface.
        """
        covpath = os.path.join(self.obsdir, self.covm)
        handle = io.ct_read(covpath, 'read')
        covmatrix = handle.get_variable('covariances')[:self.nparams, :self.nparams]
        handle.close()

        return covmatrix
    
    def write_members_to_file(self, lag, outdir, endswith='.nc'):
        """
           :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag]
           :param: outdir: Directory where to write files
           :param: endswith: Optional label to add to the filename, default is simply .nc
           :rtype: None

           Write ensemble member information to a NetCDF file for later use. The standard output
           filename is *parameters.DDD.nc* where *DDD* is the number of the ensemble member, and
           the standard location is the `dir.input` of the dacycle object. Each file holds one
           dataset `parametervalues` of dimension `nparameters`, which can be read and used by a
           :class:`~da.baseclasses.observationoperator.ObservationOperator` object.

           .. note:: if more, or other information is needed to complete the sampling of the
                     ObservationOperator you can simply inherit from the StateVector baseclass
                     and overwrite this write_members_to_file function.
        """
        # io4 is imported at module level on purpose: importing it inside this
        # method crashed netCDF4 on MacOSX.
        for member in self.ensemble_members[lag]:
            filename = os.path.join(outdir, 'parameters.%03d%s' % (member.membernumber, endswith))

            ncf = io.CT_CDF(filename, method='create')
            dimparams = ncf.add_params_dim(self.nparams)

            savedict = io.std_savedict.copy()
            savedict['name'] = "parametervalues"
            savedict['long_name'] = "parameter_values_for_member_%d" % member.membernumber
            savedict['units'] = "unitless"
            savedict['dims'] = dimparams
            savedict['values'] = member.param_values
            savedict['comment'] = 'These are parameter values to use for member %d' % member.membernumber
            ncf.add_data(savedict)
            ncf.close()

            logging.debug('Successfully wrote data from ensemble member %d to file (%s) ' % (member.membernumber, filename))


    def propagate(self, dacycle):
        """
        :rtype: None

        Propagate the parameter values in the StateVector to the next cycle. This means a shift
        by one cycle step for all states that will be optimized once more, and the creation of a
        new ensemble for the time step that just comes in for the first time (step=nlag).

        Propagation scheme (``self.prop``):
          1 - restart each cycle from the same prior values (several independent estimates)
          2 - carry the optimized mean of the oldest lag forward as the new prior
          3 - keep all ensemble members and rebuild the covariance from the stored
              deviations (requires ``self.deviations`` to be set elsewhere — TODO confirm)
        """
        # Append a fresh empty list that will hold the ensemble of the new cycle.
        self.ensemble_members.append([])

        # Option 1: start each cycle with the same prior values.
        if self.prop == 1 or dacycle['time.restart'] == False:
            priorfile = os.path.join(self.obsdir, self.pparam)
            f = io.ct_read(priorfile, 'read')
            # NOTE(review): prmval read here is never used in this method;
            # make_new_ensemble re-reads the same file. Kept for behavioral parity.
            prmval = f.get_variable('prior_values')[:self.nparams]
            f.close()
        # Option 2: propagate the mean of the oldest lag as the new prior.
        else:
            prmval = self.ensemble_members.pop(0)
            prmval = prmval[0].param_values  # member 0 is the ensemble mean
            self.prmval = prmval
            self.ensemble_members.append([])

        # Option 3: copy all ensemble members; derive the covariance from the
        # stored deviations and force it positive definite.
        if self.prop == 3:
            self.ensemble_members = self.ensemble_members[:-1]
            logging.debug('Ensemble members are propagated')
            covariancematrix = (np.dot(self.deviations.T, self.deviations) / (self.deviations.shape[1] - 1))
            covariancematrix = covariancematrix[:len(covariancematrix) // self.nlag, :len(covariancematrix) // self.nlag]
            covariancematrix = self.nearestPD(covariancematrix)  # make it positive definite

        # And now create a new time step of mean + members for n=nlag
        if not self.prop == 3:
            date = dacycle['time.start'] + dtm.timedelta(days=(self.nlag - 0.5) * int(dacycle['time.cycle']))
            covariancematrix = self.get_covariance(date, dacycle)

        self.make_new_ensemble(self.nlag - 1, covariancematrix)

        logging.info('The state vector has been propagated by one cycle')


    def make_new_ensemble(self, lag, covariancematrix=None):
        """
        :param lag: an integer indicating the time step in the lag order
        :param covariancematrix: a matrix to draw random values from
        :rtype: None

        Make a new ensemble; the attribute lag refers to the position in the state vector.
        Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the
        indexing below. The argument is thus referring to the lagged state vector as
        [1,2,3,4,5,..., nlag].

        The covariance matrix to be passed holds a matrix of dimensions [nparams, nparams]
        which is used to draw ensemble members from via its Cholesky factor.
        """
        dacycle = self.dacycle
        # Seed handling: a nonzero configured seed makes the run reproducible;
        # either way a pool of 1000 per-cycle seeds is drawn, indexed by the
        # number of days since the start of the fixed run period.
        self.seed = int(self.dacycle.dasystem['random.seed'])
        if self.seed != 0:
            np.random.seed(self.seed)
        sds = np.random.randint(1, 10000, 1000)
        sid = (dacycle['time.start'] - dacycle['time.fxstart']).days

        np.random.seed(sds[sid])
        enssds = np.random.randint(1, 10000, self.nmembers)  # one seed per ensemble member

        # Option 1: start each cycle with the same prior values (makes several
        # independent estimates); otherwise use the mean propagated by propagate().
        if self.prop == 1 or dacycle['time.restart'] == False:
            priorfile = os.path.join(self.obsdir, self.pparam)
            f = io.ct_read(priorfile, 'read')
            prmval = f.get_variable('prior_values')[:self.nparams]
            f.close()
        else:
            prmval = self.prmval

        try:
            _, s, _ = np.linalg.svd(covariancematrix)
        except np.linalg.LinAlgError:  # narrowed from bare `except:`; Cartesius fix
            s = np.linalg.svd(covariancematrix, full_matrices=1, compute_uv=0)

        # Effective degrees of freedom from the singular-value spectrum.
        dof = np.sum(s) ** 2 / sum(s ** 2)

        C = np.linalg.cholesky(covariancematrix)
        self.C = C
        logging.debug('Cholesky decomposition has succeeded ')
        logging.info('Appr. degrees of freedom in covariance matrix is %s' % (int(dof)))

        # Create mean values for the new time step.
        newmean = np.ones(self.nparams, float) * prmval

        # If this is not the start of the filter, average previous two optimized steps into the mix.
        if lag == self.nlag - 1 and self.nlag >= 3:
            newmean += self.ensemble_members[lag - 1][0].param_values + \
                       self.ensemble_members[lag - 2][0].param_values
            newmean = newmean / 3.0

        # Create the first ensemble member with a deviation of 0.0 and add to list.
        newmember = EnsembleMember(0)
        newmember.param_values = newmean.flatten()  # no deviations
        self.ensemble_members[lag].append(newmember)

        # Create members 1:nmembers, each drawn as mean + C @ N(0, I), and add them.
        for member in range(1, self.nmembers):
            np.random.seed(enssds[member])
            rands = np.random.randn(self.nparams)
            self.rands = rands

            newmember = EnsembleMember(member)
            self.newmean = newmean
            newmember.param_values = np.dot(C, rands) + newmean
            self.ensemble_members[lag].append(newmember)

        logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, (lag + 1)))
    

################### End Class OPSStateVector ###################

# This module is meant to be imported by the DA framework; no standalone behaviour.
if __name__ == "__main__":
    pass