From b28a71df588a3ecce1909e55e33a5071afd89b9a Mon Sep 17 00:00:00 2001 From: Joram <joramjd@gmail.com> Date: Tue, 8 Feb 2022 16:43:21 +0100 Subject: [PATCH] start of c13 under git version control --- da/analysis/ctc13/expand_fluxes_co2c13eps.py | 1295 +++++++++++++++++ .../ctc13/expand_molefractions_co2c13.py | 368 +++++ da/pipelines/pipeline_co2c13.py | 418 ++++++ 3 files changed, 2081 insertions(+) create mode 100644 da/analysis/ctc13/expand_fluxes_co2c13eps.py create mode 100644 da/analysis/ctc13/expand_molefractions_co2c13.py create mode 100644 da/pipelines/pipeline_co2c13.py diff --git a/da/analysis/ctc13/expand_fluxes_co2c13eps.py b/da/analysis/ctc13/expand_fluxes_co2c13eps.py new file mode 100644 index 00000000..3de57255 --- /dev/null +++ b/da/analysis/ctc13/expand_fluxes_co2c13eps.py @@ -0,0 +1,1295 @@ +#!/usr/bin/env python +# expand_fluxes.py +import sys +sys.path.append('../../') +import os +from datetime import datetime, timedelta + +import logging +import numpy as np +from da.tools.general import date2num, num2date +import da.ctc13.io4 as io +from da.analysis.tools_regions import globarea, state_to_grid +from da.tools.general import create_dirs +from da.analysis.tools_country import countryinfo # needed here +from da.analysis.tools_transcom import transcommask, ExtendedTCRegions + + +import da.analysis.tools_transcom as tc +import da.analysis.tools_country as ct +import da.analysis.tools_time as timetools + + + +""" +Author: Wouter Peters (Wouter.Peters@noaa.gov) + +Revision History: +File created on 21 Ocotber 2008. + +""" + +def proceed_dialog(txt, yes=['y', 'yes'], all=['a', 'all', 'yes-to-all']): + """ function to ask whether to proceed or not """ + response = raw_input(txt) + if response.lower() in yes: + return 1 + if response.lower() in all: + return 2 + return 0 + +def save_weekly_avg_1x1_data(dacycle, statevector): + """ + Function creates a NetCDF file with output on 1x1 degree grid. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. 
+ + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + nlag = statevector.nlag + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + +# +# Create or open NetCDF output file +# + saveas = os.path.join(dirname, 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d')) + ncf = io.CT_CDF(saveas, 'write') + +# +# Create dimensions and lat/lon grid +# + dimgrid = ncf.add_latlon_dim() + dimensemble = ncf.add_dim('members', statevector.nmembers) + dimdate = ncf.add_date_dim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker fluxes') + setattr(ncf, 'node_offset', 1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + +# +# if not, process this cycle. Start by getting flux input data from CTDAS +# + filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'))) + + file = io.ct_read(filename, 'read') + bio = np.array(file.get_variable(dacycle.dasystem['background.co2.bio.flux'])) + ocean = np.array(file.get_variable(dacycle.dasystem['background.co2.ocean.flux'])) + fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux'])) + fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux'])) + gpp = np.array(file.get_variable(dacycle.dasystem['background.co2.gpp.flux'])) + bio13 = np.array(file.get_variable(dacycle.dasystem['background.c13.bio.flux'])) + ocean13 = np.array(file.get_variable(dacycle.dasystem['background.c13.ocean.flux'])) + gpp13 = np.array(file.get_variable(dacycle.dasystem['background.c13.gpp.flux'])) + #gpp13 = np.ma.masked_where(gpp13==0.,gpp13) + #gpp = np.ma.masked_where(gpp==0.,gpp) + gpp13[np.isnan(gpp13)]=0 + gpp[np.isnan(gpp)]=0 + epsbio = gpp13[:,:]/gpp[:,:] #((gpp13[:,:]/gpp[:,:]/0.011112)-1.)*1000. + logging.info('epsbio: %s, %s, %s' %(epsbio.shape, epsbio.sum(),epsbio[89,199])) + logging.info('gpp13: %s, %s, %s' %(gpp13.shape, gpp13.sum(),gpp13[89,199])) + logging.info('gpp: %s, %s, %s' %(gpp.shape, bio.sum(), gpp[89,199])) + + #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + file.close() + + next = ncf.inq_unlimlen()[0] + + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. 
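+#
+# Illustration only: the prior/posterior branch below boils down to an
+# element-wise scaling of the 1x1 prior flux fields with the gridded
+# statevector parameters. A minimal sketch of that mapping, with hypothetical
+# sizes and stand-in arrays (underscored names are not used by the routine):
+#
+#   import numpy as np
+#   _nlat, _nlon, _nmembers = 180, 360, 150              # hypothetical sizes
+#   _prior_flux = np.zeros((_nlat, _nlon))               # e.g. bio, mol m-2 s-1
+#   _gridmean = np.ones((_nlat, _nlon))                  # prior scaling factors are 1.0
+#   _gridensemble = np.zeros((_nmembers, _nlat, _nlon))  # per-member scaling deviations
+#
+#   _flux_mean = _prior_flux * _gridmean                             # cf. biomapped
+#   _flux_members = _prior_flux[np.newaxis, :, :] * _gridensemble   # cf. biovarmapped
+#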
+# + + if prior: + qual_short = 'prior' + for n in range(nlag, 0, -1): + priordate = enddate - timedelta(dt.days * n) + savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d')) + filename = os.path.join(savedir, 'savestate_%s.nc' % priordate.strftime('%Y%m%d')) + if os.path.exists(filename): + statevector.read_from_file(filename, qual=qual_short) + gridmean, gridensemble = statevector.state_to_grid(lag=n) + gridmean_eps,gridensemble_eps = statevector.state_to_grid_eps(lag=n) + +# Replace the mean statevector by all ones (assumed priors) + + gridmean = statevector.vector2grid(vectordata=np.ones(statevector.nparams,)) + gridmean_eps = statevector.vector2grid_eps(vectordata = np.ones(statevector.nparams,)) + + logging.debug('Read prior dataset from file %s, sds %d: ' % (filename, n)) + break + else: + qual_short = 'opt' + savedir = dacycle['dir.output'] + filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d')) + statevector.read_from_file(filename, qual=qual_short) + gridmean, gridensemble = statevector.state_to_grid(lag=1) + gridmean_eps,gridensemble_eps = statevector.state_to_grid_eps(lag=1) + + logging.debug('Read posterior dataset from file %s, sds %d: ' % (filename, 1)) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + print gridensemble.shape, bio.shape, gridmean.shape + biomapped = bio * gridmean + oceanmapped = ocean * gridmean + biovarmapped = bio * gridensemble + oceanvarmapped = ocean * gridensemble + epsbiomapped = epsbio #* gridmean_eps + epsbiovarmapped = epsbio #* gridensemble_eps + bio13mapped = bio13 * gridmean + bio13varmapped = bio13 * gridensemble + ocean13mapped = ocean13 * gridmean + oceanvarmapped = ocean13 * gridensemble + gpp13mapped = gpp13*gridmean_eps + gpp13varmapped = gpp13* gridensemble_eps + gppmapped = gpp #* gridmean + gppvarmapped = gpp #* gridensemble + + +# +# +# For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write +# + savedict = ncf.standard_var(varname='bio_flux_' + qual_short) + savedict['values'] = biomapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='ocn_flux_' + qual_short) + savedict['values'] = oceanmapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + print biovarmapped.shape + savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short) + savedict['values'] = biovarmapped.tolist() + savedict['dims'] = dimdate + dimensemble + dimgrid + savedict['count'] = next + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='ocn_flux_%s_ensemble' % qual_short) + savedict['values'] = oceanvarmapped.tolist() + savedict['dims'] = dimdate + dimensemble + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + #savedict = ncf.standard_var(varname='epsbio_' + qual_short) + #savedict['values'] = epsbiomapped.tolist() + #savedict['dims'] = dimdate + dimgrid + #savedict['count'] = next + #ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='bio_c13_flux_' + qual_short) + savedict['values'] = bio13mapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='ocn_c13_flux_' + qual_short) + savedict['values'] = ocean13mapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + 
ncf.add_data(savedict) + + + savedict = ncf.standard_var(varname='gpp_c13_flux_' + qual_short) + savedict['values'] = gpp13mapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + + savedict = ncf.standard_var(varname='gpp_flux_' + qual_short) + savedict['values'] = gppmapped.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + + # End prior/posterior block + + savedict = ncf.standard_var(varname='fire_flux_imp') + savedict['values'] = fire.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='fossil_flux_imp') + savedict['values'] = fossil.tolist() + savedict['dims'] = dimdate + dimgrid + savedict['count'] = next + ncf.add_data(savedict) + + area = globarea() + savedict = ncf.standard_var(varname='cell_area') + savedict['values'] = area.tolist() + savedict['dims'] = dimgrid + ncf.add_data(savedict) +# + savedict = ncf.standard_var(varname='date') + savedict['values'] = date2num(startdate) - dectime0 + dt.days / 2.0 + savedict['dims'] = dimdate + savedict['count'] = next + ncf.add_data(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + logging.info("Gridded weekly average fluxes now written") + + return saveas + +def save_weekly_avg_state_data(dacycle, statevector): + """ + Function creates a NetCDF file with output for all parameters. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ + import da.ctc13.io4 as io + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_state_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + nlag = statevector.nlag + + area = globarea() + vectorarea = statevector.grid2vector(griddata=area, method='sum') + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + +# +# Create or open NetCDF output file +# + saveas = os.path.join(dirname, 'statefluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + +# +# Create dimensions and lat/lon grid +# + dimregs = ncf.add_dim('nparameters', statevector.nparams) + dimmembers = ncf.add_dim('nmembers', statevector.nmembers) + dimdate = ncf.add_date_dim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker fluxes') + setattr(ncf, 'node_offset', 1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + next = ncf.inq_unlimlen()[0] + +# +# if not, process this cycle. 
Start by getting flux input data from CTDAS +# + filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'))) + + file = io.ct_read(filename, 'read') + bio = np.array(file.get_variable(dacycle.dasystem['background.co2.bio.flux'])) + ocean = np.array(file.get_variable(dacycle.dasystem['background.co2.ocean.flux'])) + fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux'])) + fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux'])) + #epsbio = np.array(file.get_variable(dacycle.dasystem['background.c13.epsbio'])) + bio13 = np.array(file.get_variable(dacycle.dasystem['background.c13.bio.flux'])) + ocean13 = np.array(file.get_variable(dacycle.dasystem['background.c13.ocean.flux'])) + gpp13 = np.array(file.get_variable(dacycle.dasystem['background.c13.gpp.flux'])) + gpp = np.array(file.get_variable(dacycle.dasystem['background.co2.gpp.flux'])) + epsbio = np.ma.masked_where(gpp13==0.,gpp13)/np.ma.masked_where(gpp==0.,gpp) #(np.ma.masked_where(gpp13==0.,gpp13)/np.ma.masked_where(gpp==0.,gpp)/0.011112-1.)*1000. + + #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + file.close() + + next = ncf.inq_unlimlen()[0] + + vectorbio = statevector.grid2vector(griddata=bio * area, method='sum') + vectorocn = statevector.grid2vector(griddata=ocean * area, method='sum') + vectorfire = statevector.grid2vector(griddata=fire * area, method='sum') + vectorfossil = statevector.grid2vector(griddata=fossil * area, method='sum') + #vectorepsbio = statevector.grid2vector_eps(griddata=epsbio*area,method='sum') + vectorbio13 = statevector.grid2vector(griddata=bio13 * area, method='sum') + vectorocn13 = statevector.grid2vector(griddata=ocean13 * area, method='sum') + vectorgpp13 = statevector.grid2vector_eps(griddata=gpp13 * area, method='sum') + vectorgpp = statevector.grid2vector_eps(griddata=gpp * area, method='sum') + vectorepsbio = vectorgpp13/vectorgpp #(vectorgpp13/vectorgpp/0.011112-1.)*1000. + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. 
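+#
+# Illustration only: two operations used below, sketched with hypothetical
+# data. First, the grid2vector aggregation sums flux*area over each parameter
+# region to get totals in mol region-1 s-1; second, the ensemble blocks store
+# deviations whose outer product gives the flux covariance,
+# P = (M M^T) / (nmembers - 1). The region map and sizes are assumptions, not
+# the actual statevector internals:
+#
+#   import numpy as np
+#   _nparams, _nmembers = 240, 150                                # hypothetical sizes
+#   _regionmap = np.random.randint(1, _nparams + 1, (180, 360))  # region id per box
+#   _flux = np.ones((180, 360)) * 1.0e-7                          # mol m-2 s-1 (stand-in)
+#   _area = np.ones((180, 360)) * 1.2e10                          # m2 per box (stand-in)
+#
+#   # one total per parameter region (mol region-1 s-1)
+#   _vectorflux = np.array([(_flux * _area)[_regionmap == r].sum()
+#                           for r in range(1, _nparams + 1)])
+#
+#   # member deviations relative to the mean member, and the implied covariance
+#   _params = np.ones((_nmembers, _nparams))                      # member scaling factors
+#   _dev = _params * _vectorflux
+#   _dev = _dev - _dev[0, :]
+#   _cov = _dev.T.dot(_dev) / (_nmembers - 1)                     # (nparams x nparams)
+#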
+# + if prior: + qual_short = 'prior' + for n in range(nlag, 0, -1): + priordate = enddate - timedelta(dt.days * n) + savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d')) + filename = os.path.join(savedir,'savestate_%s.nc' % priordate.strftime('%Y%m%d')) + if os.path.exists(filename): + statevector.read_from_file(filename, qual=qual_short) +# Replace the mean statevector by all ones (assumed priors) + statemean = np.ones((statevector.nparams,)) + choicelag = n + logging.debug('Read prior dataset from file %s, lag %d: ' % (filename, choicelag)) + break + else: + qual_short = 'opt' + savedir = dacycle['dir.output'] + filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d')) + statevector.read_from_file(filename) + choicelag = 1 + statemean = statevector.ensemble_members[choicelag - 1][0].param_values + logging.debug('Read posterior dataset from file %s, lag %d: ' % (filename, choicelag)) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + data = statemean * vectorbio # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='bio_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. +# +# The implementation is done by multiplying the ensemble with the vectorbio only, and not with the statemean values +# which are assumed 1.0 in the prior always. +# + + members = statevector.ensemble_members[choicelag - 1] + deviations = np.array([mem.param_values * vectorbio for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short) + + savedict['values'] = deviations.tolist() + savedict['dims'] = dimdate + dimmembers + dimregs + savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'bio_flux_%s_std' % qual_short + savedict['long_name'] = 'Biosphere flux standard deviation, %s' % qual_short + savedict['values'] = deviations.std(axis=0) + savedict['dims'] = dimdate + dimregs + savedict['comment'] = "This is the standard deviation on each parameter" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + + data = statemean * vectorbio13 # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='bio_c13_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. 
+# +# The implementation is done by multiplying the ensemble with the vectorbio only, and not with the statemean values +# which are assumed 1.0 in the prior always. +# + + members = statevector.ensemble_members[choicelag - 1] + deviations = np.array([mem.param_values * vectorbio13 for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='bio_c13_flux_%s_ensemble' % qual_short) + + savedict['values'] = deviations.tolist() + savedict['dims'] = dimdate + dimmembers + dimregs + savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'bio_c13_flux_%s_std' % qual_short + savedict['long_name'] = 'Biosphere flux standard deviation, %s' % qual_short + savedict['values'] = deviations.std(axis=0) + savedict['dims'] = dimdate + dimregs + savedict['comment'] = "This is the standard deviation on each parameter" + savedict['units'] = "mol region-1 s-1" + savedict['count'] = next + ncf.add_data(savedict) + + + + + data = statemean * vectorocn # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='ocn_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. +# +# The implementation is done by multiplying the ensemble with the vectorocn only, and not with the statemean values +# which are assumed 1.0 in the prior always. +# + + deviations = np.array([mem.param_values * vectorocn for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='ocn_flux_%s_ensemble'%qual_short) + savedict['values']=deviations.tolist() + savedict['dims']=dimdate+dimmembers+dimregs + savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.add_data(savedict) + + savedict=ncf.standard_var('unknown') + savedict['name'] = 'ocn_flux_%s_std'%qual_short + savedict['long_name'] = 'Ocean flux standard deviation, %s'%qual_short + savedict['values']=deviations.std(axis=0) + savedict['dims']=dimdate+dimregs + savedict['comment']="This is the standard deviation on each parameter" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.add_data(savedict) + + + data = statemean * vectorocn13 # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='ocn_c13_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + +# +# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to +# ensure they are indeed smaller due to the data assimilation. If they would be calculated relative to the mean posterior flux, the +# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want. 
+# +# The implementation is done by multiplying the ensemble with the vectorocn only, and not with the statemean values +# which are assumed 1.0 in the prior always. +# + + deviations = np.array([mem.param_values * vectorocn13 for mem in members]) + deviations = deviations - deviations[0, :] + + savedict = ncf.standard_var(varname='ocn_c13_flux_%s_ensemble'%qual_short) + savedict['values']=deviations.tolist() + savedict['dims']=dimdate+dimmembers+dimregs + savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.add_data(savedict) + + savedict=ncf.standard_var('unknown') + savedict['name'] = 'ocn_c13_flux_%s_std'%qual_short + savedict['long_name'] = 'Ocean flux standard deviation, %s'%qual_short + savedict['values']=deviations.std(axis=0) + savedict['dims']=dimdate+dimregs + savedict['comment']="This is the standard deviation on each parameter" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.add_data(savedict) + + + #vectorepsbiow=vectorepsbio #/vectorbio + #print "vectorepsbiow",vectorepsbiow.shape,len(vectorepsbiow) + #msg = "vectorepsbiow %s,%s" %(vectorepsbiow.shape,len(vectorepsbiow)) ; logging.debug(msg) + #for i in range(len(vectorepsbiow)): + # if vectorepsbiow[i]!=vectorepsbiow[i]: + # vectorepsbiow[i]=0. + #data = statemean*vectorepsbiow # units of mole region-1 s-1 + #print data,data.shape + #savedict=ncf.standard_var(varname='epsbio_%s'%qual_short) + #savedict['values']=data + #savedict['dims']=dimdate+dimregs + #savedict['count']=next + #ncf.add_data(savedict) + + + #deviations = np.array([mem.param_values*vectorepsbiow for mem in members]) + #deviations = deviations-deviations[0,:] + + #savedict=ncf.standard_var(varname='epsbio_%s_ensemble'%qual_short) + #savedict['values']=deviations.tolist() + #savedict['dims']=dimdate+dimmembers+dimregs + #savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + #savedict['units']="permil" + #savedict['count']=next + #ncf.add_data(savedict) + + #savedict=ncf.standard_var('unknown') + #savedict['name'] = 'epsbio_%s_std'%qual_short + #savedict['long_name'] = 'epsilon standard deviation, %s'%qual_short + #savedict['values']=deviations.std(axis=0) + #savedict['dims']=dimdate+dimregs + #savedict['comment']="This is the standard deviation on each parameter" + #savedict['units']="permil" + #savedict['count']=next + #ncf.add_data(savedict) + + + + print statemean.shape,vectorgpp13.shape + print statemean[:],vectorgpp13[:] + data = statemean * vectorgpp13 # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='gpp_c13_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + data = statemean * vectorgpp # units of mole region-1 s-1 + + savedict = ncf.standard_var(varname='gpp_flux_%s' % qual_short) + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + + + + + + + + + + data = vectorfire + + savedict=ncf.standard_var(varname='fire_flux_imp') + savedict['values']=data + savedict['dims']=dimdate+dimregs + savedict['count']=next + ncf.add_data(savedict) + + data = vectorfossil + + savedict = ncf.standard_var(varname='fossil_flux_imp') + savedict['values'] = data + savedict['dims'] = dimdate + dimregs + savedict['count'] = next + ncf.add_data(savedict) + + savedict = ncf.standard_var(varname='date') + 
savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = next + ncf.add_data(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + logging.info("Vector weekly average fluxes now written") + + return saveas + + +def save_weekly_avg_tc_data(dacycle, statevector): + """ + Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the + function `save_weekly_avg_1x1_data` to create fluxes of length `nparameters`, which are then projected + onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param statevector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + + This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve + these with the parameters in the statevector. This creates posterior fluxes, and the posterior covariance for the complete + statevector in units of mol/box/s which we then turn into TC fluxes and covariances. + """ + import da.ctc13.io4 as io +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_tc_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + # Write/Create NetCDF output file + # + saveas = os.path.join(dirname, 'tcfluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + dimdate = ncf.add_date_dim() + dimidateformat = ncf.add_date_dim_format() + dimregs = ncf.add_region_dim(type='tc') +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker TransCom fluxes') + setattr(ncf, 'node_offset', 1) + # + + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + + # Get input data + + area = globarea() + + infile = os.path.join(dacycle['dir.analysis'], 'data_state_weekly', 'statefluxes.nc') + if not os.path.exists(infile): + logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." 
% infile) + return None + + ncf_in = io.ct_read(infile, 'read') + + # Transform data one by one + + # Get the date variable, and find index corresponding to the dacycle date + + try: + dates = ncf_in.variables['date'][:] + except KeyError: + logging.error("The variable date cannot be found in the requested input file (%s) " % infile) + logging.error("Please make sure you create gridded fluxes before making TC fluxes ") + raise KeyError + + try: + index = dates.tolist().index(ncfdate) + except ValueError: + logging.error("The requested cycle date is not yet available in file %s " % infile) + logging.error("Please make sure you create state based fluxes before making TC fluxes") + raise ValueError + + # First add the date for this cycle to the file, this grows the unlimited dimension + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = index + ncf.add_data(savedict) + + # Now convert other variables that were inside the flux_1x1 file + + vardict = ncf_in.variables + for vname, vprop in vardict.iteritems(): + + data = ncf_in.get_variable(vname)[index] + if vname in ['latitude','longitude', 'date', 'idate'] or 'std' in vname: + continue + elif 'ensemble' in vname: + tcdata = [] + if 'gpp_' in vname: + for member in data: + tcdata.append( statevector.vector2tc_eps(vectordata=member) ) + else: + for member in data: + tcdata.append( statevector.vector2tc(vectordata=member) ) + + tcdata = np.array(tcdata) + try: + cov = tcdata.transpose().dot(tcdata) / (statevector.nmembers - 1) + except: + cov = np.dot(tcdata.transpose(), tcdata) / (statevector.nmembers - 1) # Huygens fix + + #print vname,cov.sum() + + tcdata = cov + + savedict = ncf.standard_var(varname=vname.replace('ensemble', 'cov')) + savedict['units'] = '[mol/region/s]**2' + savedict['dims'] = dimdate + dimregs + dimregs + + else: + if 'gpp_' in vname: + tcdata = statevector.vector2tc_eps(vectordata=data) # vector to TC + else: + tcdata = statevector.vector2tc(vectordata=data) # vector to TC + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate+dimregs + savedict['units'] = 'mol/region/s' + + savedict['count'] = index + savedict['values'] = tcdata + ncf.add_data(savedict) + + ncf_in.close() + ncf.close() + + logging.info("TransCom weekly average fluxes now written") + + return saveas + +def save_weekly_avg_ext_tc_data(dacycle): + """ Function SaveTCDataExt saves surface flux data to NetCDF files for extended TransCom regions + + *** Inputs *** + rundat : a RunInfo object + + *** Outputs *** + NetCDF file containing n-hourly global surface fluxes per TransCom region + + *** Example *** + ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """ + + import da.ctc13.io4 as io +# +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_tc_weekly')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + # Write/Create NetCDF output file + + # + saveas = os.path.join(dirname, 'tc_extfluxes.nc') + ncf = io.CT_CDF(saveas, 'write') + dimdate = ncf.add_date_dim() + dimidateformat = ncf.add_date_dim_format() + dimregs = ncf.add_region_dim(type='tc_ext') +# +# set title and tell GMT that we 
are using "pixel registration" +# + setattr(ncf, 'Title', 'CarbonTracker TransCom fluxes') + setattr(ncf, 'node_offset', 1) + # + + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + infile = os.path.join(dacycle['dir.analysis'], 'data_tc_weekly', 'tcfluxes.nc') + if not os.path.exists(infile): + logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." % infile) + return None + + ncf_in = io.ct_read(infile, 'read') + + # Transform data one by one + + # Get the date variable, and find index corresponding to the dacycle date + + try: + dates = ncf_in.variables['date'][:] + except KeyError: + logging.error("The variable date cannot be found in the requested input file (%s) " % infile) + logging.error("Please make sure you create gridded fluxes before making extended TC fluxes") + raise KeyError + + try: + index = dates.tolist().index(ncfdate) + except ValueError: + logging.error("The requested cycle date is not yet available in file %s " % infile) + logging.error("Please make sure you create state based fluxes before making extended TC fluxes ") + raise ValueError + + # First add the date for this cycle to the file, this grows the unlimited dimension + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = index + ncf.add_data(savedict) + + # Now convert other variables that were inside the tcfluxes.nc file + + vardict = ncf_in.variables + for vname, vprop in vardict.iteritems(): + + data = ncf_in.get_variable(vname)[index] + + if vname == 'latitude': continue + elif vname == 'longitude': continue + elif vname == 'date': continue + elif vname == 'idate': continue + elif 'cov' in vname: + + tcdata = ExtendedTCRegions(data, cov=True) + + savedict = ncf.standard_var(varname=vname) + savedict['units'] = '[mol/region/s]**2' + savedict['dims'] = dimdate + dimregs + dimregs + + else: + + tcdata = ExtendedTCRegions(data, cov=False) + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + savedict['units'] = 'mol/region/s' + + savedict['count'] = index + savedict['values'] = tcdata + ncf.add_data(savedict) + + ncf_in.close() + ncf.close() + + logging.info("TransCom weekly average extended fluxes now written") + + return saveas + +def save_weekly_avg_agg_data(dacycle, region_aggregate='olson'): + """ + Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the + function `save_weekly_avg_1x1_data` to create fluxes of length `nparameters`, which are then projected + onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`. + + :param dacycle: a :class:`~da.tools.initexit.CycleControl` object + :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + + This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve + these with the parameters in the statevector. This creates posterior fluxes, and the posterior covariance for the complete + statevector in units of mol/box/s which we then turn into TC fluxes and covariances. 
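+
+    A minimal sketch of the idea behind the aggregation (hypothetical mask and
+    stand-in data; the actual work is done by ``state_to_grid(..., reverse=True)``)::
+
+        import numpy as np
+        regionmask = np.zeros((180, 360), int)        # 0 = not aggregated
+        regionmask[90:, :] = 1                        # one hypothetical region
+        flux = np.ones((180, 360)) * 1.0e-7           # mol m-2 s-1
+        area = np.ones((180, 360)) * 1.2e10           # m2 per grid box
+        totals = [(flux * area)[regionmask == k].sum()
+                  for k in range(1, int(regionmask.max()) + 1)]   # mol region-1 s-1
+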
+ """ + import da.ctc13.io4 as io + +# + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_%s_weekly' % region_aggregate)) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0 + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + logging.debug("Aggregating 1x1 fluxes to %s totals" % region_aggregate) + + + # Write/Create NetCDF output file + # + saveas = os.path.join(dirname, '%s_fluxes.%s.nc' % (region_aggregate, startdate.strftime('%Y-%m-%d'))) + ncf = io.CT_CDF(saveas, 'write') + dimdate = ncf.add_date_dim() + dimidateformat = ncf.add_date_dim_format() + dimgrid = ncf.add_latlon_dim() # for mask +# +# Select regions to aggregate to +# + + if region_aggregate == "olson": + regionmask = tc.olson240mask + dimname = 'olson' + dimregs = ncf.add_dim(dimname, regionmask.max()) + + regionnames = [] + for i in range(11): + for j in range(19): + regionnames.append("%s_%s" % (tc.transnams[i], tc.olsonnams[j],)) + regionnames.extend(tc.oifnams) + + for i, name in enumerate(regionnames): + lab = 'Aggregate_Region_%03d' % (i + 1,) + setattr(ncf, lab, name) + + elif region_aggregate == "transcom": + regionmask = tc.transcommask + dimname = 'tc' + dimregs = ncf.add_region_dim(type='tc') + + elif region_aggregate == "country": + + countrydict = ct.get_countrydict() + selected = ['Russia', 'Canada', 'China', 'United States', 'EU27', 'Brazil', 'Australia', 'India'] #,'G8','UNFCCC_annex1','UNFCCC_annex2'] + regionmask = np.zeros((180, 360,), 'float') + + for i, name in enumerate(selected): + lab = 'Country_%03d' % (i + 1,) + setattr(ncf, lab, name) + + if name == 'EU27': + namelist = ct.EU27 + elif name == 'EU25': + namelist = ct.EU25 + elif name == 'G8': + namelist = ct.G8 + elif name == 'UNFCCC_annex1': + namelist = ct.annex1 + elif name == 'UNFCCC_annex2': + namelist = ct.annex2 + else: + namelist = [name] + + for countryname in namelist: + try: + country = countrydict[countryname] + regionmask.put(country.gridnr, i + 1) + except: + continue + + dimname = 'country' + dimregs = ncf.add_dim(dimname, regionmask.max()) + + # + + skip = ncf.has_date(ncfdate) + if skip: + logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas)) + else: + # + # set title and tell GMT that we are using "pixel registration" + # + setattr(ncf, 'Title', 'CTDAS Aggregated fluxes') + setattr(ncf, 'node_offset', 1) + + savedict = ncf.standard_var('unknown') + savedict['name'] = 'regionmask' + savedict['comment'] = 'numerical mask used to aggregate 1x1 flux fields, each integer 0,...,N is one region aggregated' + savedict['values'] = regionmask.tolist() + savedict['units'] = '-' + savedict['dims'] = dimgrid + savedict['count'] = 0 + ncf.add_data(savedict) + + # Get input data from 1x1 degree flux files + + area = globarea() + + infile = os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly', 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d')) + if not os.path.exists(infile): + logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." 
% infile) + return None + + ncf_in = io.ct_read(infile, 'read') + + # Transform data one by one + + # Get the date variable, and find index corresponding to the dacycle date + + try: + dates = ncf_in.variables['date'][:] + except KeyError: + logging.error("The variable date cannot be found in the requested input file (%s) " % infile) + logging.error("Please make sure you create gridded fluxes before making TC fluxes ") + raise KeyError + + try: + index = dates.tolist().index(ncfdate) + except ValueError: + logging.error("The requested cycle date is not yet available in file %s " % infile) + logging.error("Please make sure you create state based fluxes before making TC fluxes ") + raise ValueError + + # First add the date for this cycle to the file, this grows the unlimited dimension + + savedict = ncf.standard_var(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = index + ncf.add_data(savedict) + + # Now convert other variables that were inside the statevector file + + vardict = ncf_in.variables + for vname, vprop in vardict.iteritems(): + if vname == 'latitude': continue + elif vname == 'longitude': continue + elif vname == 'date': continue + elif vname == 'idate': continue + elif 'std' in vname: continue + elif 'ensemble' in vname: + + data = ncf_in.get_variable(vname)[index] + + dimensemble = ncf.add_dim('members', data.shape[0]) + + regiondata = [] + for member in data: + aggdata = state_to_grid(member * area, regionmask, reverse=True, mapname=region_aggregate) + regiondata.append(aggdata) + + regiondata = np.array(regiondata) + + savedict = ncf.standard_var(varname=vname) + savedict['units'] = 'mol/region/s' + savedict['dims'] = dimdate + dimensemble + dimregs + + elif 'flux' in vname: + + data = ncf_in.get_variable(vname)[index] + + regiondata = state_to_grid(data * area, regionmask, reverse=True, mapname=region_aggregate) + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + savedict['units'] = 'mol/region/s' + + else: + + data = ncf_in.get_variable(vname)[:] + regiondata = state_to_grid(data, regionmask, reverse=True, mapname=region_aggregate) + + savedict = ncf.standard_var(varname=vname) + savedict['dims'] = dimdate + dimregs + + savedict['count'] = index + savedict['values'] = regiondata + ncf.add_data(savedict) + + ncf_in.close() + ncf.close() + + logging.info("%s aggregated weekly average fluxes now written" % dimname) + + return saveas + +def save_time_avg_data(dacycle, infile, avg='monthly'): + """ Function saves time mean surface flux data to NetCDF files + + *** Inputs *** + rundat : a RunInfo object + + *** Outputs *** + daily NetCDF file containing 1-hourly global surface fluxes at 1x1 degree + + *** Example *** + ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """ + + import da.ctc13.io4 as io + if 'weekly' in infile: + intime = 'weekly' + if 'monthly' in infile: + intime = 'monthly' + if 'yearly' in infile: + intime = 'yearly' + + dirname, filename = os.path.split(infile) + outdir = create_dirs(os.path.join(dacycle['dir.analysis'], dirname.replace(intime,avg))) + + dectime0 = date2num(datetime(2000, 1, 1)) + +# Create NetCDF output file +# + saveas = os.path.join(outdir, filename) + ncf = io.CT_CDF(saveas, 'create') + dimdate = ncf.add_date_dim() +# +# Open input file specified from the command line +# + if not os.path.exists(infile): + logging.error("Needed input file (%s) not found. 
Please create this first:" % infile) + logging.error("returning...") + return None + else: + pass + + file = io.ct_read(infile, 'read') + datasets = file.variables.keys() + date = file.get_variable('date') + globatts = file.ncattrs() + + for att in globatts: + attval = file.getncattr(att) + if not att in ncf.ncattrs(): + ncf.setncattr(att, attval) + + + time = [datetime(2000, 1, 1) + timedelta(days=d) for d in date] + +# loop over datasets in infile, skip idate and date as we will make new time axis for the averaged data + + for sds in ['date'] + datasets: + +# get original data + + data = file.get_variable(sds) + varatts = file.variables[sds].ncattrs() + vardims = file.variables[sds].dimensions +# +# Depending on dims of input dataset, create dims for output dataset. Note that we add the new dimdate now. +# + + for d in vardims: + if 'date' in d: + continue + if d in ncf.dimensions.keys(): + pass + else: + dim = ncf.createDimension(d, size=len(file.dimensions[d])) + + savedict = ncf.standard_var(sds) + savedict['name'] = sds + savedict['dims'] = vardims + savedict['units'] = file.variables[sds].units + savedict['long_name'] = file.variables[sds].long_name + savedict['comment'] = file.variables[sds].comment + savedict['standard_name'] = file.variables[sds].standard_name + savedict['count'] = 0 + + if not 'date' in vardims: + savedict['values'] = data + ncf.add_data(savedict) + else: + + if avg == 'monthly': + time_avg, data_avg = timetools.monthly_avg(time, data) + elif avg == 'seasonal': + time_avg, data_avg = timetools.season_avg(time, data) + elif avg == 'yearly': + time_avg, data_avg = timetools.yearly_avg(time, data) + elif avg == 'longterm': + time_avg, data_avg = timetools.longterm_avg(time, data) + time_avg = [time_avg] + data_avg = [data_avg] + else: + raise ValueError, 'Averaging (%s) does not exist' % avg + + count = -1 + for dd, data in zip(time_avg, data_avg): + count = count + 1 + if sds == 'date': + savedict['values'] = date2num(dd) - dectime0 + else: + savedict['values'] = data + savedict['count'] = count + ncf.add_data(savedict, silent=True) + + sys.stdout.write('.') + + sys.stdout.write('\n') + sys.stdout.flush() + +# end NetCDF file access + file.close() + ncf.close() + + logging.info("------------------- Finished time averaging---------------------------------") + + return saveas + +if __name__ == "__main__": + #from da.ctc13.initexit_offline import CycleControl + from da.tools.initexit import CycleControl + from da.ctc13.dasystem import CO2C13DaSystem + from da.ctc13.statevector_co2c13_coveps import CO2C13StateVector + #from da.ctc13.statevector_co2c13 import CO2C13StateVector + from da.analysis.time_avg_fluxes import time_avg + + sys.path.append('../../') + + logging.root.setLevel(logging.DEBUG) + + dacycle = CycleControl(args={'rc':'../../ctdas-co2c13eps-inv-6x4-ei-sibasa-gfed4-weekly.rc'}) + + dacycle.setup() + dacycle.parse_times() + + dasystem = CO2C13DaSystem('../rc/carbontracker_co2c13eps.rc') + + dacycle.dasystem = dasystem + + statevector = CO2C13StateVector() + #statevector = CO2StateVector() + statevector.setup(dacycle) + + while dacycle['time.end'] < dacycle['time.finish']: + save_weekly_avg_1x1_data(dacycle, statevector) + save_weekly_avg_state_data(dacycle, statevector) + save_weekly_avg_tc_data(dacycle, statevector) + save_weekly_avg_ext_tc_data(dacycle) + save_weekly_avg_agg_data(dacycle, region_aggregate='olson') + save_weekly_avg_agg_data(dacycle, region_aggregate='transcom') + # save_weekly_avg_agg_data(dacycle, region_aggregate='country') + 
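+        #
+        # Illustration only: the time_avg calls below collapse the weekly files
+        # written above into monthly/yearly means. A minimal sketch of such a
+        # monthly average, independent of the actual timetools implementation
+        # and using stand-in data:
+        #
+        #   import numpy as np
+        #   from datetime import datetime, timedelta
+        #   _times = [datetime(2000, 1, 1) + timedelta(days=7 * i) for i in range(10)]
+        #   _values = np.arange(10, dtype=float)      # one weekly mean per time stamp
+        #   _bymonth = {}
+        #   for _t, _v in zip(_times, _values):
+        #       _bymonth.setdefault((_t.year, _t.month), []).append(_v)
+        #   _monthly = dict((_k, np.mean(_vs)) for _k, _vs in _bymonth.items())
+        #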
time_avg(dacycle,'flux1x1') + time_avg(dacycle,'transcom') + time_avg(dacycle,'olson') + #time_avg(dacycle,'country') + + dacycle.advance_cycle_times() + + statevector = None # free memory + + sys.exit(0) + + + + + diff --git a/da/analysis/ctc13/expand_molefractions_co2c13.py b/da/analysis/ctc13/expand_molefractions_co2c13.py new file mode 100644 index 00000000..f1d64f6f --- /dev/null +++ b/da/analysis/ctc13/expand_molefractions_co2c13.py @@ -0,0 +1,368 @@ +#!/usr/bin/env python +# expand_fluxes.py +import sys +import os +import getopt +import shutil +import logging +import netCDF4 +import numpy as np +from string import join +from datetime import datetime, timedelta +sys.path.append('../../') +from da.tools.general import date2num, num2date +from da.tools.general import create_dirs +import da.tools.io4 as io +import matplotlib as mpl #zeus +mpl.use('Agg') +from pylab import date2num, num2date + + +""" +Author: Wouter Peters (Wouter.Peters@wur.nl) + +Revision History: +File created on 11 May 2012. + +""" + +def write_mole_fractions(dacycle): + """ + + Write Sample information to NetCDF files. These files are organized by site and + have an unlimited time axis to which data is appended each cycle. + + The needed information is obtained from the sample_auxiliary.nc files and the original input data files from ObsPack. + + The steps are: + + (1) Create a directory to hold timeseries output files + (2) Read the sample_auxiliary.nc file for this cycle and get a list of original files they were obtained from + (3) For each file, copy the original data file from ObsPack (if not yet present) + (4) Open the copied file, find the index of each observation, fill in the simulated data + + """ + + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_molefractions')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"),) + + # Step (1): Get the posterior sample output data file for this cycle + + infile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) + + ncf_in = io.ct_read(infile,'read') + + obs_num = ncf_in.get_variable('obs_num') + obs_val = ncf_in.get_variable('observed') + simulated = ncf_in.get_variable('modelsamples') + infilename= ncf_in.get_variable('inputfilename') + infiles = netCDF4.chartostring(infilename).tolist() + + #infiles = [join(s.compressed(),'') for s in infilename] + + ncf_in.close() + + # Step (2): Get the prior sample output data file for this cycle + + infile = os.path.join(dacycle['dir.output'],'optimizer.%s.nc'% startdate.strftime('%Y%m%d') ) + + if os.path.exists(infile): + optimized_present = True + else: + optimized_present = False + + if optimized_present: + + ncf_fc_in = io.ct_read(infile, 'read') + + fc_obs_num = ncf_fc_in.get_variable('obspack_num') + fc_obs_val = ncf_fc_in.get_variable('observed') + fc_simulated = ncf_fc_in.get_variable('modelsamplesmean_prior') + fc_simulated_ens = ncf_fc_in.get_variable('modelsamplesdeviations_prior') + fc_flag = ncf_fc_in.get_variable('flag') + if not dacycle.dasystem.has_key('opt.algorithm'): + fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance') + fc_hphtr = 
ncf_fc_in.get_variable('totalmolefractionvariance') + elif dacycle.dasystem['opt.algorithm'] == 'serial': + fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance') + fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance') + elif dacycle.dasystem['opt.algorithm'] == 'bulk': + fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance').diagonal() + fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance').diagonal() + + + + filesitecode = ncf_fc_in.get_variable('sitecode') + + fc_sitecodes = netCDF4.chartostring(filesitecode).tolist() + #fc_sitecodes = [join(s.compressed(),'') for s in filesitecode] + + + fc_sitecodes_co2 = [f for f in fc_sitecodes if f.find('co2_') == 0] + fc_sitecodes_co2c13 = [ff for ff in fc_sitecodes if ff.find('co2c13_') == 0] + + ncf_fc_in.close() + + # Expand the list of input files with those available from the forecast list + + infiles_rootdir_co2 = [f for f in infiles if f.find('co2_') > 0] + infiles_rootdir_co2 = os.path.split(infiles_rootdir_co2[0])[0] + infiles_rootdir_co2c13 = [f for f in infiles if f.find('co2c13_') > 0] + infiles_rootdir_co2c13 = os.path.split(infiles_rootdir_co2c13[0])[0] + + + infiles.extend(os.path.join(infiles_rootdir_co2,f+'.nc') for f in fc_sitecodes_co2) + infiles.extend(os.path.join(infiles_rootdir_co2c13,f+'.nc') for f in fc_sitecodes_co2c13) + + + #Step (2): For each observation timeseries we now have data for, open it and fill with data + + for orig_file in set(infiles): + #print orig_file + if not os.path.exists(orig_file): + msg = "The original input file (%s) could not be found, continuing to next file..."%orig_file ; logging.error(msg) + continue + + if orig_file.endswith('co2_ljo_surface-flask_4_representative.nc') ==True or orig_file.endswith('co2_azr_surface-flask_1_representative.nc') ==True or orig_file.endswith('co2c13_wlg_surface-flask_7_representative.nc')==True: + continue + + copy_file = os.path.join(dirname,os.path.split(orig_file)[-1]) + if not os.path.exists(copy_file): + shutil.copy(orig_file, copy_file) + logging.debug("Copied a new original file (%s) to the analysis directory" % orig_file) + + ncf_out = io.CT_CDF(copy_file,'write') + + # Modify the attributes of the file to reflect added data from CTDAS properly + + try: + host=os.environ['HOSTNAME'] + except: + host='unknown' + + + ncf_out.Caution = '===================================================================================' + try: + ncf_out.History += '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),) + except: + ncf_out.History = '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),) + ncf_out.CTDAS_info = 'Simulated values added from a CTDAS run by %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)\ + + '\nCTDAS was run on platform %s' % (host,)\ + + '\nCTDAS job directory was %s' % (dacycle['dir.da_run'],)\ + + '\nCTDAS Da System was %s' % (dacycle['da.system'],)\ + + '\nCTDAS Da ObsOperator was %s' % (dacycle['da.obsoperator'],) + ncf_out.CTDAS_startdate = dacycle['time.start'].strftime('%F') + ncf_out.CTDAS_enddate = dacycle['time.finish'].strftime("%F") + ncf_out.original_file = orig_file + + + # get nobs dimension + + if ncf_out.dimensions.has_key('id'): + dimidob = ncf_out.dimensions['id'] + dimid = ('id',) + elif ncf_out.dimensions.has_key('obs'): + dimidob = ncf_out.dimensions['obs'] + dimid = ('obs',) + + if dimidob.isunlimited: + nobs = ncf_out.inq_unlimlen() + else: + nobs = 
len(dimid) + + + # add nmembers dimension + + dimmembersob = ncf_out.createDimension('nmembers',size=simulated.shape[1]) + dimmembers = ('nmembers',) + nmembers = len(dimmembers) + + # Create empty arrays for posterior samples, as well as for forecast sample statistics + + savedict = io.std_savedict.copy() + savedict['name'] = "flag_forecast" + savedict['long_name'] = "flag_for_obs_model in forecast" + savedict['units'] = "None" + savedict['dims'] = dimid + savedict['comment'] = 'Flag (0/1/2/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 99 means not sampled' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modeldatamismatch" + savedict['long_name'] = "modeldatamismatch" + savedict['units'] = "[mol mol-1]^2" + savedict['dims'] = dimid + savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "totalmolefractionvariance_forecast" + savedict['long_name'] = "totalmolefractionvariance of forecast" + savedict['units'] = "[mol mol-1]^2" + savedict['dims'] = dimid + savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesmean" + savedict['long_name'] = "mean modelsamples" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'simulated mixing ratios based on optimized state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesmean_forecast" + savedict['long_name'] = "mean modelsamples from forecast" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'simulated mixing ratios based on prior state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesstandarddeviation" + savedict['long_name'] = "standard deviaton of modelsamples over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'std dev of simulated mixing ratios based on optimized state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesstandarddeviation_forecast" + savedict['long_name'] = "standard deviaton of modelsamples from forecast over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'std dev of simulated mixing ratios based on prior state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesensemble" + savedict['long_name'] = "modelsamples over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid+dimmembers + savedict['comment'] = 'ensemble of simulated mixing ratios based on optimized state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesensemble_forecast" + savedict['long_name'] = "modelsamples from forecast over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid+dimmembers + savedict['comment'] = 'ensemble of simulated mixing ratios based on prior state vector' + ncf_out.add_variable(savedict) + + else: + logging.debug("Modifying existing file (%s) in the analysis directory" % copy_file) + + ncf_out = io.CT_CDF(copy_file,'write') + + + # Get existing file 
obs_nums to determine match to local obs_nums + + if ncf_out.variables.has_key('id'): + file_obs_nums = ncf_out.get_variable('id') + elif ncf_out.variables.has_key('obspack_num'): + file_obs_nums = ncf_out.get_variable('obspack_num') + + # Get all obs_nums related to this file, determine their indices in the local arrays + + selected_obs_nums = [num for infile,num in zip(infiles,obs_num) if infile == orig_file] + + + # Optimized data 1st: For each index, get the data and add to the file in the proper file index location + + for num in selected_obs_nums: + + model_index = obs_num.tolist().index(num) + file_index = file_obs_nums.tolist().index(num) + + #var = ncf_out.variables['modeldatamismatch'] # Take from optimizer.yyyymmdd.nc file instead + #var[file_index] = mdm[model_index] + + var = ncf_out.variables['modelsamplesmean'] + var[file_index] = simulated[model_index,0] + + var = ncf_out.variables['modelsamplesstandarddeviation'] + var[file_index] = simulated[model_index,1:].std() + + var = ncf_out.variables['modelsamplesensemble'] + var[file_index] = simulated[model_index,:] + + # Now forecast data too: For each index, get the data and add to the file in the proper file index location + + if optimized_present: + + selected_fc_obs_nums = [num for sitecode,num in zip(fc_sitecodes,fc_obs_num) if sitecode in orig_file] + + for num in selected_fc_obs_nums: + + model_index = fc_obs_num.tolist().index(num) + file_index = file_obs_nums.tolist().index(num) + + var = ncf_out.variables['modeldatamismatch'] + var[file_index] = np.sqrt(fc_r[model_index]) + + var = ncf_out.variables['modelsamplesmean_forecast'] + var[file_index] = fc_simulated[model_index] + + var = ncf_out.variables['modelsamplesstandarddeviation_forecast'] + var[file_index] = fc_simulated_ens[model_index,1:].std() + + var = ncf_out.variables['modelsamplesensemble_forecast'] + var[file_index] = fc_simulated_ens[model_index,:] + + var = ncf_out.variables['totalmolefractionvariance_forecast'] + var[file_index] = fc_hphtr[model_index] + + var = ncf_out.variables['flag_forecast'] + var[file_index] = fc_flag[model_index] + + + # close the file + + status = ncf_out.close() + + return None + + + +if __name__ == "__main__": + + import logging + from da.tools.initexit import CycleControl + from da.ctc13.dasystem import CO2C13DaSystem + + sys.path.append('../../') + + logging.root.setLevel(logging.DEBUG) + + dacycle = CycleControl(args={'rc':'../../ctdas-co2c13eps-inv-6x4-ei-sibasa-gfed4-weekly.rc'}) + + dacycle.setup() + dacycle.parse_times() + + dasystem = CO2C13DaSystem('../rc/carbontracker_co2c13eps.rc') + + dacycle.dasystem = dasystem + + while dacycle['time.start'] < dacycle['time.finish']: + + outfile = write_mole_fractions(dacycle) + + dacycle.advance_cycle_times() + + sys.exit(0) + + + diff --git a/da/pipelines/pipeline_co2c13.py b/da/pipelines/pipeline_co2c13.py new file mode 100644 index 00000000..a3ec9b83 --- /dev/null +++ b/da/pipelines/pipeline_co2c13.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python +# pipeline.py + +""" +.. module:: pipeline +.. moduleauthor:: Wouter Peters + +Revision History: +File created on 06 Sep 2010. + +The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system. 
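+
+For the standard assimilation, ensemble_smoother_pipeline below executes these
+tasks in order (a compact overview of the calls made in that function)::
+
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
+    prepare_state(dacycle, statevector)
+    sample_state(dacycle, samples, statevector, obsoperator)
+    invert(dacycle, statevector, optimizer)
+    advance(dacycle, samples, statevector, obsoperator)
+    save_and_submit(dacycle, statevector)
+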
+
+"""
+import logging
+import os
+import sys
+import datetime
+import copy
+
+header = '\n\n *************************************** '
+footer = ' *************************************** \n '
+
+def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer):
+    """ The main point of entry for the pipeline """
+    sys.path.append(os.getcwd())
+
+    logging.info(header + "Initializing current cycle" + footer)
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
+
+    prepare_state(dacycle, statevector)
+    sample_state(dacycle, samples, statevector, obsoperator)
+
+    invert(dacycle, statevector, optimizer)
+
+    advance(dacycle, samples, statevector, obsoperator)
+
+    save_and_submit(dacycle, statevector)
+    logging.info("Cycle finished...exiting pipeline")
+
+def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
+    """ The main point of entry for the pipeline """
+    sys.path.append(os.getcwd())
+
+    logging.info(header + "Initializing current cycle" + footer)
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
+
+    if dacycle.has_key('forward.savestate.dir'):
+        fwddir = dacycle['forward.savestate.dir']
+    else:
+        logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters")
+        fwddir = False
+
+    if dacycle.has_key('forward.savestate.legacy'):
+        legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"])
+    else:
+        legacy = False
+        logging.debug("No forward.savestate.legacy key found in rc-file")
+
+    if not fwddir:
+        # Simply make a prior statevector using the normal method
+
+
+        prepare_state(dacycle, statevector)  #LU here, instead of this, we would rather only create the new covariance and ensemble, since the remaining steps are already handled above and below.
+
+    else:
+        # Read prior information from another simulation into the statevector.
+        # This loads the results from another assimilation experiment into the current statevector
+
+        if not legacy:
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
+            statevector.read_from_file(filename, 'prior')
+        else:
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc')
+            statevector.read_from_legacy_file(filename, 'prior')
+
+
+    # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector
+    # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current
+    # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit.
+    # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into
+    # the current format through the "write_to_file" method.
+
+    savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+    statevector.write_to_file(savefilename, 'prior')
+
+    # Now read optimized fluxes which we will actually use to propagate through the system
+
+    if not fwddir:
+
+        # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector
+        logging.info("Running forward with prior savestate from: %s"%savefilename)
+
+    else:
+
+        # Read posterior information from another simulation into the statevector. 
+ # This loads the results from another assimilation experiment into the current statevector + + if not legacy: + statevector.read_from_file(filename, 'opt') + else: + statevector.read_from_legacy_file(filename, 'opt') + + logging.info("Running forward with optimized savestate from: %s"%filename) + + # Finally, we run forward with these parameters + + advance(dacycle, samples, statevector, obsoperator) + + # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list. + # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis. + + save_and_submit(dacycle, statevector) + + logging.info("Cycle finished...exiting pipeline") +#################################################################################################### + +def analysis_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator): + """ Main entry point for analysis of ctdas results """ + + #from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data + #from da.analysis.expand_molefractions import write_mole_fractions + #from da.ctc13.expand_fluxes_co2c13 import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data + from da.ctc13.expand_fluxes_co2c13eps import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data + from da.ctc13.expand_molefractions_co2c13 import write_mole_fractions + + from da.analysis.summarize_obs import summarize_obs + from da.analysis.time_avg_fluxes import time_avg + + logging.info(header + "Starting analysis" + footer) + + try: + save_weekly_avg_1x1_data(dacycle, statevector) + save_weekly_avg_state_data(dacycle, statevector) + except: + logging.error("Error in analysis of weekly 1x1 fluxes and/or statevector, continuing analysis...") + try: + save_weekly_avg_tc_data(dacycle, statevector) + save_weekly_avg_ext_tc_data(dacycle) + except: + logging.error("Error in analysis of weekly tc fluxes, continuing analysis...") + try: + save_weekly_avg_agg_data(dacycle,region_aggregate='transcom') + save_weekly_avg_agg_data(dacycle,region_aggregate='olson') + save_weekly_avg_agg_data(dacycle,region_aggregate='country') + except: + logging.error("Error in analysis of weekly aggregated fluxes, continuing analysis...") + try: + time_avg(dacycle,'flux1x1') + time_avg(dacycle,'transcom') + time_avg(dacycle,'olson') + time_avg(dacycle,'country') + except: + logging.error("Error in analysis of time averaged fluxes, continuing analysis...") + try: + write_mole_fractions(dacycle) + #summarize_obs(dacycle) + except: + logging.error("Error in analysis of mole fractions, continuing analysis...") + +def archive_pipeline(dacycle, platform, dasystem): + """ Main entry point for archiving of output from one disk/system to another """ + + if not dacycle.has_key('task.rsync'): + logging.info('rsync task not found, not starting automatic backup...') + return + else: + logging.info('rsync task found, starting automatic backup...') + + for task in dacycle['task.rsync'].split(): + sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task] + destdir = dacycle['task.rsync.%s.destinationdir'%task] + + rsyncflags = dacycle['task.rsync.%s.flags'%task] + + # file ID and names + jobid = dacycle['time.end'].strftime('%Y%m%d') + targetdir = 
os.path.join(dacycle['dir.exec']) + jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) ) + logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) ) + # Template and commands for job + jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile} + + if platform.ID == 'cartesius': + jobparams['jobqueue'] = 'staging' + + template = platform.get_job_template(jobparams) + for sourcedir in sourcedirs.split(): + execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,) + template += execcommand + + # write and submit + platform.write_job(jobfile, template, jobid) + jobid = platform.submit_job(jobfile, joblog=logfile) + + +def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator): + """ Set up the job specific directory structure and create an expanded rc-file """ + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc + #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc + #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc + obsoperator.setup(dacycle) # Setup Observation Operator + statevector.setup(dacycle) + +def prepare_state(dacycle, statevector): + """ Set up the input data for the forward model: obs and parameters/fluxes""" + + # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read + # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector + # with new values for each week. After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used + # in the current cycle + + logging.info(header + "starting prepare_state" + footer) + + if not dacycle['time.restart']: + + # Fill each week from n=1 to n=nlag with a new ensemble + + for n in range(statevector.nlag): + date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle'])) + cov = statevector.get_covariance(date, dacycle) + statevector.make_new_ensemble(n, cov) + + else: + + # Read the statevector data from file + + #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc') + + + saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d')) + statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate + + # Now propagate the ensemble by one cycle to prepare for the current cycle + statevector.propagate(dacycle) + + # Finally, also write the statevector to a file so that we can always access the a-priori information + + current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(current_sv, 'prior') # write prior info + +def sample_state(dacycle, samples, statevector, obsoperator): + """ Sample the filter state for the inversion """ + + + # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step. + # This is especially important for: + # (i) The transport model restart data which holds the background mole fractions. 
This is needed to run the model one cycle forward
+    # (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed
+
+    #status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
+    #msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
+    logging.info(header + "starting sample_state" + footer)
+    nlag = int(dacycle['time.nlag'])
+    logging.info("Sampling model will be run over %d cycles" % nlag)
+
+    obsoperator.get_initial_data()
+
+    for lag in range(nlag):
+        logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
+
+        ############# Perform the actual sampling loop #####################
+
+        sample_step(dacycle, samples, statevector, obsoperator, lag)
+
+        logging.debug("statevector now carries %d samples" % statevector.nobs)
+
+
+def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
+    """ Perform all actions needed to sample one cycle """
+
+
+    # First set up the information for time start and time end of this sample
+    dacycle.set_sample_times(lag)
+
+    startdate = dacycle['time.sample.start']
+    enddate = dacycle['time.sample.end']
+    dacycle['time.sample.window'] = lag
+    dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
+
+    logging.info("New simulation interval set : ")
+    logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
+    logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
+    logging.info(" file stamp: %s " % dacycle['time.sample.stamp'])
+
+
+    # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
+    # type of info needed in your transport model
+
+    statevector.write_members_to_file(lag, dacycle['dir.input'])
+
+    samples.setup(dacycle)
+    samples.add_observations()
+    samples.add_observations_c13()
+
+    # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
+
+    samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
+    samples.add_model_data_mismatch_c13(dacycle.dasystem['obs.c13.sites.rc'])
+    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
+    samples.write_sample_coords(sampling_coords_file)
+    # Write filename to dacycle, and to output collection list
+    dacycle['ObsOperator.inputfile'] = sampling_coords_file
+
+    # Run the observation operator
+
+    obsoperator.run_forecast_model()
+
+
+    # Read forecast model samples that were written to NetCDF files by each member. Add them to the existing
+    # Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle.
+
+
+    # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
+    # one file per member, some logic needs to be included to merge all files!!!
+    simulated_file = os.path.join(obsoperator.outputdir, 'flask_output.%s.nc' % dacycle['time.sample.stamp'])
+    samples.add_simulations(simulated_file)
+    samples.add_simulations_c13(simulated_file)
+
+    # Now write a small file that holds for each observation a number of values that we need in postprocessing
+
+    #filename = samples.write_sample_coords()
+
+    # Now add the observations that need to be assimilated to the statevector. 
+    # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
+    # This is to make sure that the first step of the system uses all observations available, while the subsequent
+    # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
+    # (and at least) once in the assimilation
+
+
+    if not advance:
+        if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1:
+            statevector.obs_to_assimilate += (copy.deepcopy(samples),)
+            statevector.nobs += samples.getlength()
+            logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")
+
+        else:
+            statevector.obs_to_assimilate += (None,)
+
+
+def invert(dacycle, statevector, optimizer):
+    """ Perform the inverse calculation """
+    logging.info(header + "starting invert" + footer)
+    dims = (int(dacycle['time.nlag']),
+            int(dacycle['da.optimizer.nmembers']),
+            int(dacycle.dasystem['nparameters']),
+            statevector.nobs)
+
+    if not dacycle.dasystem.has_key('opt.algorithm'):
+        logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
+        logging.info("...using serial algorithm as default...")
+        optimizer.set_algorithm('Serial')
+    elif dacycle.dasystem['opt.algorithm'] == 'serial':
+        logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
+        optimizer.set_algorithm('Serial')
+    elif dacycle.dasystem['opt.algorithm'] == 'bulk':
+        logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
+        optimizer.set_algorithm('Bulk')
+
+    optimizer.setup(dims)
+    optimizer.state_to_matrix(statevector)
+
+    diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+
+    optimizer.write_diagnostics(diagnostics_file, 'prior')
+    optimizer.set_localization(dacycle['da.system.localization'])
+
+    if optimizer.algorithm == 'Serial':
+        optimizer.serial_minimum_least_squares()
+    else:
+        optimizer.bulk_minimum_least_squares()
+
+    optimizer.matrix_to_state(statevector)
+    optimizer.write_diagnostics(diagnostics_file, 'optimized')
+
+
+
+def advance(dacycle, samples, statevector, obsoperator):
+    """ Advance the filter state to the next step """
+
+    # This is the advance of the modeled CO2 state. 
Optionally, routines can be added to advance the state vector (mean+covariance) + + # Then, restore model state from the start of the filter + logging.info(header + "starting advance" + footer) + logging.info("Sampling model will be run over 1 cycle") + + obsoperator.get_initial_data() + + sample_step(dacycle, samples, statevector, obsoperator, 0, True) + + dacycle.restart_filelist.extend(obsoperator.restart_filelist) + dacycle.output_filelist.extend(obsoperator.output_filelist) + logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ") + + dacycle.output_filelist.append(dacycle['ObsOperator.inputfile']) + logging.debug("Appended Observation filename to dacycle for collection ") + + outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) + samples.write_sample_auxiliary(outfile) + + +def save_and_submit(dacycle, statevector): + """ Save the model state and submit the next job """ + logging.info(header + "starting save_and_submit" + footer) + + filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(filename, 'opt') + + dacycle.output_filelist.append(filename) + dacycle.finalize() + + + + -- GitLab