From 71f118dcc8c779b523284ca5d84e9d105382aed4 Mon Sep 17 00:00:00 2001 From: Aki Tsuruta <Aki.Tsuruta@fmi.fi> Date: Sun, 5 Feb 2017 20:02:57 +0000 Subject: [PATCH] --- da/methane/analysis_2lambdas/__init__.py | 0 da/methane/analysis_2lambdas/expand_fluxes.py | 473 +++++++++++++++++ da/methane/statevector_2lambdas.py | 483 ++++++++++++++++++ 3 files changed, 956 insertions(+) create mode 100755 da/methane/analysis_2lambdas/__init__.py create mode 100755 da/methane/analysis_2lambdas/expand_fluxes.py create mode 100755 da/methane/statevector_2lambdas.py diff --git a/da/methane/analysis_2lambdas/__init__.py b/da/methane/analysis_2lambdas/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/da/methane/analysis_2lambdas/expand_fluxes.py b/da/methane/analysis_2lambdas/expand_fluxes.py new file mode 100755 index 0000000..0af128c --- /dev/null +++ b/da/methane/analysis_2lambdas/expand_fluxes.py @@ -0,0 +1,473 @@ +#!/usr/bin/env python +# expand_fluxes.py +import sys +sys.path.append('../../') +import os +from datetime import datetime, timedelta + +import logging +import numpy as np +from da.tools.general import date2num, num2date +import da.methane.io4 as io +from da.methane.analysis.tools_regions import globarea,state_to_grid +from da.tools.general import create_dirs + + + +""" +Author: aki + +Revision History: +File created on April 2016 by Aki T. + +""" + +def save_weekly_avg_1x1_data(dacycle, statevector): + """ + Function creates a NetCDF file with output on 1x1 degree grid. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + nlag = statevector.nlag + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + +# +# Region bio_land and fossil_land +# + + region_land_file = dacycle.dasystem['regionsfile'] + nfcr = io.CT_CDF(region_land_file, 'read') + logging.info('region land file read: %s' %region_land_file) + bio_land = nfcr.variables['bio_land'][:] + fossil_land = nfcr.variables['fossil_land'][:] + #logging.debug("bio_land %s" %bio_land[25,205]) + #logging.debug("ff_land %s" %fossil_land[25,205]) + +# +# Create or open NetCDF output file +# + logging.debug("Create NetCDF output file: flux_1x1.%s.nc" % startdate.strftime('%Y-%m-%d')) + saveas = os.path.join(dirname, 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d')) + ncf = io.CT_CDF(saveas, 'write') + +# +# Create dimensions and lat/lon grid +# + dimgrid = ncf.add_latlon_dim() + dimensemble = ncf.add_dim('members', statevector.nmembers) + dimdate = ncf.add_date_dim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker fluxes') + setattr(ncf, 'node_offset', 1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + +# +# if not, process this cycle. Start by getting flux input data from CTDAS +# + filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'))) + + file = io.ct_read(filename, 'read') + bio = np.array(file.get_variable(dacycle.dasystem['background.ch4.bio.flux'])) + ocean = np.array(file.get_variable(dacycle.dasystem['background.ch4.ocean.flux'])) + fire = np.array(file.get_variable(dacycle.dasystem['background.ch4.fires.flux'])) + fossil = np.array(file.get_variable(dacycle.dasystem['background.ch4.fossil.flux'])) + term = np.array(file.get_variable(dacycle.dasystem['background.ch4.term.flux'])) + #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + file.close() + + next = ncf.inq_unlimlen()[0] + + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. +# + + if prior: + logging.debug("prior") + qual_short = 'prior' + for n in range(nlag, 0, -1): + priordate = enddate - timedelta(dt.days * n) + savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d')) + filename = os.path.join(savedir, 'savestate_%s.nc' % priordate.strftime('%Y%m%d')) + if os.path.exists(filename): + statevector.read_from_file(filename, qual=qual_short) + gridmean, gridensemble = statevector.state_to_grid(lag=n) + +# Replace the mean statevector by all ones (assumed priors) + + gridmean = statevector.vector2grid(vectordata=np.ones(statevector.nparams,)) + + logging.debug('Read prior dataset from file %s, sds %d: ' % (filename, n)) + break + else: + logging.debug("opt") + qual_short = 'opt' + savedir = dacycle['dir.output'] + filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d')) + statevector.read_from_file(filename, qual=qual_short) + gridmean, gridensemble = statevector.state_to_grid(lag=1) + + logging.debug('Read posterior dataset from file %s, sds %d: ' % (filename, 1)) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + logging.debug("gridmean %s" %gridmean[25,205]) + logging.debug("bio land %s" %bio_land[25,205]) + w = np.where(bio_land == 0) + s = gridmean.shape + gridmean_bio = np.zeros(shape=s) + gridmean_bio[:,:] = gridmean[:,:] + gridmean_bio[w] = 1.0 + biomapped = bio * gridmean_bio + s = gridensemble.shape + gridensemble_bio = np.zeros(shape=s) + gridensemble_bio[:,:,:] = gridensemble[:,:,:] + gridensemble_bio[:, w[0],w[1]] = 0.0 + biovarmapped = bio * gridensemble_bio + + w = np.where(fossil_land == 0) + gridmean_fossil = gridmean + gridmean_fossil[w] = 1.0 + fossilmapped = fossil * gridmean_fossil + gridensemble_fossil = gridensemble + gridensemble_fossil[:, w[0],w[1]] = 0.0 + fossilvarmapped = fossil * gridensemble_fossil + + logging.debug("gridmean %s" %gridmean[25,205]) + logging.debug("gridmean bio %s" %gridmean_bio[25,205]) + logging.debug("gridmean ff %s" %gridmean_fossil[25,205]) +# +# +# For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write +# + savedict = ncf.standard_var(varname='bio_flux_' + qual_short) + savedict['values'] = biomapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='fossil_flux_' + qual_short) + savedict['values'] = fossilmapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + #print biovarmapped.shape + savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short) + savedict['values'] = biovarmapped.tolist() + savedict['dims'] = dimdate + dimensemble + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='fossil_flux_%s_ensemble' % qual_short) + savedict['values'] = fossilvarmapped.tolist() + savedict['dims'] = dimdate + dimensemble + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + # End prior/posterior block + + savedict = ncf.standard_var(varname='fire_flux_imp') + savedict['values'] = fire.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='ocn_flux_imp') + savedict['values'] = ocean.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='term_flux_imp') + savedict['values'] = term.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + #area = globarea() + #savedict = ncf.standard_var(varname='cell_area') + #savedict['values'] = area.tolist() + #savedict['dims'] = dimgrid + #ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='date') + savedict['values'] = date2num(startdate) - dectime0 + dt.days / 2.0 + savedict['dims'] = dimdate + savedict['count'] = next + ncf.add_data(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + logging.info("Gridded weekly average fluxes now written") + + return saveas + +def save_weekly_avg_state_data(dacycle, statevector): + """ + Function creates a NetCDF file with output for all parameters. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ + logging.debug('start: save weekly avg state data') + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_state_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + nlag = statevector.nlag + + area = globarea() + vectorarea = statevector.grid2vector(griddata=area, method='sum') + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + +# +# Create or open NetCDF output file +# + saveas = os.path.join(dirname, 'statefluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + + logging.debug('save weekly avg state data to file %s' %saveas) +# +# Create dimensions and lat/lon grid +# + dimregs = ncf.add_dim('nparameters', statevector.nparams) + dimmembers = ncf.add_dim('nmembers', statevector.nmembers) + dimdate = ncf.add_date_dim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker fluxes') + setattr(ncf, 'node_offset', 1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + next = ncf.inq_unlimlen()[0] + logging.debug('Writing of data for date %s: file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + +# +# if not, process this cycle. Start by getting flux input data from CTDAS +# + filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'))) + + file = io.ct_read(filename, 'read') + bio = np.array(file.get_variable(dacycle.dasystem['background.ch4.bio.flux'])) + ocean = np.array(file.get_variable(dacycle.dasystem['background.ch4.ocean.flux'])) + fire = np.array(file.get_variable(dacycle.dasystem['background.ch4.fires.flux'])) + fossil = np.array(file.get_variable(dacycle.dasystem['background.ch4.fossil.flux'])) + term = np.array(file.get_variable(dacycle.dasystem['background.ch4.term.flux'])) + #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + file.close() + + next = ncf.inq_unlimlen()[0] + + vectorbio = statevector.grid2vector(griddata=bio * area, method='sum') + vectorocn = statevector.grid2vector(griddata=ocean * area, method='sum') + vectorfire = statevector.grid2vector(griddata=fire * area, method='sum') + vectorfossil = statevector.grid2vector(griddata=fossil * area, method='sum') + vectorterm = statevector.grid2vector(griddata=term * area, method='sum') + + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. +# + if prior: + qual_short = 'prior' + for n in range(nlag, 0, -1): + priordate = enddate - timedelta(dt.days * n) + savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d')) + filename = os.path.join(savedir,'savestate_%s.nc' % priordate.strftime('%Y%m%d')) + if os.path.exists(filename): + statevector.read_from_file(filename, qual=qual_short) +# Replace the mean statevector by all ones (assumed priors) + statemean = np.ones((statevector.nparams,)) + choicelag = n + logging.debug('Read prior dataset from file %s, lag %d: ' % (filename, choicelag)) + break + else: + qual_short = 'opt' + savedir = dacycle['dir.output'] + filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d')) + statevector.read_from_file(filename) + choicelag = 1 + statemean = statevector.ensemble_members[choicelag - 1][0].param_values + logging.debug('Read posterior dataset from file %s, lag %d: ' % (filename, choicelag)) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + data = statemean * vectorbio # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='bio_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. +# +# The implementation is done by multiplying the ensemble with the vectorbio only, and not with the statemean values +# which are assumed 1.0 in the prior always. +# + + members = statevector.ensemble_members[choicelag - 1] + deviations = np.array([mem.param_values * vectorbio for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short) + + savedict['values'] = deviations.tolist() + savedict['dims'] = dimdate + dimmembers + dimregs + savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'bio_flux_%s_std' % qual_short + savedict['long_name'] = 'Biosphere flux standard deviation, %s' % qual_short + savedict['values'] = deviations.std(axis=0) + savedict['dims'] = dimdate + dimregs + savedict['comment'] = "This is the standard deviation on each parameter" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + data = statemean * vectorfossil # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='fossil_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. +# +# The implementation is done by multiplying the ensemble with the vectorocn only, and not with the statemean values +# which are assumed 1.0 in the prior always. +# + + deviations = np.array([mem.param_values * vectorfossil for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='fossil_flux_%s_ensemble' % qual_short) + savedict['values'] = deviations.tolist() + savedict['dims'] = dimdate + dimmembers + dimregs + savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'fossil_flux_%s_std' % qual_short + savedict['long_name'] = 'Fossil flux standard deviation, %s' % qual_short + savedict['values'] = deviations.std(axis=0) + savedict['dims'] = dimdate + dimregs + savedict['comment'] = "This is the standard deviation on each parameter" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + data = vectorfire + + savedict = ncf.standard_var(varname='fire_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + data = vectorterm + + savedict = ncf.standard_var(varname='term_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + data = vectorocn + + savedict = ncf.standard_var(varname='ocn_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = next + ncf.add_data(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + logging.info("Vector weekly average fluxes now written") + + return saveas + + diff --git a/da/methane/statevector_2lambdas.py b/da/methane/statevector_2lambdas.py new file mode 100755 index 0000000..b64c633 --- /dev/null +++ b/da/methane/statevector_2lambdas.py @@ -0,0 +1,483 @@ +#!/usr/bin/env python +# ct_statevector_tools.py + +""" +Author : aki + +Revision History: +File created on 18 Nov 2013. + +""" + +import os +import sys +sys.path.append(os.getcwd()) + +import logging +import numpy as np +from da.baseclasses.statevector import StateVector, EnsembleMember + +from datetime import timedelta +import da.methane.io4 as io + +identifier = 'CarbonTracker Statevector ' +version = '0.0' + +################### Begin Class CH4StateVector ################### + +class MethaneStateVector(StateVector): + """ This is a StateVector object for CarbonTracker. It has a private method to make new ensemble members """ + + def setup(self, dacycle): + """ + setup the object by specifying the dimensions. + There are two major requirements for each statvector that you want to build: + + (1) is that the statevector can map itself onto a regular grid + (2) is that the statevector can map itself (mean+covariance) onto TransCom regions + + An example is given below. + """ + + self.nlag = int(dacycle['time.nlag']) + self.nmembers = int(dacycle['da.optimizer.nmembers']) + #self.distribution = dacycle['da.distribution'] + self.nobs = 0 + + self.obs_to_assimilate = () # empty containter to hold observations to assimilate later on + + # These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist + # of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread. + + self.ensemble_members = range(self.nlag) + + for n in range(self.nlag): + self.ensemble_members[n] = [] + + + # This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember + # that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid. + + mapfile = os.path.join(dacycle.dasystem['regionsfile']) + ncf = io.ct_read(mapfile, 'read') + self.gridmap = ncf.get_variable('regions') + self.tcmap = ncf.get_variable('transcom_regions') + self.nparams = int(self.gridmap.max()) + self.nparams_bio = int(ncf.get_variable('nparams_bio')) + ncf.close() + + logging.info ("Regional information read from file %s" % dacycle.dasystem['regionsfile']) + logging.debug("A TransCom map on 1x1 degree was read from file %s" % dacycle.dasystem['regionsfile']) + logging.debug("A parameter map on 1x1 degree was read from file %s" % dacycle.dasystem['regionsfile']) + + # Create a dictionary for state <-> gridded map conversions + + nparams = self.gridmap.max() + self.griddict = {} + for r in range(1, int(nparams) + 1): + sel = (self.gridmap.flat == r).nonzero() + if len(sel[0]) > 0: + self.griddict[r] = sel + + logging.debug("A dictionary to map grids to states and vice versa was created") + + # Create a matrix for state <-> TransCom conversions + + self.tcmatrix = np.zeros((self.nparams, int(self.tcmap.max())), 'float') + + for r in range(1, self.nparams + 1): + sel = (self.gridmap.flat == r).nonzero() + if len(sel[0]) < 1: + continue + else: + n_tc = set(self.tcmap.flatten().take(sel[0])) + if len(n_tc) > 1: + logging.error("Parameter %d seems to map to multiple TransCom regions (%s), I do not know how to handle this" % (r, n_tc)) + raise ValueError + self.tcmatrix[r - 1, n_tc.pop() - 1] = 1.0 + + logging.debug("A matrix to map states to TransCom regions and vice versa was created") + + # Create a mask for species/unknowns + + self.make_species_mask() + + + def make_species_mask(self): + + self.speciesdict = {'ch4': np.ones(self.nparams)} + logging.debug("A species mask was created, only the following species are recognized in this system:") + for k in self.speciesdict.keys(): + logging.debug(" -> %s" % k) + + def get_covariance(self, date, dacycle): + """ Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector. + Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. + The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] + """ + try: + import matplotlib.pyplot as plt + except: + pass + + #----Arbitray covariance matrix ----# + #fullcov = np.zeros((self.nparams, self.nparams), float) + + #for i in xrange(self.nparams): + # #fullcov[i,i] = 1. # Identity matrix + # fullcov[i,i] = 0.08 + #fullcov[self.nparams-1,self.nparams-1] = 1.e-10 + + #---- Covariance read from file -----# + cov_file = dacycle.dasystem['covfile'] + nfcr = io.CT_CDF(cov_file, 'read') + logging.info('covariance read from file: %s' %cov_file) + fullcov = nfcr.variables['covariance_matrix'][:] + +# try: +# plt.imshow(fullcov) +# plt.colorbar() +# plt.savefig('fullcovariancematrix.png') +# plt.close('all') +# logging.debug("Covariance matrix visualized for inspection") +# except: +# pass + + return fullcov + + def read_from_legacy_file(self, filename, qual='opt'): + """ + :param filename: the full filename for the input NetCDF file + :param qual: a string indicating whether to read the 'prior' or 'opt'(imized) StateVector from file + :rtype: None + + Read the StateVector information from a NetCDF file and put in a StateVector object + In principle the input file will have only one four datasets inside + called: + * `meanstate_prior`, dimensions [nlag, nparamaters] + * `ensemblestate_prior`, dimensions [nlag,nmembers, nparameters] + * `meanstate_opt`, dimensions [nlag, nparamaters] + * `ensemblestate_opt`, dimensions [nlag,nmembers, nparameters] + + This NetCDF information can be written to file using + :meth:`~da.baseclasses.statevector.StateVector.write_to_file` + + """ + + f = io.ct_read(filename, 'read') + + for n in range(self.nlag): + if qual == 'opt': + meanstate = f.get_variable('xac_%02d' % (n + 1)) + EnsembleMembers = f.get_variable('adX_%02d' % (n + 1)) + + elif qual == 'prior': + meanstate = f.get_variable('xpc_%02d' % (n + 1)) + EnsembleMembers = f.get_variable('pdX_%02d' % (n + 1)) + + if not self.ensemble_members[n] == []: + self.ensemble_members[n] = [] + logging.warning('Existing ensemble for lag=%d was removed to make place for newly read data' % (n + 1)) + + for m in range(self.nmembers): + newmember = EnsembleMember(m) + newmember.param_values = EnsembleMembers[m, :].flatten() + meanstate # add the mean to the deviations to hold the full parameter values + self.ensemble_members[n].append(newmember) + + + f.close() + + logging.info('Successfully read the State Vector from file (%s) ' % filename) + + def write_members_to_file(self, lag, outdir, dacycle): + """ + :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag] + :rtype: None + + Write ensemble member information to a NetCDF file for later use. The standard output filename is + *parameters.DDD.nc* where *DDD* is the number of the ensemble member. Standard output file location + is the `dir.input` of the dacycle object. In principle the output file will have only two datasets inside + called `parametervalues` which is of dimensions `nparameters` and `parametermap` which is of dimensions (180,360). + This dataset can be read and used by a :class:`~da.baseclasses.observationoperator.ObservationOperator` object. + + .. note:: if more, or other information is needed to complete the sampling of the ObservationOperator you + can simply inherit from the StateVector baseclass and overwrite this write_members_to_file function. + + """ + + # These import statements caused a crash in netCDF4 on MacOSX. No problems on Jet though. Solution was + # to do the import already at the start of the module, not just in this method. + + #import da.tools.io as io + #import da.tools.io4 as io + + members = self.ensemble_members[lag] + + #--- regionfile ---# + region_land_file = dacycle.dasystem['regionsfile'] + nfcr = io.CT_CDF(region_land_file, 'read') + logging.debug('region land file read: %s' %region_land_file) + bio_land = nfcr.variables['bio_land'][:] + fossil_land = nfcr.variables['fossil_land'][:] + + + for mem in members: + filename = os.path.join(outdir, 'parameters.%03d.nc' % mem.membernumber) + ncf = io.CT_CDF(filename, method='create') + dimparams = ncf.add_params_dim(self.nparams) + dimgrid = ncf.add_latlon_dim() + + data = mem.param_values + + savedict = io.std_savedict.copy() + savedict['name'] = "parametervalues" + savedict['long_name'] = "parameter_values_for_member_%d" % mem.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimparams + savedict['values'] = data + savedict['comment'] = 'These are parameter values to use for member %d' % mem.membernumber + ncf.add_data(savedict) + +# #--- bio ---# +# dimparams_bio = ncf.add_params_dim(self.nparams_bio) +# data_bio = data[0:self.nparams_bio] +# +# savedict = io.std_savedict.copy() +# savedict['name'] = "parametervalues_bio" +# savedict['long_name'] = "parameter_bio_values_for_member_%d" % mem.membernumber +# savedict['units'] = "unitless" +# savedict['dims'] = dimparams_bio +# savedict['values'] = data_bio +# savedict['comment'] = 'These are parameter bio values to use for member %d' % mem.membernumber +# ncf.add_data(savedict) +# +# #--- fossil ---# +# nparams_fossil = self.nparams - self.nparams_bio +# dimparams_fossil = ncf.add_params_dim(nparams_fossil) +# data_fossil = data[self.nparams_bio:nparams_fossil] +# +# savedict = io.std_savedict.copy() +# savedict['name'] = "parametervalues_ff" +# savedict['long_name'] = "parameter_ff_values_for_member_%d" % mem.membernumber +# savedict['units'] = "unitless" +# savedict['dims'] = dimparams_fossil +# savedict['values'] = data_fossil +# savedict['comment'] = 'These are parameter fossil values to use for member %d' % mem.membernumber +# ncf.add_data(savedict) + + #--- All parameters, gridded ---# + griddata = self.vector2grid(vectordata=data) + + savedict = io.std_savedict.copy() + savedict['name'] = "parametermap" + savedict['long_name'] = "parametermap_for_member_%d" % mem.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimgrid + savedict['values'] = griddata.tolist() + savedict['comment'] = 'These are gridded parameter values to use for member %d' % mem.membernumber + ncf.add_data(savedict) + + #--- bio parameters, gridded ---# + griddata_bio = griddata[:] + w = np.where(bio_land==0) + griddata_bio[w] = 1.0 + + savedict = io.std_savedict.copy() + savedict['name'] = "parametermap_bio" + savedict['long_name'] = "parametermap_bio_for_member_%d" % mem.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimgrid + savedict['values'] = griddata_bio.tolist() + savedict['comment'] = 'These are gridded parameter bio values to use for member %d' % mem.membernumber + ncf.add_data(savedict) + + #--- All parameters, gridded ---# + griddata = self.vector2grid(vectordata=data) + griddata_fossil = griddata[:] + w = np.where(fossil_land==0) + griddata_fossil[w] = 1.0 + + savedict = io.std_savedict.copy() + savedict['name'] = "parametermap_ff" + savedict['long_name'] = "parametermap_ff_for_member_%d" % mem.membernumber + savedict['units'] = "unitless" + savedict['dims'] = dimgrid + savedict['values'] = griddata_fossil.tolist() + savedict['comment'] = 'These are gridded parameter fossil values to use for member %d' % mem.membernumber + ncf.add_data(savedict) + + ncf.close() + + logging.debug('Successfully wrote data from ensemble member %d to file (%s) ' % (mem.membernumber, filename)) + +# def write_to_file(self, filename, qual): +# """ +# :param filename: the full filename for the output NetCDF file +# :rtype: None +# +# Write the StateVector information to a NetCDF file for later use. +# In principle the output file will have only one two datasets inside +# called: +# * `meanstate`, dimensions [nlag, nparamaters] +# * `ensemblestate`, dimensions [nlag,nmembers, nparameters] +# +# This NetCDF information can be read back into a StateVector object using +# :meth:`~da.baseclasses.statevector.StateVector.read_from_file` +# +# """ +# #import da.tools.io4 as io +# #import da.tools.io as io +# +# if qual == 'prior': +# f = io.CT_CDF(filename, method='create') +# logging.debug('Creating new StateVector output file (%s)' % filename) +# #qual = 'prior' +# else: +# f = io.CT_CDF(filename, method='write') +# logging.debug('Opening existing StateVector output file (%s)' % filename) +# #qual = 'opt' +# +# dimparams = f.add_params_dim(self.nparams) +# dimmembers = f.add_members_dim(self.nmembers) +# dimlag = f.add_lag_dim(self.nlag, unlimited=True) +# +# for n in range(self.nlag): +# members = self.ensemble_members[n] +# mean_state = members[0].param_values +# +# savedict = f.standard_var(varname='meanstate_%s' % qual) +# savedict['dims'] = dimlag + dimparams +# savedict['values'] = mean_state +# savedict['count'] = n +# savedict['comment'] = 'this represents the mean of the ensemble' +# f.add_data(savedict) +# +# savedict = f.standard_var(varname='meanstate_bio_%s' % qual) +# savedict['dims'] = dimlag + dimparams_bio +# savedict['values'] = mean_state_bio +# savedict['count'] = n +# savedict['comment'] = 'this represents the mean of the ensemble' +# f.add_data(savedict) +# +# savedict = f.standard_var(varname='meanstate_fossil_%s' % qual) +# savedict['dims'] = dimlag + dimparams_fossil +# savedict['values'] = mean_state_fossil +# savedict['count'] = n +# savedict['comment'] = 'this represents the mean of the ensemble' +# f.add_data(savedict) +# +# members = self.ensemble_members[n] +# devs = np.asarray([m.param_values.flatten() for m in members]) +# data = devs - np.asarray(mean_state) +# +# savedict = f.standard_var(varname='ensemblestate_%s' % qual) +# savedict['dims'] = dimlag + dimmembers + dimparams +# savedict['values'] = data +# savedict['count'] = n +# savedict['comment'] = 'this represents deviations from the mean of the ensemble' +# f.add_data(savedict) +# f.close() +# +# logging.info('Successfully wrote the State Vector to file (%s) ' % filename) + + + def make_new_ensemble(self, lag, dacycle, covariancematrix=None): + """ + :param lag: an integer indicating the time step in the lag order + :param covariancematrix: a matrix to draw random values from + :rtype: None + + Make a new ensemble, the attribute lag refers to the position in the state vector. + Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. + The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] + + The optional covariance object to be passed holds a matrix of dimensions [nparams, nparams] which is + used to draw ensemblemembers from. If this argument is not passed it will be substituted with an + identity matrix of the same dimensions. + + """ + + if covariancematrix == None: + covariancematrix = np.identity(self.nparams) + + # Make a cholesky decomposition of the covariance matrix + + + _, s, _ = np.linalg.svd(covariancematrix) + dof = np.sum(s) ** 2 / sum(s ** 2) + C = np.linalg.cholesky(covariancematrix) + + logging.debug('Cholesky decomposition has succeeded ') + logging.info('Appr. degrees of freedom in covariance matrix is %s' % (int(dof))) + + + # Create mean values + + newmean = np.ones(self.nparams, float) # standard value for a new time step is 1.0 + + # If this is not the start of the filter, average previous two optimized steps into the mix + + if lag == self.nlag - 1 and self.nlag >= 3: + newmean += self.ensemble_members[lag - 1][0].param_values + \ + self.ensemble_members[lag - 2][0].param_values + newmean = newmean / 3.0 + + # Create the first ensemble member with a deviation of 0.0 and add to list + + newmember = EnsembleMember(0) + newmember.param_values = newmean.flatten() # no deviations + self.ensemble_members[lag].append(newmember) + + # Create members 1:nmembers and add to ensemble_members list + #dist = self.distribution + + for member in range(1, self.nmembers): + # if dist == 'normal': + # rands = np.random.randn(self.nparams) + # elif dist == 'gamma11': + # rands = np.random.exponential(1,size=self.nparams) - 1 #substruct mean=1. + # elif dist == 'lognormal': + # rands = np.random.lognormal(0,0.5,size=self.nparams) - 1 #substruct median=1. + # else: + # logging.error('Distribution (%s)to generate from not known' %dist) + # sys.exit(2) + + rands = np.random.randn(self.nparams) + newmember = EnsembleMember(member) + newmember.param_values = np.dot(C, rands) + newmean + self.ensemble_members[lag].append(newmember) + + logging.info('%d new ensemble members generated from %s were added to the state vector # %d'\ + #%(self.nmembers,dist, (lag + 1))) + %(self.nmembers,'normal', (lag + 1))) + + def propagate(self, dacycle): + """ + :rtype: None + + Propagate the parameter values in the StateVector to the next cycle. This means a shift by one cycle + step for all states that will + be optimized once more, and the creation of a new ensemble for the time step that just + comes in for the first time (step=nlag). + In the future, this routine can incorporate a formal propagation of the statevector. + + """ + + # Remove State Vector n=1 by simply "popping" it from the list and appending a new empty list at the front. This empty list will + # hold the new ensemble for the new cycle + + self.ensemble_members.pop(0) + self.ensemble_members.append([]) + + # And now create a new time step of mean + members for n=nlag + date = dacycle['time.start'] + timedelta(days=(self.nlag - 0.5) * int(dacycle['time.cycle'])) + cov = self.get_covariance(date, dacycle) + self.make_new_ensemble(self.nlag - 1, dacycle, cov) + + logging.info('The state vector has been propagated by one cycle') + +################### End Class MethaneStateVector ################### + + +if __name__ == "__main__": + pass -- GitLab