diff --git a/da/methane/__init__.py b/da/methane/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/da/methane/analysis/__init__.py b/da/methane/analysis/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/da/methane/analysis/expand_fluxes.py b/da/methane/analysis/expand_fluxes.py new file mode 100755 index 0000000000000000000000000000000000000000..e91f7c1abb443ceb09f6fba3f3020c0024dbd6af --- /dev/null +++ b/da/methane/analysis/expand_fluxes.py @@ -0,0 +1,1100 @@ +#!/usr/bin/env python +# expand_fluxes.py +import sys +sys.path.append('../../') +import os +from datetime import datetime, timedelta + +import logging +import numpy as np +from da.tools.general import date2num, num2date +import da.methane.io4 as io +from da.methane.analysis.tools_regions import globarea +from da.methane.tools import state_to_grid +from da.tools.general import create_dirs +from da.analysis.tools_country import countryinfo # needed here +from da.methane.analysis.tools_transcom import transcommask, ExtendedTCRegions + + +import da.methane.analysis.tools_transcom as tc +import da.analysis.tools_country as ct +import da.analysis.tools_time as timetools + + + +""" +Author: Wouter Peters (Wouter.Peters@noaa.gov) + +Revision History: +File created on 21 Ocotber 2008. + +""" + +def proceed_dialog(txt, yes=['y', 'yes'], all=['a', 'all', 'yes-to-all']): + """ function to ask whether to proceed or not """ + response = raw_input(txt) + if response.lower() in yes: + return 1 + if response.lower() in all: + return 2 + return 0 + +def save_weekly_avg_1x1_data(dacycle, statevector): + """ + Function creates a NetCDF file with output on 1x1 degree grid. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + nlag = statevector.nlag + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + +# +# Create or open NetCDF output file +# + logging.debug("Create NetCDF output file: flux_1x1.%s.nc" % startdate.strftime('%Y-%m-%d')) + saveas = os.path.join(dirname, 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d')) + ncf = io.CT_CDF(saveas, 'write') + +# +# Create dimensions and lat/lon grid +# + dimgrid = ncf.add_latlon_dim() + dimensemble = ncf.add_dim('members', statevector.nmembers) + dimdate = ncf.add_date_dim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker fluxes') + setattr(ncf, 'node_offset', 1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + +# +# if not, process this cycle. 
Start by getting flux input data from CTDAS +# + filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'))) + + file = io.ct_read(filename, 'read') + bio = np.array(file.get_variable(dacycle.dasystem['background.ch4.bio.flux'])) + ocean = np.array(file.get_variable(dacycle.dasystem['background.ch4.ocean.flux'])) + fire = np.array(file.get_variable(dacycle.dasystem['background.ch4.fires.flux'])) + fossil = np.array(file.get_variable(dacycle.dasystem['background.ch4.fossil.flux'])) + term = np.array(file.get_variable(dacycle.dasystem['background.ch4.term.flux'])) + #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + file.close() + + next = ncf.inq_unlimlen()[0] + + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. +# + + if prior: + qual_short = 'prior' + for n in range(nlag, 0, -1): + priordate = enddate - timedelta(dt.days * n) + savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d')) + filename = os.path.join(savedir, 'savestate_%s.nc' % priordate.strftime('%Y%m%d')) + if os.path.exists(filename): + statevector.read_from_file(filename, qual=qual_short) + gridmean, gridensemble = statevector.state_to_grid(lag=n) + +# Replace the mean statevector by all ones (assumed priors) + + gridmean = statevector.vector2grid(vectordata=np.ones(statevector.nparams,)) + + logging.debug('Read prior dataset from file %s, sds %d: ' % (filename, n)) + break + else: + qual_short = 'opt' + savedir = dacycle['dir.output'] + filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d')) + statevector.read_from_file(filename, qual=qual_short) + gridmean, gridensemble = statevector.state_to_grid(lag=1) + + logging.debug('Read posterior dataset from file %s, sds %d: ' % (filename, 1)) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + #print gridensemble.shape, bio.shape, gridmean.shape + biomapped = bio * gridmean + fossilmapped = fossil * gridmean + biovarmapped = bio * gridensemble + fossilvarmapped = fossil * gridensemble + +# +# +# For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write +# + savedict = ncf.standard_var(varname='bio_flux_' + qual_short) + savedict['values'] = biomapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='fossil_flux_' + qual_short) + savedict['values'] = fossilmapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + #print biovarmapped.shape + savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short) + savedict['values'] = biovarmapped.tolist() + savedict['dims'] = dimdate + dimensemble + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='fossil_flux_%s_ensemble' % qual_short) + savedict['values'] = fossilvarmapped.tolist() + savedict['dims'] = dimdate + dimensemble + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + # End prior/posterior block + + savedict = 
ncf.standard_var(varname='fire_flux_imp') + savedict['values'] = fire.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='ocn_flux_imp') + savedict['values'] = ocean.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='term_flux_imp') + savedict['values'] = term.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + #area = globarea() + #savedict = ncf.standard_var(varname='cell_area') + #savedict['values'] = area.tolist() + #savedict['dims'] = dimgrid + #ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='date') + savedict['values'] = date2num(startdate) - dectime0 + dt.days / 2.0 + savedict['dims'] = dimdate + savedict['count'] = next + ncf.add_data(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + logging.info("Gridded weekly average fluxes now written") + + return saveas + +def save_weekly_avg_state_data(dacycle, statevector): + """ + Function creates a NetCDF file with output for all parameters. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ + + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_state_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + nlag = statevector.nlag + + area = globarea() + vectorarea = statevector.grid2vector(griddata=area, method='sum') + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + +# +# Create or open NetCDF output file +# + saveas = os.path.join(dirname, 'statefluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + +# +# Create dimensions and lat/lon grid +# + dimregs = ncf.add_dim('nparameters', statevector.nparams) + dimmembers = ncf.add_dim('nmembers', statevector.nmembers) + dimdate = ncf.add_date_dim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker fluxes') + setattr(ncf, 'node_offset', 1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + next = ncf.inq_unlimlen()[0] + +# +# if not, process this cycle. 
Start by getting flux input data from CTDAS +# + filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'))) + + file = io.ct_read(filename, 'read') + bio = np.array(file.get_variable(dacycle.dasystem['background.ch4.bio.flux'])) + ocean = np.array(file.get_variable(dacycle.dasystem['background.ch4.ocean.flux'])) + fire = np.array(file.get_variable(dacycle.dasystem['background.ch4.fires.flux'])) + fossil = np.array(file.get_variable(dacycle.dasystem['background.ch4.fossil.flux'])) + term = np.array(file.get_variable(dacycle.dasystem['background.ch4.term.flux'])) + #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + file.close() + + next = ncf.inq_unlimlen()[0] + + vectorbio = statevector.grid2vector(griddata=bio * area, method='sum') + vectorocn = statevector.grid2vector(griddata=ocean * area, method='sum') + vectorfire = statevector.grid2vector(griddata=fire * area, method='sum') + vectorfossil = statevector.grid2vector(griddata=fossil * area, method='sum') + vectorterm = statevector.grid2vector(griddata=term * area, method='sum') + + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. +# + if prior: + qual_short = 'prior' + for n in range(nlag, 0, -1): + priordate = enddate - timedelta(dt.days * n) + savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d')) + filename = os.path.join(savedir,'savestate_%s.nc' % priordate.strftime('%Y%m%d')) + if os.path.exists(filename): + statevector.read_from_file(filename, qual=qual_short) +# Replace the mean statevector by all ones (assumed priors) + statemean = np.ones((statevector.nparams,)) + choicelag = n + logging.debug('Read prior dataset from file %s, lag %d: ' % (filename, choicelag)) + break + else: + qual_short = 'opt' + savedir = dacycle['dir.output'] + filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d')) + statevector.read_from_file(filename) + choicelag = 1 + statemean = statevector.ensemble_members[choicelag - 1][0].param_values + logging.debug('Read posterior dataset from file %s, lag %d: ' % (filename, choicelag)) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + data = statemean * vectorbio # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='bio_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. +# +# The implementation is done by multiplying the ensemble with the vectorbio only, and not with the statemean values +# which are assumed 1.0 in the prior always. 
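+# A minimal sketch of how these deviations relate to a covariance (an illustration, not part of
+# the computation itself): row 0 of the ensemble holds the mean member in the CTDAS convention, so
+# after subtracting it the array 'deviations' has shape (nmembers, nparams) and the regional flux
+# covariance can be reconstructed later as
+#     np.dot(deviations.transpose(), deviations) / (nmembers - 1)
+# i.e. the (M x M^T)/(nmembers-1) construction mentioned in the 'comment' attribute below, with M
+# read as the parameter-by-member deviation matrix.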
+# + + members = statevector.ensemble_members[choicelag - 1] + deviations = np.array([mem.param_values * vectorbio for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short) + + savedict['values'] = deviations.tolist() + savedict['dims'] = dimdate + dimmembers + dimregs + savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'bio_flux_%s_std' % qual_short + savedict['long_name'] = 'Biosphere flux standard deviation, %s' % qual_short + savedict['values'] = deviations.std(axis=0) + savedict['dims'] = dimdate + dimregs + savedict['comment'] = "This is the standard deviation on each parameter" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + data = statemean * vectorfossil # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='fossil_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. +# +# The implementation is done by multiplying the ensemble with the vectorocn only, and not with the statemean values +# which are assumed 1.0 in the prior always. 
+# + + deviations = np.array([mem.param_values * vectorfossil for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='fossil_flux_%s_ensemble' % qual_short) + savedict['values'] = deviations.tolist() + savedict['dims'] = dimdate + dimmembers + dimregs + savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'fossil_flux_%s_std' % qual_short + savedict['long_name'] = 'Fossil flux standard deviation, %s' % qual_short + savedict['values'] = deviations.std(axis=0) + savedict['dims'] = dimdate + dimregs + savedict['comment'] = "This is the standard deviation on each parameter" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + data = vectorfire + + savedict = ncf.standard_var(varname='fire_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + data = vectorterm + + savedict = ncf.standard_var(varname='term_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + data = vectorocn + + savedict = ncf.standard_var(varname='ocn_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = next + ncf.add_data(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + logging.info("Vector weekly average fluxes now written") + + return saveas + + +def save_weekly_avg_tc_data(dacycle, statevector): + """ + Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the + function `save_weekly_avg_1x1_data` to create fluxes of length `nparameters`, which are then projected + onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + + This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve + these with the parameters in the statevector. This creates posterior fluxes, and the posterior covariance for the complete + statevector in units of mol/box/s which we then turn into TC fluxes and covariances. 
+ """ + +# + logging.debug("Analysis data tc weekly") + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_tc_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + # Write/Create NetCDF output file + # + logging.debug("...write to file: tcfluxes.nc") + saveas = os.path.join(dirname, 'tcfluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + dimdate = ncf.add_date_dim() + dimidateformat = ncf.add_date_dim_format() + dimregs = ncf.add_region_dim(type='tc') +# +# set title and tell GMT that we are using "pixel registration" +# + logging.debug("...Set attributes") + setattr(ncf, 'Title', 'CarbonTracker TransCom fluxes') + setattr(ncf, 'node_offset', 1) + # + + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + + # Get input data + + logging.debug("...get input data") + area = globarea() + + infile = os.path.join(dacycle['dir.analysis'], 'data_state_weekly', 'statefluxes.nc') + if not os.path.exists(infile): + logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." % infile) + return None + + logging.debug("...read file") + ncf_in = io.ct_read(infile, 'read') + + # Transform data one by one + + # Get the date variable, and find index corresponding to the dacycle date + + try: + dates = ncf_in.variables['date'][:] + except KeyError: + logging.error("The variable date cannot be found in the requested input file (%s) " % infile) + logging.error("Please make sure you create gridded fluxes before making TC fluxes ") + raise KeyError + + try: + index = dates.tolist().index(ncfdate) + except ValueError: + logging.error("The requested cycle date is not yet available in file %s " % infile) + logging.error("Please make sure you create state based fluxes before making TC fluxes") + raise ValueError + + # First add the date for this cycle to the file, this grows the unlimited dimension + + logging.debug("...add date") + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = index + ncf.add_data(savedict) + + # Now convert other variables that were inside the flux_1x1 file + + logging.debug("...convert other variables") + vardict = ncf_in.variables + for vname, vprop in vardict.iteritems(): + + logging.debug("......%s" %vname) + data = ncf_in.get_variable(vname)[index] + + if vname in ['latitude','longitude', 'date', 'idate'] or 'std' in vname: + continue + elif 'ensemble' in vname: + tcdata = [] + logging.debug(".........append ensembles") + for member in data: + tcdata.append(statevector.vector2tc(vectordata=member)) + + logging.debug(".........create tcdata") + tcdata = np.array(tcdata) + try: + cov = tcdata.transpose().dot(tcdata) / (statevector.nmembers - 1) + except: + cov = np.dot(tcdata.transpose(), tcdata) / (statevector.nmembers - 1) # Huygens fix + + #print vname,cov.sum() + + tcdata = cov + + logging.debug(".........save") + savedict = ncf.standard_var(varname=vname.replace('ensemble', 'cov')) + savedict['units'] = '[mol/region/s]**2' + savedict['dims'] = dimdate + dimregs + dimregs + + else: + + 
logging.debug(".........create tcdata") + tcdata = statevector.vector2tc(vectordata=data) # vector to TC + + logging.debug(".........create variables, dimension, units, count and values") + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + savedict['units'] = 'mol/region/s' + + savedict['count'] = index + savedict['values'] = tcdata + logging.debug("......save to file") + ncf.add_data(savedict) + + ncf_in.close() + ncf.close() + + logging.info("TransCom weekly average fluxes now written") + + return saveas + +def save_weekly_avg_ext_tc_data(dacycle): + """ Function SaveTCDataExt saves surface flux data to NetCDF files for extended TransCom regions + + *** Inputs *** + rundat : a RunInfo object + + *** Outputs *** + NetCDF file containing n-hourly global surface fluxes per TransCom region + + *** Example *** + ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """ + + +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_tc_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + # Write/Create NetCDF output file + # + saveas = os.path.join(dirname, 'tc_extfluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + dimdate = ncf.add_date_dim() + dimidateformat = ncf.add_date_dim_format() + dimregs = ncf.add_region_dim(type='tc_ext') +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker TransCom fluxes') + setattr(ncf, 'node_offset', 1) + # + + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + infile = os.path.join(dacycle['dir.analysis'], 'data_tc_weekly', 'tcfluxes.nc') + if not os.path.exists(infile): + logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." 
% infile) + return None + + ncf_in = io.ct_read(infile, 'read') + + # Transform data one by one + + # Get the date variable, and find index corresponding to the dacycle date + + try: + dates = ncf_in.variables['date'][:] + except KeyError: + logging.error("The variable date cannot be found in the requested input file (%s) " % infile) + logging.error("Please make sure you create gridded fluxes before making extended TC fluxes") + raise KeyError + + try: + index = dates.tolist().index(ncfdate) + except ValueError: + logging.error("The requested cycle date is not yet available in file %s " % infile) + logging.error("Please make sure you create state based fluxes before making extended TC fluxes ") + raise ValueError + + # First add the date for this cycle to the file, this grows the unlimited dimension + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = index + ncf.add_data(savedict) + + # Now convert other variables that were inside the tcfluxes.nc file + + vardict = ncf_in.variables + for vname, vprop in vardict.iteritems(): + + data = ncf_in.get_variable(vname)[index] + + if vname == 'latitude': continue + elif vname == 'longitude': continue + elif vname == 'date': continue + elif vname == 'idate': continue + elif 'cov' in vname: + + tcdata = ExtendedTCRegions(data, cov=True) + + savedict = ncf.standard_var(varname=vname) + savedict['units'] = '[mol/region/s]**2' + savedict['dims'] = dimdate + dimregs + dimregs + + else: + + tcdata = ExtendedTCRegions(data, cov=False) + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + savedict['units'] = 'mol/region/s' + + savedict['count'] = index + savedict['values'] = tcdata + ncf.add_data(savedict) + + ncf_in.close() + ncf.close() + + logging.info("TransCom weekly average extended fluxes now written") + + return saveas + +def save_weekly_avg_agg_data(dacycle, region_aggregate='olson'): + """ + Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the + function `save_weekly_avg_1x1_data` to create fluxes of length `nparameters`, which are then projected + onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + + This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve + these with the parameters in the statevector. This creates posterior fluxes, and the posterior covariance for the complete + statevector in units of mol/box/s which we then turn into TC fluxes and covariances. 
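+
+ :param region_aggregate: name of the aggregation mask to use; the values handled below are
+ 'olson', 'transcom', 'country' and 'methane'.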
+ """ + +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_%s_weekly' % region_aggregate)) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + logging.debug("Aggregating 1x1 fluxes to %s totals" % region_aggregate) + + + # Write/Create NetCDF output file + # + saveas = os.path.join(dirname, '%s_fluxes.%s.nc' % (region_aggregate, startdate.strftime('%Y-%m-%d'))) + ncf = io.CT_CDF(saveas, 'write') + dimdate = ncf.add_date_dim() + dimidateformat = ncf.add_date_dim_format() + dimgrid = ncf.add_latlon_dim() # for mask +# +# Select regions to aggregate to +# + + if region_aggregate == "olson": + regionmask = tc.olson240mask + dimname = 'olson' + dimregs = ncf.add_dim(dimname, regionmask.max()) + + regionnames = [] + for i in range(11): + for j in range(19): + regionnames.append("%s_%s" % (tc.transnams[i], tc.olsonnams[j],)) + regionnames.extend(tc.oifnams) + + for i, name in enumerate(regionnames): + lab = 'Aggregate_Region_%03d' % (i + 1,) + setattr(ncf, lab, name) + + elif region_aggregate == "transcom": + regionmask = tc.transcommask + dimname = 'tc' + dimregs = ncf.add_region_dim(type='tc') + + elif region_aggregate == "country": + + countrydict = ct.get_countrydict() + selected = ['Russia', 'Canada', 'China', 'United States', 'EU27', 'Brazil', 'Australia', 'India'] #,'G8','UNFCCC_annex1','UNFCCC_annex2'] + regionmask = np.zeros((180, 360,), 'float') + + for i, name in enumerate(selected): + lab = 'Country_%03d' % (i + 1,) + setattr(ncf, lab, name) + + if name == 'EU27': + namelist = ct.EU27 + elif name == 'EU25': + namelist = ct.EU25 + elif name == 'G8': + namelist = ct.G8 + elif name == 'UNFCCC_annex1': + namelist = ct.annex1 + elif name == 'UNFCCC_annex2': + namelist = ct.annex2 + else: + namelist = [name] + + for countryname in namelist: + try: + country = countrydict[countryname] + regionmask.put(country.gridnr, i + 1) + except: + continue + + dimname = 'country' + dimregs = ncf.add_dim(dimname, regionmask.max()) + + elif region_aggregate == "methane": + regionmask = tc.methane71mask + dimname = 'methaneland' + dimregs = ncf.add_dim(dimname, regionmask.max()) + + regionnames = [] + for i in range(6): + for j in range(16): + regionnames.append("%s_%s" % (tc.transnams[i], tc.methanenams[j],)) + regionnames.extend(tc.oceannams) + + for i, name in enumerate(regionnames): + lab = 'Aggregate_Region_%03d' % (i + 1,) + setattr(ncf, lab, name) + + + # + + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + # + # set title and tell GMT that we are using "pixel registration" + # + setattr(ncf, 'Title', 'CTDAS Aggregated fluxes') + setattr(ncf, 'node_offset', 1) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'regionmask' + savedict['comment'] = 'numerical mask used to aggregate 1x1 flux fields, each integer 0,...,N is one region aggregated' + savedict['values'] = regionmask.tolist() + savedict['units'] = '-' + savedict['dims'] = dimgrid + savedict['count'] = 0 + ncf.add_data(savedict) + + # Get input data from 1x1 degree flux files + + area = globarea() + + infile = 
os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly', 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d')) + if not os.path.exists(infile): + logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." % infile) + return None + + ncf_in = io.ct_read(infile, 'read') + + # Transform data one by one + + # Get the date variable, and find index corresponding to the dacycle date + + try: + dates = ncf_in.variables['date'][:] + except KeyError: + logging.error("The variable date cannot be found in the requested input file (%s) " % infile) + logging.error("Please make sure you create gridded fluxes before making TC fluxes ") + raise KeyError + + try: + index = dates.tolist().index(ncfdate) + except ValueError: + logging.error("The requested cycle date is not yet available in file %s " % infile) + logging.error("Please make sure you create state based fluxes before making TC fluxes ") + raise ValueError + + # First add the date for this cycle to the file, this grows the unlimited dimension + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = index + ncf.add_data(savedict) + + # Now convert other variables that were inside the statevector file + + vardict = ncf_in.variables + for vname, vprop in vardict.iteritems(): + if vname == 'latitude': continue + elif vname == 'longitude': continue + elif vname == 'date': continue + elif vname == 'idate': continue + elif 'std' in vname: continue + elif 'ensemble' in vname: + + data = ncf_in.get_variable(vname)[index] + + dimensemble = ncf.add_dim('members', data.shape[0]) + + regiondata = [] + for member in data: + aggdata = state_to_grid(member * area, regionmask, reverse=True, mapname=region_aggregate) + regiondata.append(aggdata) + + regiondata = np.array(regiondata) + + savedict = ncf.standard_var(varname=vname) + savedict['units'] = 'mol/region/s' + savedict['dims'] = dimdate + dimensemble + dimregs + + elif 'flux' in vname: + + data = ncf_in.get_variable(vname)[index] + + regiondata = state_to_grid(data * area, regionmask, reverse=True, mapname=region_aggregate) + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + savedict['units'] = 'mol/region/s' + + else: + + data = ncf_in.get_variable(vname)[:] + regiondata = state_to_grid(data, regionmask, reverse=True, mapname=region_aggregate) + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + + savedict['count'] = index + savedict['values'] = regiondata + ncf.add_data(savedict) + + ncf_in.close() + ncf.close() + + logging.info("%s aggregated weekly average fluxes now written" % dimname) + + return saveas + +def save_time_avg_data(dacycle, infile, avg='monthly'): + """ Function saves time mean surface flux data to NetCDF files + + *** Inputs *** + rundat : a RunInfo object + + *** Outputs *** + daily NetCDF file containing 1-hourly global surface fluxes at 1x1 degree + + *** Example *** + ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """ + + if 'weekly' in infile: + intime = 'weekly' + if 'monthly' in infile: + intime = 'monthly' + if 'yearly' in infile: + intime = 'yearly' + + dirname, filename = os.path.split(infile) + outdir = create_dirs(os.path.join(dacycle['dir.analysis'], dirname.replace(intime, avg))) + + dectime0 = date2num(datetime(2000, 1, 1)) + +# Create NetCDF output file +# + saveas = os.path.join(outdir, filename) + ncf = io.CT_CDF(saveas, 'create') + dimdate = ncf.add_date_dim() +# +# Open 
input file specified from the command line +# + if not os.path.exists(infile): + logging.error("Needed input file (%s) not found. Please create this first:" % infile) + logging.error("returning...") + return None + else: + pass + + file = io.ct_read(infile, 'read') + datasets = file.variables.keys() + date = file.get_variable('date') + globatts = file.ncattrs() + + for att in globatts: + attval = file.getncattr(att) + if not att in ncf.ncattrs(): + ncf.setncattr(att, attval) + + + time = [datetime(2000, 1, 1) + timedelta(days=d) for d in date] + +# loop over datasets in infile, skip idate and date as we will make new time axis for the averaged data + + for sds in ['date'] + datasets: + +# get original data + + data = file.get_variable(sds) + varatts = file.variables[sds].ncattrs() + vardims = file.variables[sds].dimensions +# +# Depending on dims of input dataset, create dims for output dataset. Note that we add the new dimdate now. +# + + for d in vardims: + if 'date' in d: + continue + if d in ncf.dimensions.keys(): + pass + else: + dim = ncf.createDimension(d, size=len(file.dimensions[d])) + + savedict = ncf.standard_var(sds) + savedict['name'] = sds + savedict['dims'] = vardims + savedict['units'] = file.variables[sds].units + savedict['long_name'] = file.variables[sds].long_name + savedict['comment'] = file.variables[sds].comment + savedict['standard_name'] = file.variables[sds].standard_name + savedict['count'] = 0 + + if not 'date' in vardims: + savedict['values'] = data + ncf.add_data(savedict) + else: + + if avg == 'monthly': + time_avg, data_avg = timetools.monthly_avg(time, data) + elif avg == 'seasonal': + time_avg, data_avg = timetools.season_avg(time, data) + elif avg == 'yearly': + time_avg, data_avg = timetools.yearly_avg(time, data) + elif avg == 'longterm': + time_avg, data_avg = timetools.longterm_avg(time, data) + time_avg = [time_avg] + data_avg = [data_avg] + else: + raise ValueError, 'Averaging (%s) does not exist' % avg + + count = -1 + for dd, data in zip(time_avg, data_avg): + count = count + 1 + if sds == 'date': + savedict['values'] = date2num(dd) - dectime0 + else: + savedict['values'] = data + savedict['count'] = count + ncf.add_data(savedict, silent=True) + + sys.stdout.write('.') + + sys.stdout.write('\n') + sys.stdout.flush() + +# end NetCDF file access + file.close() + ncf.close() + + logging.info("------------------- Finished time averaging---------------------------------") + + return saveas + +if __name__ == "__main__": + from da.tools.initexit import CycleControl + from da.carbondioxide.dasystem import CO2DaSystem + from da.carbondioxide.statevector import CO2StateVector + + sys.path.append('../../') + + logging.root.setLevel(logging.DEBUG) + + dacycle = CycleControl(args={'rc':'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'}) + dacycle.setup() + dacycle.parse_times() + + dasystem = CO2DaSystem('../rc/carbontracker_ct09_opfnew.rc') + + dacycle.dasystem = dasystem + + statevector = CO2StateVector() + statevector.setup(dacycle) + + while dacycle['time.end'] < dacycle['time.finish']: + save_weekly_avg_1x1_data(dacycle, statevector) + save_weekly_avg_state_data(dacycle, statevector) + save_weekly_avg_tc_data(dacycle, statevector) + save_weekly_avg_ext_tc_data(dacycle) + save_weekly_avg_agg_data(dacycle, region_aggregate='olson') + save_weekly_avg_agg_data(dacycle, region_aggregate='transcom') + save_weekly_avg_agg_data(dacycle, region_aggregate='country') + + dacycle.advance_cycle_times() + + statevector = None # free memory + + sys.exit(0) + diff --git 
a/da/methane/analysis/tools_regions.py b/da/methane/analysis/tools_regions.py new file mode 100755 index 0000000000000000000000000000000000000000..2f267f2474e7ce410a5535dbf77143b9c230f9b0 --- /dev/null +++ b/da/methane/analysis/tools_regions.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +import numpy as np +import cPickle + +def state_to_grid(values, regionmap, reverse=False, avg=False, mapname=None): + """ + This method converts parameters from a CarbonTracker StateVector object to a gridded map of linear multiplication values. These + can subsequently be used in the transport model code to multiply/manipulate fluxes + + """ + nregions = regionmap.max() + try: + if not mapname: + raise Exception + + regionselect = cPickle.load(open('%s_regiondict.pickle' % mapname, 'rb')) + except: + + # dictionary for region <-> map conversions + regs = {} + for r in np.arange(1, nregions + 1): + sel = (regionmap.flat == r).nonzero() + if len(sel[0]) > 0: + regs[r] = sel + + regionselect = regs + + cPickle.dump(regionselect, open('%s_regiondict.pickle' % mapname, 'wb'), -1) + print 'Pickling region map' + + if reverse: + """ project 1x1 degree map onto ecoregions """ + + result = np.zeros(nregions, float) + for k, v in regionselect.iteritems(): + if avg: + result[k - 1] = values.ravel().take(v).mean() + else : + result[k - 1] = values.ravel().take(v).sum() + return result + + else: + """ project ecoregion properties onto 1x1 degree map """ + + result = np.zeros((180, 360,), float) + for k, v in regionselect.iteritems(): + result.put(v, values[k - 1]) + + return result + +def globarea(im=360, jm=180, silent=True): + """ Function calculates the surface area according to TM5 definitions""" + + radius = 6.371e6 # the earth radius in meters + deg2rad = np.pi / 180. + g = 9.80665 + + dxx = 360.0 / im * deg2rad + dyy = 180.0 / jm * deg2rad + lat = np.arange(-90 * deg2rad, 90 * deg2rad, dyy) + dxy = dxx * (np.sin(lat + dyy) - np.sin(lat)) * radius ** 2 + area = np.resize(np.repeat(dxy, im, axis=0) , [jm, im]) + if not silent: + print 'total area of field = ', np.sum(area.flat) + print 'total earth area = ', 4 * np.pi * radius ** 2 + return area + diff --git a/da/methane/dasystem.py b/da/methane/dasystem.py new file mode 100755 index 0000000000000000000000000000000000000000..65184e9020fb4db89d474e4c8aa08d2d3bcbce90 --- /dev/null +++ b/da/methane/dasystem.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# control.py + +""" +Author : aki + +Revision History: +File created on 16 Nov 2013. + +""" + +import logging + +################### Begin Class MethaneDaSystem ################### + +from da.baseclasses.dasystem import DaSystem + +class MethaneDaSystem(DaSystem): + """ Information on the data assimilation system used. This is normally an rc-file with settings. 
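+
+ A minimal rc-file sketch with the two keys checked by validate() below (the values are
+ placeholders, not defaults):
+
+ obs.input.dir : /path/to/obs
+ obs.input.fname : obs_ch4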
+ """ + def validate(self): + """ + validate the contents of the rc-file given a dictionary of required keys + """ + + needed_rc_items = ['obs.input.dir', + 'obs.input.fname'] + + + for k, v in self.iteritems(): + if v == 'True' : + self[k] = True + if v == 'False': + self[k] = False + + for key in needed_rc_items: + if not self.has_key(key): + logging.warning('Missing a required value in rc-file : %s' % key) + logging.debug('DA System Info settings have been validated succesfully') + +################### End Class MethaneDaSystem ################### + + +if __name__ == "__main__": + pass diff --git a/da/methane/initexit.py b/da/methane/initexit.py new file mode 100755 index 0000000000000000000000000000000000000000..8e9fb5b045f80ca9a31d548b676118a616518d1e --- /dev/null +++ b/da/methane/initexit.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python +# da_initexit.py + +""" +modified for module initext +Revision History: +File created on Apr. 2016 by Aki +File revised on Feb. 2017 by Aki + +""" +import logging +import os +import sys +import numpy as np +from string import join + +import da.tools.rc as rc +from da.tools.initexit import * + +needed_da_items = [ + 'time.start', + 'time.finish', + 'time.nlag', + 'time.cycle', + 'dir.da_run', + 'da.system', + 'da.system.rc', + 'da.obsoperator', + 'da.obsoperator.rc', + 'da.optimizer.nmembers', + 'da.suffixname'] + + +#def validate_rc2(self): +# """ +# Validate the contents of the rc-file given a dictionary of required keys. +# Currently required keys are :attr:`~da.tools.initexit.needed_da_items` +# """ +# +# for k, v in self.iteritems(): +# if v in ['True', 'true', 't', 'T', 'y', 'yes']: +# self[k] = True +# if v in ['False', 'false', 'f', 'F', 'n', 'no']: +# self[k] = False +# if 'date' in k : +# self[k] = to_datetime(v) +# if k in ['time.start', 'time.end', 'time.finish', 'da.restart.tstamp']: +# self[k] = to_datetime(v) +# for key in needed_da_items: +# if not self.has_key(key): +# msg = 'Missing a required value in rc-file : %s' % key +# logging.error(msg) +# logging.error('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ') +# logging.error('Please note the update on Dec 02 2011 where rc-file names for DaSystem and ') +# logging.error('are from now on specified in the main rc-file (see da/rc/da.rc for example)') +# logging.error('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ') +# raise IOError, msg +# logging.debug('DA Cycle settings have been validated succesfully') +#CycleControl.validate_rc = validate_rc2 + +def submit_next_cycle_fmi(self): + """ + Submit the next job of a DA cycle, this consists of + * Changing to the working directory from which the job was started initially + * create a line to start the master script again with a newly created rc-file + * Submitting the jobfile + + If the end of the cycle series is reached, no new job is submitted. 
+ + """ + + + if self['time.end'] < self['time.finish']: + + # file ID and names + jobid = self['time.end'].strftime('%Y%m%d') + targetdir = os.path.join(self['dir.exec']) + jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid) + logfile = os.path.join(targetdir, 'jb.%s.log' % jobid) + # Template and commands for job + jobparams = {'jobname':"ctdas", 'jobtime':'05:00:00', 'logfile': logfile, 'errfile': logfile, + 'jobpes':'120','jobnodes':'20'} + template = self.daplatform.get_job_template(jobparams) + execcommand = os.path.join(self['dir.da_submit'], sys.argv[0]) + if '-t' in self.opts: + (self.opts).remove('-t') + #template += 'python %s rc=%s %s' % (execcommand, self['da.restart.fname'], join(self.opts, '')) + template += 'cd %s \n' %self['dir.da_submit'] + template += '%s rc=%s %s' % (sys.argv[0], self['da.restart.fname'], join(self.opts, '')) + # write and submit + self.daplatform.write_job(jobfile, template, jobid) + jobid = self.daplatform.submit_job(jobfile, joblog=logfile) + job_file = 'ctdas.o%s' %jobid[0:-4] + logging.info('Jobfile %s' %job_file ) + logging.info('DA cycle has been submitted.') + + else: + logging.info('Final date reached, no new cycle started') + +def setup_file_structure_fmi(self): + + # Aki: name of file structure changed + # At FMI, job files are stored in executed folders, and cannot define where to put them. + # So job file folder is not created. + + filtertime = self['time.start'].strftime('%Y%m%d') + + suf = self['da.suffixname'] + self['dir.exec'] = os.path.join(self['dir.da_run'], 'exec') + self['dir.input'] = os.path.join(self['dir.da_run'], 'input_%s' %suf ) + self['dir.output'] = os.path.join(self['dir.da_run'], 'output_%s' %suf ) + self['dir.analysis'] = os.path.join(self['dir.da_run'], 'analysis_%s' %suf ) + #self['dir.jobs'] = os.path.join(self['dir.da_run'], 'jobs') + self['dir.restart'] = os.path.join(self['dir.da_run'], 'restart_%s' %suf ) + + logging.info("setup_file_structure %s" %self['dir.output']) + create_dirs(self['dir.da_run']) + create_dirs(os.path.join(self['dir.exec'])) + create_dirs(os.path.join(self['dir.input'])) + create_dirs(os.path.join(self['dir.output'])) + create_dirs(os.path.join(self['dir.analysis'])) + #create_dirs(os.path.join(self['dir.jobs'])) + create_dirs(os.path.join(self['dir.restart'])) + + logging.info('Succesfully created the file structure for the assimilation job') + +def setup_fmi(self): + + if self['transition']: + logging.info("Transition of filter from previous step with od meteo from 25 to 34 levels") + self.setup_file_structure() + strippedname = os.path.split(self['jobrcfilename'])[-1] + self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname) + self.read_random_seed(False) + + elif self['time.restart']: + logging.info("Restarting filter from previous step") + self.setup_file_structure() + strippedname = os.path.split(self['jobrcfilename'])[-1] + self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname) + self.read_random_seed(False) + + else: #assume that it is a fresh start, change this condition to more specific if crash recover added + logging.info("First time step in filter sequence") + self.setup_file_structure() + + # expand jobrcfilename to include exec dir from now on. 
+ # First strip current leading path from filename + + strippedname = os.path.split(self['jobrcfilename'])[-1] + self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname) + if self.dasystem.has_key('copyregionsfile'): + shutil.copy(os.path.join(self.dasystem['regionsfile']),os.path.join(self['dir.exec'],'da','methane','analysis','copied_regions.nc')) + logging.info('Copied regions file to the analysis directory: %s'%os.path.join(self.dasystem['regionsfile'])) + + if self.dasystem.has_key('extendedregionsfile'): + shutil.copy(os.path.join(self.dasystem['extendedregionsfile']),os.path.join(self['dir.exec'],'da','analysis','copied_regions_extended.nc')) + logging.info('Copied extended regions file to the analysis directory: %s'%os.path.join(self.dasystem['extendedregionsfile'])) + + for filename in glob.glob(os.path.join(self['dir.exec'],'da','analysis','*.pickle')): + logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1]) + os.remove(filename) + + for filename in glob.glob(os.path.join(self['dir.exec'],'*.pickle')): + logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1]) + os.remove(filename) + + if self.dasystem.has_key('random.seed.init'): + self.read_random_seed(True) + + self.parse_times() + #self.write_rc(self['jobrcfilename']) + + +CycleControl.submit_next_cycle = submit_next_cycle_fmi +CycleControl.setup_file_structure = setup_file_structure_fmi +CycleControl.setup = setup_fmi + +if __name__ == "__main__": + pass + diff --git a/da/methane/io4.py b/da/methane/io4.py new file mode 100755 index 0000000000000000000000000000000000000000..2a6b506212db09e155fc406e74cf19dd06606357 --- /dev/null +++ b/da/methane/io4.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# io.py + +""" +Author : aki + +Revision History: +File created on Apr 2016. 
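+
+ This module re-exports everything from da.tools.io4 and then replaces
+ CT_CDF.add_tc_header and the standard_var methods of CT_CDF/CT_HDF with the
+ FMI/methane-specific versions defined below (see the assignments at the end of
+ this file).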
+ +""" +import standardvariables +import datetime as dt +from numpy import array, arange +import os +import logging +import sys +sys.path.append('/stornext/store/carbon/CarbonTracker/netcdf4/python_packages_install_dir/lib/python2.7/site-packages/') +import netCDF4 +sys.path.append('/stornext/store/carbon/CarbonTracker/pyhdf/pyhdf-0.8.3/lib/python2.7/site-packages/') +import pyhdf.SD as hdf +sys.path.append('../') +from da.tools.io4 import * + + +disclaimer = "This data belongs to the CarbonTracker project" +email = "aki.tsuruta@fmi.fi" +url = "http://en.ilmatieteenlaitos.fi/carbon-cycle-modelling" +institution = "Finnish Meteorological Institute, Climate research" +source = "CarbonTracker release 1.0" +conventions = "CF-1.1" +historytext = 'created on '+dt.datetime.now().strftime('%B %d, %Y')+' by %s'%os.environ['USER'] + + +def add_tc_header2(self): + + # + self.setncattr('Institution',institution) + self.setncattr('Contact',email) + self.setncattr('URL',url) + self.setncattr('Source',source) + self.setncattr('Convention',conventions) + self.setncattr('Disclaimer',disclaimer) + self.setncattr('History',historytext) + +def standard_var2(self,varname): + """ return properties of standard variables """ + import standardvariables + + if varname in standardvariables.standard_variables.keys(): + return standardvariables.standard_variables[varname] + else: + return standardvariables.standard_variables['unknown'] + + + +CT_CDF.add_tc_header = add_tc_header2 +CT_CDF.standard_var = standard_var2 +CT_HDF.standard_var = standard_var2 diff --git a/da/methane/obs.py b/da/methane/obs.py new file mode 100755 index 0000000000000000000000000000000000000000..6f08d9dd2b82d0a0351cd12a17beb00ec9030d12 --- /dev/null +++ b/da/methane/obs.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python +# obs.py + +""" +Author : aki + +Revision History: +File revised on Feb 2017. + +""" +import os +import sys +import logging +#from da.baseclasses.statevector import filename +import datetime as dtm +from string import strip +from numpy import array, logical_and + +sys.path.append(os.getcwd()) +sys.path.append('../../') + +identifier = 'CarbonTracker CH4 mole fractions' +version = '0.0' + +from da.baseclasses.obs import Observations +import da.methane.io4 as io +import da.tools.rc as rc + +################### Begin Class CH4Observations ################### + +class MethaneObservations(Observations): + """ an object that holds data + methods and attributes needed to manipulate mole fraction values """ + + def setup(self, dacycle): + self.startdate = dacycle['time.sample.start'] + self.enddate = dacycle['time.sample.end'] + + sfname = dacycle.dasystem['obs.input.fname'] + if sfname.endswith('.nc'): + filename = os.path.join(dacycle.dasystem['obs.input.dir'], sfname) + else: + filename = os.path.join(dacycle.dasystem['obs.input.dir'], sfname + '.' + self.startdate.strftime('%Y%m%d') + '.nc') + + if not os.path.exists(filename): + msg = 'Could not find the required observation input file (%s) ' % filename + logging.error(msg) + raise IOError, msg + else: + self.obs_filename = filename + self.datalist = [] + + def add_observations(self): + """ Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file + + The CarbonTracker mole fraction files are provided as one long list of obs for all possible dates. 
So we can + either: + + (1) read all, and the subselect the data we will use in the rest of this cycle + (2) Use nco to make a subset of the data + + For now, we will stick with option (1) + + """ + ncf = io.ct_read(self.obs_filename, 'read') + idates = ncf.get_variable('date_components') + dates = array([dtm.datetime(*d) for d in idates]) + + subselect = logical_and(dates >= self.startdate, dates <= self.enddate).nonzero()[0] + + dates = dates.take(subselect, axis=0) + + ids = ncf.get_variable('obs_num').take(subselect, axis=0) + #evn = ncf.get_variable('eventnumber').take(subselect, axis=0) + #evn = [s.tostring().lower() for s in evn] + #evn = map(strip, evn) + sites = ncf.get_variable('obs_id').take(subselect, axis=0) + sites = [s.tostring().lower() for s in sites] + sites = map(strip, sites) + lats = ncf.get_variable('latitude').take(subselect, axis=0) + lons = ncf.get_variable('longitude').take(subselect, axis=0) + alts = ncf.get_variable('altitude').take(subselect, axis=0) + obs = ncf.get_variable('obs').take(subselect, axis=0) * 1.e-9 + logging.info("Converting observed values from ppb to mol/mol!!!!") + #species = ncf.get_variable('species').take(subselect, axis=0) + #species = [s.tostring().lower() for s in species] + #species = map(strip, species) + #strategy = ncf.get_variable('sampling_strategy').take(subselect, axis=0) + #flags = ncf.get_variable('NOAA_QC_flags').take(subselect, axis=0) + #flags = [s.tostring().lower() for s in flags] + #flags = map(strip, flags) + #flags = [int(f == '...') for f in flags] + ncf.close() + + logging.debug("Successfully read data from obs file (%s)" % self.obs_filename) + + for n in range(len(dates)): + obs[n] = obs[n] + #self.datalist.append(MoleFractionSample(ids[n], dates[n], sites[n], obs[n], 0.0, 0.0, 0.0, 0.0, flags[n], alts[n], lats[n], lons[n], evn[n], species[n], strategy[n], 0.0, self.obs_filename)) + self.datalist.append( MoleFractionSample(ids[n], dates[n], sites[n], obs[n], 0.0, 0.0, 0.0, 0.0, 0.0, alts[n], lats[n], lons[n], '0000', 'ch4', 1.0, 0.0, self.obs_filename) ) + logging.debug("Added %d observations to the Data list" % len(dates)) + + def add_simulations(self, filename, silent=True): + """ Adds model simulated values to the mole fraction objects """ + + + if not os.path.exists(filename): + msg = "Sample output filename for observations could not be found : %s" % filename + logging.error(msg) + logging.error("Did the sampling step succeed?") + logging.error("...exiting") + raise IOError, msg + + ncf = io.ct_read(filename, method='read') + ids = ncf.get_variable('obs_num') + simulated = ncf.get_variable('flask') + #for i in xrange(simulated.shape[0]): + # print simulated[i,:] + ncf.close() + logging.info("Successfully read data from model sample file (%s)" % filename) + + obs_ids = self.getvalues('id') + + obs_ids = obs_ids.tolist() + ids = map(int, ids) + + missing_samples = [] + + for idx, val in zip(ids, simulated): + if idx in obs_ids: + index = obs_ids.index(idx) + #print id,val,val.shape + self.datalist[index].simulated = val + else: + missing_samples.append(idx) + + if not silent and missing_samples != []: + logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...') + #msg = '%s'%missing_samples ; logging.warning(msg) + + logging.info("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples))) + + def write_sample_coords(self, obsinputfile): + """ + Write the information needed by the observation operator to a file. 
Return the filename that was written for later use + + """ + f = io.CT_CDF(obsinputfile, method='create') + logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile) + + dimid = f.add_dim('obs', len(self.datalist)) + dim200char = f.add_dim('string_of200chars', 200) + dimcalcomp = f.add_dim('calendar_components', 6) + + if len(self.datalist) == 0: + f.close() + #return obsinputfile + + data = self.getvalues('id') + + savedict = io.std_savedict.copy() + savedict['name'] = "obs_num" + savedict['dtype'] = "int" + savedict['long_name'] = "Unique_Dataset_observation_index_number" + savedict['units'] = "" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED." + f.add_data(savedict) + + data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ] + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "date_components" + savedict['units'] = "integer components of UTC date/time" + savedict['dims'] = dimid + dimcalcomp + savedict['values'] = data + savedict['missing_value'] = -9 + savedict['comment'] = "Calendar date components as integers. Times and dates are UTC." + savedict['order'] = "year, month, day, hour, minute, second" + f.add_data(savedict) + + data = self.getvalues('lat') + + savedict = io.std_savedict.copy() + savedict['name'] = "latitude" + savedict['units'] = "degrees_north" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('lon') + + savedict = io.std_savedict.copy() + savedict['name'] = "longitude" + savedict['units'] = "degrees_east" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('height') + + savedict = io.std_savedict.copy() + savedict['name'] = "altitude" + savedict['units'] = "meters_above_sea_level" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('samplingstrategy') + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "sampling_strategy" + savedict['units'] = "NA" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -9 + f.add_data(savedict) + + data = self.getvalues('evn') + + savedict = io.std_savedict.copy() + savedict['dtype'] = "char" + savedict['name'] = "obs_id" + savedict['units'] = "NOAA database identifier" + savedict['dims'] = dimid + dim200char + savedict['values'] = data + savedict['missing_value'] = '!' + f.add_data(savedict) + + f.close() + + logging.debug("Successfully wrote data to obs file") + logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile) + + + + + def add_model_data_mismatch(self, filename): + """ + Get the model-data mismatch values for this cycle. 
+ + (1) Open a sites_weights file + (2) Parse the data + (3) Compare site list against data + (4) Take care of double sites, etc + + """ + + if not os.path.exists(filename): + msg = 'Could not find the required sites.rc input file (%s)' % filename + logging.error(msg) + raise IOError, msg + else: + self.sites_file = filename + + sites_weights = rc.read(self.sites_file) + + self.rejection_threshold = int(sites_weights['obs.rejection.threshold']) + self.global_R_scaling = float(sites_weights['global.R.scaling']) + self.n_site_categories = int(sites_weights['n.site.categories']) + self.n_sites_active = int(sites_weights['n.sites.active']) + self.n_sites_moved = int(sites_weights['n.sites.moved']) + + logging.debug('Model-data mismatch rejection threshold: %d ' % self.rejection_threshold) + logging.debug('Model-data mismatch scaling factor : %s ' % self.global_R_scaling) + logging.debug('Model-data mismatch site categories : %d ' % self.n_site_categories) + logging.debug('Model-data mismatch active sites : %d ' % self.n_sites_active) + logging.debug('Model-data mismatch moved sites : %d ' % self.n_sites_moved) + + cats = [k for k in sites_weights.keys() if 'site.category' in k] + + SiteCategories = {} + for key in cats: + name, error, may_localize, may_reject = sites_weights[key].split(';') + name = name.strip().lower() + error = float(error) + may_reject = ("TRUE" in may_reject.upper()) + may_localize = ("TRUE" in may_localize.upper()) + SiteCategories[name] = {'error':error, 'may_localize':may_localize, 'may_reject':may_reject} + #print name,SiteCategories[name] + + active = [k for k in sites_weights.keys() if 'site.active' in k] + + site_info = {} + for key in active: + sitename, sitecategory = sites_weights[key].split(';') + sitename = sitename.strip().lower() + sitecategory = sitecategory.strip().lower() + site_info[sitename] = SiteCategories[sitecategory] + #print sitename,site_info[sitename] + + for obs in self.datalist: + obs.mdm = 1000.0 # default is very high model-data-mismatch, until explicitly set by script + if site_info.has_key(obs.code): + logging.debug("Observation found (%s)" % obs.code) + obs.mdm = site_info[obs.code]['error'] * self.global_R_scaling + obs.may_localize = site_info[obs.code]['may_localize'] + obs.may_reject = site_info[obs.code]['may_reject'] + else: + logging.warning("Observation NOT found (%s, %s), please check sites.rc file (%s) !!!" % (obs.code, identifier, self.sites_file)) + # raise IOError + obs.flag = 99 + logging.debug("obs %s model-data-mismatch: %s" %(obs.code, obs.mdm)) + + # Add site_info dictionary to the Observations object for future use + + self.site_info = site_info + + def write_sample_auxiliary(self, auxoutputfile): + """ + Write selected information contained in the Observations object to a file. + + """ + + f = io.CT_CDF(auxoutputfile, method='create') + logging.debug('Creating new auxiliary sample output file for postprocessing (%s)' % auxoutputfile) + + dimid = f.add_dim('obs', len(self.datalist)) + dim200char = f.add_dim('string_of200chars', 200) + dimcalcomp = f.add_dim('calendar_components', 6) + + if len(self.datalist) == 0: + f.close() + #return outfile + + data = self.getvalues('id') + + savedict = io.std_savedict.copy() + savedict['name'] = "obs_num" + savedict['dtype'] = "int" + savedict['long_name'] = "Unique_Dataset_observation_index_number" + savedict['units'] = "" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED." 
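A self-contained sketch of the sites.rc parsing performed in add_model_data_mismatch above. The dictionary below stands in for what rc.read() returns, and the site and category names are made up.

sites_weights = {
    'site.category.01': 'surface ; 4.5 ; TRUE ; TRUE',
    'site.category.02': 'aircraft; 7.5 ; FALSE; TRUE',
    'site.active.001' : 'mlo_surface-flask ; surface',
}

SiteCategories = {}
for key in [k for k in sites_weights.keys() if 'site.category' in k]:
    name, error, may_localize, may_reject = sites_weights[key].split(';')
    SiteCategories[name.strip().lower()] = {'error': float(error),
                                            'may_localize': 'TRUE' in may_localize.upper(),
                                            'may_reject': 'TRUE' in may_reject.upper()}

site_info = {}
for key in [k for k in sites_weights.keys() if 'site.active' in k]:
    sitename, sitecategory = [s.strip().lower() for s in sites_weights[key].split(';')]
    site_info[sitename] = SiteCategories[sitecategory]

print(site_info['mlo_surface-flask'])   # {'error': 4.5, 'may_localize': True, 'may_reject': True}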
+ f.add_data(savedict) + + data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate')] + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "date_components" + savedict['units'] = "integer components of UTC date/time" + savedict['dims'] = dimid + dimcalcomp + savedict['values'] = data + savedict['missing_value'] = -9 + savedict['comment'] = "Calendar date components as integers. Times and dates are UTC." + savedict['order'] = "year, month, day, hour, minute, second" + f.add_data(savedict) + + data = self.getvalues('obs') + + savedict = io.std_savedict.copy() + savedict['name'] = "observed" + savedict['long_name'] = "observedvalues" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = 'Observations used in optimization' + f.add_data(savedict) + + data = self.getvalues('mdm') + + savedict = io.std_savedict.copy() + savedict['name'] = "modeldatamismatch" + savedict['long_name'] = "modeldatamismatch" + savedict['units'] = "[mol mol-1]" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch' + f.add_data(savedict) + + data = self.getvalues('simulated') + + dimmembers = f.add_dim('members', data.shape[1]) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamples" + savedict['long_name'] = "modelsamples for all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + dimmembers + savedict['values'] = data.tolist() + savedict['comment'] = 'simulated mole fractions based on optimized state vector' + f.add_data(savedict) + + data = self.getvalues('fromfile') + + savedict = io.std_savedict.copy() + savedict['name'] = "inputfilename" + savedict['long_name'] = "name of file where original obs data was taken from" + savedict['dtype'] = "char" + savedict['dims'] = dimid + dim200char + savedict['values'] = data + savedict['missing_value'] = '!' + f.add_data(savedict) + + data = self.getvalues('code') + + savedict = io.std_savedict.copy() + savedict['name'] = "sitecode" + savedict['long_name'] = "site code propagated from observation file" + savedict['dtype'] = "char" + savedict['dims'] = dimid + dim200char + savedict['values'] = data + savedict['missing_value'] = '!' + f.add_data(savedict) + + f.close() + + logging.debug("Successfully wrote data to auxiliary sample output file (%s)" % auxoutputfile) + + #return outfile + + +################### End Class CH4Observations ################### + + + +################### Begin Class MoleFractionSample ################### + +class MoleFractionSample(object): + """ + Holds the data that defines a mole fraction Sample in the data assimilation framework. Sor far, this includes all + attributes listed below in the __init__ method. One can additionally make more types of data, or make new + objects for specific projects. 
+ + """ + + def __init__(self, idx, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat= -999., lon= -999., evn='0000', species='ch4', samplingstrategy=1, sdev=0.0, fromfile='none.nc'): + self.code = code.strip() # Site code + self.xdate = xdate # Date of obs + self.obs = obs # Value observed + self.simulated = simulated # Value simulated by model + self.resid = resid # Mole fraction residuals + self.hphr = hphr # Mole fraction prior uncertainty from fluxes and (HPH) and model data mismatch (R) + self.mdm = mdm # Model data mismatch + self.may_localize = True # Whether sample may be localized in optimizer + self.may_reject = True # Whether sample may be rejected if outside threshold + self.flag = flag # Flag + self.height = height # Sample height + self.lat = lat # Sample lat + self.lon = lon # Sample lon + self.id = idx # ID number + self.evn = evn # Event number + self.sdev = sdev # standard deviation of ensemble + self.masl = True # Sample is in Meters Above Sea Level + self.mag = not self.masl # Sample is in Meters Above Ground + self.species = species.strip() + self.samplingstrategy = samplingstrategy + self.fromfile = fromfile # netcdf filename inside observation distribution, to write back later + +################### End Class MoleFractionSample ################### + + +if __name__ == "__main__": + pass diff --git a/da/methane/optimizer.py b/da/methane/optimizer.py new file mode 100755 index 0000000000000000000000000000000000000000..c27feaca19cb52b74f652f823e2ed2bd5c388f8e --- /dev/null +++ b/da/methane/optimizer.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# optimizer.py + +""" +Author : Aki + +Revision History: +File revised on Feb 2017. + +""" + +import os +import sys +sys.path.append(os.getcwd()) +import logging +import numpy as np +from da.baseclasses.optimizer import Optimizer + +identifier = 'Ensemble Square Root Filter' +version = '0.0' + +################### Begin Class MethaneOptimizer ################### + +class MethaneOptimizer(Optimizer): + """ + This creates an instance of a CarbonTracker optimization object. The base class it derives from is the optimizer object. + Additionally, this MethaneOptimizer implements a special localization option following the CT2007 method. + + All other methods are inherited from the base class Optimizer. 
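The hard-coded critical values in set_localization below look like two-tailed 95% Student's t values with nmembers degrees of freedom. If scipy is available (it is not required by this module), they can be reproduced as a quick check; this is only a verification sketch, not part of the optimizer.

from scipy.stats import t

for nmembers in (50, 100, 1000):
    print("%5d members -> t = %.4f" % (nmembers, t.ppf(0.975, nmembers)))
# 50 -> 2.0086, 100 -> 1.9840, 1000 -> 1.9623, matching the table below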
+ """ + #def getid(self): + # return identifier + + #def getversion(self): + # return version + + + def set_localization(self, loctype='None'): + """ determine which localization to use """ + + if loctype == 'CT2007': + self.localization = True + self.localizetype = 'CT2007' + #T-test values for two-tailed student's T-test using 95% confidence interval for some options of nmembers + if self.nmembers == 50: + self.tvalue = 2.0086 + elif self.nmembers == 12: + self.tvalue = 2.1789 + elif self.nmembers == 20: + self.tvalue = 2.0860 + elif self.nmembers == 60: + self.tvalue = 2.0003 + elif self.nmembers == 70: + self.tvalue = 1.9944 + elif self.nmembers == 100: + self.tvalue = 1.9840 + elif self.nmembers == 120: + self.tvalue = 1.97993 + elif self.nmembers == 150: + self.tvalue = 1.97591 + elif self.nmembers == 180: + self.tvalue = 1.973231 + elif self.nmembers == 200: + self.tvalue = 1.9719 + elif self.nmembers == 240: + self.tvalue = 1.969898 + elif self.nmembers == 480: + self.tvalue = 1.964918 + elif self.nmembers == 500: + self.tvalue = 1.96472 + elif self.nmembers == 1000: + self.tvalue = 1.962339 + else: self.tvalue = 0.0 + else: + self.localization = False + self.localizetype = 'None' + + logging.info("Current localization option is set to %s" % self.localizetype) + if self.localization == True: + if self.tvalue == 0: + logging.error("Critical tvalue for localization not set for %i ensemble members"%(self.nmembers)) + sys.exit(2) + else: logging.info("Used critical tvalue %0.05f is based on 95%% probability and %i ensemble members in a two-tailed student's T-test"%(self.tvalue,self.nmembers)) + + def localize(self, n): + """ localize the Kalman Gain matrix """ + import numpy as np + + if not self.localization: + logging.debug('Not localized observation %i' % self.obs_ids[n]) + return + if self.localizetype == 'CT2007': + count_localized = 0 + for r in range(self.nlag * self.nparams): + corr = np.corrcoef(self.HX_prime[n, :], self.X_prime[r, :].squeeze())[0, 1] + prob = corr / np.sqrt((1.000000001 - corr ** 2) / (self.nmembers - 2)) + if abs(prob) < self.tvalue: + self.KG[r] = 0.0 + count_localized = count_localized + 1 + logging.debug('Localized observation %i, %i%% of values set to 0' % (self.obs_ids[n],count_localized*100/(self.nlag * self.nparams))) + + def set_algorithm(self, algorithm='Serial'): + """ determine which minimum least squares algorithm to use """ + + if algorithm == 'Serial': + self.algorithm = 'Serial' + else: + self.algorithm = 'Bulk' + + logging.info("Current minimum least squares algorithm is set to %s" % self.algorithm) + + + +################### End Class MethaneOptimizer ################### + +if __name__ == "__main__": + pass diff --git a/da/methane/pipeline.py b/da/methane/pipeline.py new file mode 100755 index 0000000000000000000000000000000000000000..01e2456dc4de02dabb4cdc19a638473565df5e2e --- /dev/null +++ b/da/methane/pipeline.py @@ -0,0 +1,408 @@ +#!/usr/bin/env python +# pipeline.py + +""" +author:: Aki + +Revision History: +File revised on Feb 2017. + +The module functions are mostly similar to baseclass, but the all functions needed copies to properly update the functions. 
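A standalone numerical sketch of the CT2007 test applied per state-vector element in localize() above: the ensemble correlation between the simulated-observation deviations (HX') and one row of state deviations (X') is turned into a t-like statistic, and the corresponding Kalman gain element is zeroed when it falls below the critical value. All numbers below are synthetic.

import numpy as np

nmembers = 50
tvalue = 2.0086                                   # critical value used for 50 members
rng = np.random.RandomState(0)

hx_prime = rng.randn(nmembers)                    # simulated-observation deviations
x_prime = 0.05 * hx_prime + 0.1 * rng.randn(nmembers)   # one weakly related state element

corr = np.corrcoef(hx_prime, x_prime)[0, 1]
prob = corr / np.sqrt((1.000000001 - corr ** 2) / (nmembers - 2))

if abs(prob) < tvalue:
    print("corr=%.3f, |t|=%.2f < %.2f: gain element would be set to 0.0" % (corr, abs(prob), tvalue))
else:
    print("corr=%.3f, |t|=%.2f >= %.2f: gain element kept" % (corr, abs(prob), tvalue))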
+ +""" +import logging +import os +import sys +import datetime +import copy + +header = '\n\n *************************************** ' +footer = ' *************************************** \n ' + +def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer): + """ The main point of entry for the pipeline """ + sys.path.append(os.getcwd()) + + logging.info(header + "Initializing current cycle" + footer) + start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) + + prepare_state(dacycle, statevector) + sample_state(dacycle, samples, statevector, obsoperator) + + invert(dacycle, statevector, optimizer) + + advance(dacycle, samples, statevector, obsoperator) + + save_and_submit(dacycle, statevector) + logging.info("Cycle finished...exiting pipeline") + +def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator): + """ The main point of entry for the pipeline """ + sys.path.append(os.getcwd()) + + logging.info(header + "Initializing current cycle" + footer) + start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) + + if dacycle.has_key('forward.savestate.exceptsam'): + sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"]) + else: + sam = False + + if dacycle.has_key('forward.savestate.dir'): + fwddir = dacycle['forward.savestate.dir'] + else: + logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters") + fwddir = False + + if dacycle.has_key('forward.savestate.legacy'): + legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"]) + else: + legacy = False + logging.debug("No forward.savestate.legacy key found in rc-file") + + if not fwddir: + # Simply make a prior statevector using the normal method + + + prepare_state(dacycle, statevector)#LU tutaj zamiast tego raczej to stworzenie nowej kowariancji i ensembli bo pozostale rzeczy sa na gorze i na doel. + + else: + # Read prior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if sam: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') + statevector.read_from_file_exceptsam(filename, 'prior') + elif not legacy: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) + statevector.read_from_file(filename, 'prior') + else: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') + statevector.read_from_legacy_file(filename, 'prior') + + + # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector + # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current + # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit. + # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into + # the current format through the "write_to_file" method. 
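The forward.savestate.* keys above are parsed with a repeated truthy-string idiom; a small helper like the one below (the name rc_flag is invented purely for illustration) captures the same behaviour.

def rc_flag(dacycle, key, default=False):
    """Interpret an rc-file entry such as 'TRUE', 'T', 'yes' or 'Y' as a boolean."""
    if key not in dacycle:
        return default
    return str(dacycle[key]).upper() in ["TRUE", "T", "YES", "Y"]

# sam    = rc_flag(dacycle, 'forward.savestate.exceptsam')
# legacy = rc_flag(dacycle, 'forward.savestate.legacy')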
+ + savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(savefilename, 'prior') + + # Now read optimized fluxes which we will actually use to propagate through the system + + if not fwddir: + + # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector + logging.info("Running forward with prior savestate from: %s"%savefilename) + + else: + + # Read posterior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if sam: + statevector.read_from_file_exceptsam(filename, 'opt') + elif not legacy: + statevector.read_from_file(filename, 'opt') + else: + statevector.read_from_legacy_file(filename, 'opt') + + logging.info("Running forward with optimized savestate from: %s"%filename) + + # Finally, we run forward with these parameters + + advance(dacycle, samples, statevector, obsoperator) + + # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list. + # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis. + + save_and_submit(dacycle, statevector) + analysis_pipeline(dacycle, platform, dasystem, samples, statevector) + + logging.info("Cycle finished...exiting pipeline") +#################################################################################################### + +def analysis_pipeline(dacycle, platform, dasystem, samples, statevector): + """ Main entry point for analysis of ctdas results """ + + # Aki: CTE-CH4 does not yet have routine to calculate other aggregared results through CTDAS. + # In CTE-CH4 the postprocessing is done outside CTDAS. Contact Aki if needed. 
+ + from da.methane_2lambdas.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data + + logging.info(header + "Starting analysis" + footer) + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + statevector.setup(dacycle) + + logging.info(header + "Starting weekly averages" + footer) + + save_weekly_avg_1x1_data(dacycle, statevector) + save_weekly_avg_state_data(dacycle, statevector) + logging.info(header + "Finished analysis" + footer) + + +def archive_pipeline(dacycle, platform, dasystem): + """ Main entry point for archiving of output from one disk/system to another """ + + if not dacycle.has_key('task.rsync'): + logging.info('rsync task not found, not starting automatic backup...') + return + else: + logging.info('rsync task found, starting automatic backup...') + + for task in dacycle['task.rsync'].split(): + sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task] + destdir = dacycle['task.rsync.%s.destinationdir'%task] + + rsyncflags = dacycle['task.rsync.%s.flags'%task] + + # file ID and names + jobid = dacycle['time.end'].strftime('%Y%m%d') + targetdir = os.path.join(dacycle['dir.exec']) + jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) ) + logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) ) + # Template and commands for job + jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile} + + if platform.ID == 'cartesius': + jobparams['jobqueue'] = 'staging' + + template = platform.get_job_template(jobparams) + for sourcedir in sourcedirs.split(): + execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,) + template += execcommand + + # write and submit + platform.write_job(jobfile, template, jobid) + jobid = platform.submit_job(jobfile, joblog=logfile) + + +def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator): + """ Set up the job specific directory structure and create an expanded rc-file """ + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc + #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc + #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc + obsoperator.setup(dacycle) # Setup Observation Operator + statevector.setup(dacycle) + +def prepare_state(dacycle, statevector): + """ Set up the input data for the forward model: obs and parameters/fluxes""" + + # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read + # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector + # with new values for each week. 
After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used + # in the current cycle + + logging.info(header + "starting prepare_state" + footer) + + if not dacycle['time.restart']: + + # Fill each week from n=1 to n=nlag with a new ensemble + + for n in range(statevector.nlag): + date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle'])) + cov = statevector.get_covariance(date, dacycle) + statevector.make_new_ensemble(n, cov) + + else: + + # Read the statevector data from file + + #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc') + + + saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d')) + statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate + + # Now propagate the ensemble by one cycle to prepare for the current cycle + statevector.propagate(dacycle) + + # Finally, also write the statevector to a file so that we can always access the a-priori information + + current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(current_sv, 'prior') # write prior info + +def sample_state(dacycle, samples, statevector, obsoperator): + """ Sample the filter state for the inversion """ + + + # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step. + # This is especially important for: + # (i) The transport model restart data which holds the background mole fractions. This is needed to run the model one cycle forward + # (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed + + #status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[]) + #msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg) + logging.info(header + "starting sample_state" + footer) + nlag = int(dacycle['time.nlag']) + logging.info("Sampling model will be run over %d cycles" % nlag) + + obsoperator.get_initial_data() + + for lag in range(nlag): + logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1)) + + ############# Perform the actual sampling loop ##################### + + sample_step(dacycle, samples, statevector, obsoperator, lag) + + logging.info("statevector now carries %d samples" % statevector.nobs) + + +def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False): + """ Perform all actions needed to sample one cycle """ + + + # First set up the information for time start and time end of this sample + dacycle.set_sample_times(lag) + + startdate = dacycle['time.sample.start'] + enddate = dacycle['time.sample.end'] + dacycle['time.sample.window'] = lag + dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H")) + + logging.info("New simulation interval set : ") + logging.info(" start date : %s " % startdate.strftime('%F %H:%M')) + logging.info(" end date : %s " % enddate.strftime('%F %H:%M')) + logging.info(" file stamp: %s " % dacycle['time.sample.stamp']) + + + # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the + # type of info needed in your transport model + + statevector.write_members_to_file(lag, dacycle['dir.input'],dacycle) #Aki: dacycle as input + + samples.setup(dacycle) + 
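A quick check of the ensemble dates produced by the prepare_state loop above: each lag's covariance is evaluated at the centre of its own cycle. The start date, cycle length, and nlag below are example values only.

import datetime

start = datetime.datetime(2010, 1, 1)
cycle_days = 7                     # int(dacycle['time.cycle']) in the code above
nlag = 4                           # statevector.nlag

for n in range(nlag):
    date = start + datetime.timedelta(days=(n + 0.5) * cycle_days)
    print("lag %d -> %s" % (n, date))
# lag 0 -> 2010-01-04 12:00, lag 1 -> 2010-01-11 12:00, ...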
samples.add_observations() + + # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? + + #Aki: obs_file_rc and site_and_weights_rc is not distinguished in CTE-CH4 + samples.add_model_data_mismatch(dacycle.dasystem['sites_file']) + + sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp']) + samples.write_sample_coords(sampling_coords_file) + # Write filename to dacycle, and to output collection list + dacycle['ObsOperator.inputfile'] = sampling_coords_file + + # Run the observation operator + + obsoperator.run_forecast_model() + + + # Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting + # Observation object for each sample loop. This data fill be written to file in the output folder for each sample cycle. + + + # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates + # one file per member, some logic needs to be included to merge all files!!! + simulated_file = os.path.join(obsoperator.outputdir, 'flask_output.%s.nc' % dacycle['time.sample.stamp']) + if os.path.exists(sampling_coords_file): + samples.add_simulations(simulated_file) + else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)") + + # Now write a small file that holds for each observation a number of values that we need in postprocessing + + #filename = samples.write_sample_coords() + + # Now add the observations that need to be assimilated to the statevector. + # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag + # This is to make sure that the first step of the system uses all observations available, while the subsequent + # steps only optimize against the data at the front (lag==nlag) of the filter. 
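The file naming used throughout sample_step above hangs off a single time stamp for the sampling window. A small sketch with example dates and illustrative directory names:

import os
import datetime

startdate = datetime.datetime(2010, 1, 1)
enddate = datetime.datetime(2010, 1, 8)

stamp = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
sampling_coords_file = os.path.join('input', 'sample_coordinates_%s.nc' % stamp)
simulated_file = os.path.join('output', 'flask_output.%s.nc' % stamp)

print(stamp)                       # 2010010100_2010010800
print(sampling_coords_file)
print(simulated_file)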
This way, each observation is used only + # (and at least) once # in the assimilation + + + if not advance: + if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1: + statevector.obs_to_assimilate += (copy.deepcopy(samples),) + statevector.nobs += samples.getlength() + logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector") + + else: + statevector.obs_to_assimilate += (None,) + + +def invert(dacycle, statevector, optimizer): + """ Perform the inverse calculation """ + logging.info(header + "starting invert" + footer) + dims = (int(dacycle['time.nlag']), + int(dacycle['da.optimizer.nmembers']), + statevector.nparams, # Aki: instead of reading from rc + statevector.nobs) + + if not dacycle.dasystem.has_key('opt.algorithm'): + logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)") + logging.info("...using serial algorithm as default...") + optimizer.set_algorithm('Serial') + elif dacycle.dasystem['opt.algorithm'] == 'serial': + logging.info("Using the serial minimum least squares algorithm to solve ENKF equations") + optimizer.set_algorithm('Serial') + elif dacycle.dasystem['opt.algorithm'] == 'bulk': + logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations") + optimizer.set_algorithm('Bulk') + + optimizer.setup(dims) + optimizer.state_to_matrix(statevector) + + diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + + optimizer.write_diagnostics(diagnostics_file, 'prior') + optimizer.set_localization(dacycle['da.system.localization']) + + if optimizer.algorithm == 'Serial': + optimizer.serial_minimum_least_squares() + else: + optimizer.bulk_minimum_least_squares() + + optimizer.matrix_to_state(statevector) + optimizer.write_diagnostics(diagnostics_file, 'optimized') + + + +def advance(dacycle, samples, statevector, obsoperator): + """ Advance the filter state to the next step """ + + # This is the advance of the modeled CO2 state. 
Optionally, routines can be added to advance the state vector (mean+covariance) + + # Then, restore model state from the start of the filter + logging.info(header + "starting advance" + footer) + logging.info("Sampling model will be run over 1 cycle") + + obsoperator.get_initial_data() + + sample_step(dacycle, samples, statevector, obsoperator, 0, True) + + dacycle.restart_filelist.extend(obsoperator.restart_filelist) + dacycle.output_filelist.extend(obsoperator.output_filelist) + logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ") + + dacycle.output_filelist.append(dacycle['ObsOperator.inputfile']) + logging.debug("Appended Observation filename to dacycle for collection ") + + sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp']) + if os.path.exists(sampling_coords_file): + outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) + samples.write_sample_auxiliary(outfile) + else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)") + +def save_and_submit(dacycle, statevector): + """ Save the model state and submit the next job """ + logging.info(header + "starting save_and_submit" + footer) + + filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(filename, 'opt') + + dacycle.output_filelist.append(filename) + dacycle.finalize() + diff --git a/da/methane/standardvariables.py b/da/methane/standardvariables.py new file mode 100755 index 0000000000000000000000000000000000000000..6f046b0ea4f5323e457ab7e003de59da4123730b --- /dev/null +++ b/da/methane/standardvariables.py @@ -0,0 +1,277 @@ +standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, terrestrial biosphere, not optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'bio_flux_opt' : {'name' : 'bio_flux_opt',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, terrestrial biosphere , optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'fossil_flux_prior' : {'name' : 'fossil_flux_prior',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, anthropogenic , not optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'fossil_flux_opt' : {'name' : 'fossil_flux_opt',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, anthropogenic , optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ocn_flux_imp' : {'name' : 'ocn_flux_imp',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, open ocean , imposed ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 
'fire_flux_imp' : {'name' : 'fire_flux_imp',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, biomass burning , imposed ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'term_flux_imp' : {'name' : 'term_flux_imp',\ + 'units' : 'mol m-2 s-1' ,\ + 'long_name' : 'Surface flux of methane, termites , imposed ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'surface_methane_mole_flux', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'bio_flux_prior_cov' : {'name' : 'bio_flux_prior_cov',\ + 'units' : '[mol m-2 s-1]^2' ,\ + 'long_name' : 'Covariance of surface flux of methane, terrestrial vegetation , not optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'bio_flux_prior_ensemble' : {'name' : 'bio_flux_prior_ensemble',\ + 'units' : '[mol m-2 s-1]' ,\ + 'long_name' : 'Ensemble of surface flux of methane, terrestrial vegetation , not optimized ', \ + 'comment' : "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance", \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'bio_flux_opt_cov' : {'name' : 'bio_flux_opt_cov',\ + 'units' : '[mol m-2 s-1]^2' ,\ + 'long_name' : 'Covariance of surface flux of methane, terrestrial vegetation , optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'bio_flux_opt_ensemble' : {'name' : 'bio_flux_opt_ensemble',\ + 'units' : '[mol m-2 s-1]' ,\ + 'long_name' : 'Ensemble of surface flux of methane, terrestrial vegetation , optimized ', \ + 'comment' : "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance", \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'fossil_flux_prior_cov' : {'name' : 'fossil_flux_prior_cov',\ + 'units' : '[mol m-2 s-1]^2' ,\ + 'long_name' : 'Covariance of surface flux of methane, anthropogenic , not optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'fossil_flux_prior_ensemble' : {'name' : 'fossil_flux_prior_ensemble',\ + 'units' : '[mol m-2 s-1]' ,\ + 'long_name' : 'Ensemble of surface flux of methane, anthropogenic , not optimized ', \ + 'comment' : "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance", \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'fossil_flux_opt_cov' : {'name' : 'fossil_flux_opt_cov',\ + 'units' : '[mol m-2 s-1]^2' ,\ + 'long_name' : 'Covariance of surface flux of methane, anthropogenic , optimized ', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'fossil_flux_opt_ensemble' : {'name' : 'fossil_flux_opt_ensemble',\ + 'units' : '[mol m-2 s-1]' ,\ + 'long_name' : 'Ensemble of surface flux of methane, anthropogenic , optimized ', \ + 'comment' : "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance", \ + 'standard_name' : '', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } 
, \ + 'decimal_date' : {'name' : 'decimal_date',\ + 'units' : 'years' ,\ + 'long_name' : 'dates and times', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'date', \ + 'dims' : (), \ + 'dtype' : 'double', \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'date' : {'name' : 'date',\ + 'units' : 'days since 2000-01-01 00:00:00 UTC' ,\ + 'long_name' : 'UTC dates and times', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'standard_name' : 'date', \ + 'dims' : (), \ + 'dtype' : 'double', \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'idate' : {'name' : 'idate',\ + 'units' : 'yyyy MM dd hh mm ss ' ,\ + 'long_name' : 'integer components of date and time', \ + 'standard_name' : 'calendar_components', \ + 'comment' : 'time-interval average, centered on times in the date axis', \ + 'dims' : (), \ + 'dtype' : 'int', \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'latitude' : {'name' : 'latitude',\ + 'units' : 'degrees_north ' ,\ + 'long_name' : 'latitude', \ + 'standard_name' : 'latitude', \ + 'comment' : 'center of interval',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'longitude' : {'name' : 'longitude',\ + 'units' : 'degrees_east ' ,\ + 'long_name' : 'longitude', \ + 'standard_name' : 'longitude', \ + 'comment' : 'center of interval',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'height' : {'name' : 'height',\ + 'units' : 'masl ' ,\ + 'long_name' : 'height_above_ground_level', \ + 'standard_name' : 'height_above_ground_level', \ + 'comment' : 'value is meters above sea level',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'cell_area' : {'name' : 'cell_area',\ + 'units' : 'm2 ' ,\ + 'long_name' : 'horizontal ara of a gridcell', \ + 'standard_name' : 'cell_area', \ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ch4' : {'name' : 'ch4',\ + 'units' : 'micromol mol-1 ' ,\ + 'long_name' : 'mole_fraction_of_methane_in_air', \ + 'standard_name' : 'mole_fraction_of_methane_in_air', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'meanstate' : {'name' : 'statevectormean',\ + 'units' : 'unitless' ,\ + 'long_name' : 'mean_value_of_state_vector', \ + 'standard_name' : 'mean_value_of_state_vector', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ensemblestate': {'name' : 'statevectorensemble',\ + 'units' : 'unitless' ,\ + 'long_name' : 'ensemble_value_of_state_vector', \ + 'standard_name' : 'ensemble_value_of_state_vector', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'meanstate_prior' : {'name' : 'statevectormean_prior',\ + 'units' : 'unitless' ,\ + 'long_name' : 'mean_value_of_state_vector_prior', \ + 'standard_name' : 'mean_value_of_state_vector_prior', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ensemblestate_prior': {'name' : 'statevectorensemble_prior',\ + 'units' : 'unitless' ,\ + 'long_name' : 'ensemble_value_of_state_vector_prior', \ + 'standard_name' : 'ensemble_value_of_state_vector_prior', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'meanstate_opt' : {'name' : 'statevectormean_opt',\ + 'units' : 'unitless' ,\ + 'long_name' : 'mean_value_of_state_vector_optimized', \ + 'standard_name' : 'mean_value_of_state_vector_opt', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ensemblestate_opt': {'name' : 'statevectorensemble_opt',\ + 
'units' : 'unitless' ,\ + 'long_name' : 'ensemble_value_of_state_vector_optimized', \ + 'standard_name' : 'ensemble_value_of_state_vector_opt', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'unknown' : {'name' : '',\ + 'units' : '' ,\ + 'long_name' : '', \ + 'standard_name' : '', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + } + + + + diff --git a/da/methane/statevector.py b/da/methane/statevector.py new file mode 100755 index 0000000000000000000000000000000000000000..5b202f8b413b99e7f533af2132177964cfdf3be3 --- /dev/null +++ b/da/methane/statevector.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +# ct_statevector_tools.py + +""" +Author : aki + +Revision History: +File created on 18 Nov 2013. + +""" + +import os +import sys +sys.path.append(os.getcwd()) + +import logging +import numpy as np +from da.baseclasses.statevector import StateVector, EnsembleMember + +import da.tools.io4 as io + +identifier = 'CarbonTracker Statevector' +version = '0.0' + +################### Begin Class CO2StateVector ################### + +class MethaneStateVector(StateVector): + """ This is a StateVector object for CarbonTracker. It has a private method to make new ensemble members """ + + def make_species_mask(self): + + self.speciesdict = {'ch4': np.ones(self.nparams)} + logging.debug("A species mask was created, only the following species are recognized in this system:") + for k in self.speciesdict.keys(): + logging.debug(" -> %s" % k) + + def get_covariance(self, date, dacycle): + """ Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector. + Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. + The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] + """ + try: + import matplotlib.pyplot as plt + except: + pass + + fullcov = np.zeros((self.nparams, self.nparams), float) + + for i in xrange(self.nparams): + #fullcov[i,i] = 1. 
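The standard_variables table above is intended as a set of templates for NetCDF output variables. The helper below only illustrates that usage; the real lookup lives in the io module, and the fallback to the 'unknown' entry is an assumption for this sketch.

def standard_var(varname, table):
    """Return a fresh copy of a variable template, falling back to 'unknown'."""
    entry = table.get(varname, table['unknown']).copy()
    entry['values'] = []
    entry['count'] = 0
    return entry

# savedict = standard_var('bio_flux_opt', standard_variables)
# savedict['units'] -> 'mol m-2 s-1'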
# Identity matrix + fullcov[i,i] = 0.08 + fullcov[self.nparams-1,self.nparams-1] = 1.e-10 + + try: + plt.imshow(fullcov) + plt.colorbar() + plt.savefig('fullcovariancematrix.png') + plt.close('all') + logging.debug("Covariance matrix visualized for inspection") + except: + pass + + return fullcov + + def read_from_legacy_file(self, filename, qual='opt'): + """ + :param filename: the full filename for the input NetCDF file + :param qual: a string indicating whether to read the 'prior' or 'opt'(imized) StateVector from file + :rtype: None + + Read the StateVector information from a NetCDF file and put in a StateVector object + In principle the input file will have only one four datasets inside + called: + * `meanstate_prior`, dimensions [nlag, nparamaters] + * `ensemblestate_prior`, dimensions [nlag,nmembers, nparameters] + * `meanstate_opt`, dimensions [nlag, nparamaters] + * `ensemblestate_opt`, dimensions [nlag,nmembers, nparameters] + + This NetCDF information can be written to file using + :meth:`~da.baseclasses.statevector.StateVector.write_to_file` + + """ + + f = io.ct_read(filename, 'read') + + for n in range(self.nlag): + if qual == 'opt': + meanstate = f.get_variable('xac_%02d' % (n + 1)) + EnsembleMembers = f.get_variable('adX_%02d' % (n + 1)) + + elif qual == 'prior': + meanstate = f.get_variable('xpc_%02d' % (n + 1)) + EnsembleMembers = f.get_variable('pdX_%02d' % (n + 1)) + + if not self.ensemble_members[n] == []: + self.ensemble_members[n] = [] + logging.warning('Existing ensemble for lag=%d was removed to make place for newly read data' % (n + 1)) + + for m in range(self.nmembers): + newmember = EnsembleMember(m) + newmember.param_values = EnsembleMembers[m, :].flatten() + meanstate # add the mean to the deviations to hold the full parameter values + self.ensemble_members[n].append(newmember) + + + f.close() + + logging.info('Successfully read the State Vector from file (%s) ' % filename) + +################### End Class MethaneStateVector ################### + + +if __name__ == "__main__": + pass
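A quick numerical check of the prior spread implied by get_covariance above: a diagonal value of 0.08 corresponds to roughly a 28% one-sigma uncertainty on each flux scaling parameter, while the last parameter is pinned almost exactly to its prior value. nparams below is an arbitrary example; the real value comes from the state vector configuration.

import numpy as np

nparams = 10
fullcov = np.zeros((nparams, nparams), float)
for i in range(nparams):
    fullcov[i, i] = 0.08
fullcov[nparams - 1, nparams - 1] = 1.e-10

sigma = np.sqrt(np.diag(fullcov))
print("sigma per parameter: %.3f, last parameter: %.1e" % (sigma[0], sigma[-1]))
# sigma per parameter: 0.283, last parameter: 1.0e-05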