Commit b28a71df authored by Joram

start of c13 under git version control

parent 92f57282
#!/usr/bin/env python
# expand_fluxes.py
import sys
sys.path.append('../../')
import os
from datetime import datetime, timedelta
import logging
import numpy as np
from da.tools.general import date2num, num2date
import da.ctc13.io4 as io
from da.analysis.tools_regions import globarea, state_to_grid
from da.tools.general import create_dirs
from da.analysis.tools_country import countryinfo # needed here
from da.analysis.tools_transcom import transcommask, ExtendedTCRegions
import da.analysis.tools_transcom as tc
import da.analysis.tools_country as ct
import da.analysis.tools_time as timetools
"""
Author: Wouter Peters (Wouter.Peters@noaa.gov)
Revision History:
File created on 21 October 2008.
"""
def proceed_dialog(txt, yes=['y', 'yes'], all=['a', 'all', 'yes-to-all']):
""" function to ask whether to proceed or not """
response = raw_input(txt)
if response.lower() in yes:
return 1
if response.lower() in all:
return 2
return 0
def save_weekly_avg_1x1_data(dacycle, statevector):
"""
Function creates a NetCDF file with output on a 1x1 degree grid. It uses the flux data written by the
:class:`~da.baseclasses.obsoperator.ObsOperator`, and multiplies these with the mapped parameters and
variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`.
:param dacycle: a :class:`~da.tools.initexit.CycleControl` object
:param statevector: a :class:`~da.baseclasses.statevector.StateVector`
:rtype: None
"""
#
dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly'))
#
# Some help variables
#
dectime0 = date2num(datetime(2000, 1, 1))
dt = dacycle['cyclelength']
startdate = dacycle['time.start']
enddate = dacycle['time.end']
nlag = statevector.nlag
logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
#
# Create or open NetCDF output file
#
saveas = os.path.join(dirname, 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d'))
ncf = io.CT_CDF(saveas, 'write')
#
# Create dimensions and lat/lon grid
#
dimgrid = ncf.add_latlon_dim()
dimensemble = ncf.add_dim('members', statevector.nmembers)
dimdate = ncf.add_date_dim()
#
# set title and tell GMT that we are using "pixel registration"
#
setattr(ncf, 'Title', 'CarbonTracker fluxes')
setattr(ncf, 'node_offset', 1)
#
# skip dataset if already in file
#
ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0
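# ncfdate: decimal days since 2000-01-01 (dectime0), taken at the midpoint of the current cycle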
skip = ncf.has_date(ncfdate)
if skip:
logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas))
else:
#
# if not, process this cycle. Start by getting flux input data from CTDAS
#
filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H')))
file = io.ct_read(filename, 'read')
bio = np.array(file.get_variable(dacycle.dasystem['background.co2.bio.flux']))
ocean = np.array(file.get_variable(dacycle.dasystem['background.co2.ocean.flux']))
fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux']))
fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux']))
gpp = np.array(file.get_variable(dacycle.dasystem['background.co2.gpp.flux']))
bio13 = np.array(file.get_variable(dacycle.dasystem['background.c13.bio.flux']))
ocean13 = np.array(file.get_variable(dacycle.dasystem['background.c13.ocean.flux']))
gpp13 = np.array(file.get_variable(dacycle.dasystem['background.c13.gpp.flux']))
#gpp13 = np.ma.masked_where(gpp13==0.,gpp13)
#gpp = np.ma.masked_where(gpp==0.,gpp)
gpp13[np.isnan(gpp13)]=0
gpp[np.isnan(gpp)]=0
epsbio = gpp13[:,:]/gpp[:,:] #((gpp13[:,:]/gpp[:,:]/0.011112)-1.)*1000.
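# epsbio is the grid-cell ratio of the 13C GPP flux to total GPP; the commented-out expression would convert it to a per-mil value using 0.011112, presumably the reference 13C/12C ratio used in this system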
logging.info('epsbio: %s, %s, %s' %(epsbio.shape, epsbio.sum(),epsbio[89,199]))
logging.info('gpp13: %s, %s, %s' %(gpp13.shape, gpp13.sum(),gpp13[89,199]))
logging.info('gpp: %s, %s, %s' %(gpp.shape, gpp.sum(), gpp[89,199]))
#mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1']))
file.close()
next = ncf.inq_unlimlen()[0]
# Start adding datasets from here on, both prior and posterior datasets for bio and ocn
for prior in [True, False]:
#
# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step
# occurred nlag time steps ago, so we shift to the corresponding output directory, but only if we are more than nlag cycles away from the start date.
#
if prior:
qual_short = 'prior'
for n in range(nlag, 0, -1):
priordate = enddate - timedelta(dt.days * n)
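# step back n cycles from the current end date, so the output directory in which that cycle's prior was written can be located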
savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d'))
filename = os.path.join(savedir, 'savestate_%s.nc' % priordate.strftime('%Y%m%d'))
if os.path.exists(filename):
statevector.read_from_file(filename, qual=qual_short)
gridmean, gridensemble = statevector.state_to_grid(lag=n)
gridmean_eps,gridensemble_eps = statevector.state_to_grid_eps(lag=n)
# Replace the mean statevector by all ones (assumed priors)
gridmean = statevector.vector2grid(vectordata=np.ones(statevector.nparams,))
gridmean_eps = statevector.vector2grid_eps(vectordata = np.ones(statevector.nparams,))
logging.debug('Read prior dataset from file %s, sds %d: ' % (filename, n))
break
else:
qual_short = 'opt'
savedir = dacycle['dir.output']
filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d'))
statevector.read_from_file(filename, qual=qual_short)
gridmean, gridensemble = statevector.state_to_grid(lag=1)
gridmean_eps,gridensemble_eps = statevector.state_to_grid_eps(lag=1)
logging.debug('Read posterior dataset from file %s, sds %d: ' % (filename, 1))
#
# if prior, do not multiply fluxes with parameters, otherwise do
#
logging.debug("Shapes: gridensemble %s, bio %s, gridmean %s" % (gridensemble.shape, bio.shape, gridmean.shape))
biomapped = bio * gridmean
oceanmapped = ocean * gridmean
biovarmapped = bio * gridensemble
oceanvarmapped = ocean * gridensemble
epsbiomapped = epsbio #* gridmean_eps
epsbiovarmapped = epsbio #* gridensemble_eps
bio13mapped = bio13 * gridmean
bio13varmapped = bio13 * gridensemble
ocean13mapped = ocean13 * gridmean
ocean13varmapped = ocean13 * gridensemble
gpp13mapped = gpp13*gridmean_eps
gpp13varmapped = gpp13* gridensemble_eps
gppmapped = gpp #* gridmean
gppvarmapped = gpp #* gridensemble
#
#
# For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write
#
savedict = ncf.standard_var(varname='bio_flux_' + qual_short)
savedict['values'] = biomapped.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
#
savedict = ncf.standard_var(varname='ocn_flux_' + qual_short)
savedict['values'] = oceanmapped.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
logging.debug("biovarmapped shape: %s" % (biovarmapped.shape,))
savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short)
savedict['values'] = biovarmapped.tolist()
savedict['dims'] = dimdate + dimensemble + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
#
savedict = ncf.standard_var(varname='ocn_flux_%s_ensemble' % qual_short)
savedict['values'] = oceanvarmapped.tolist()
savedict['dims'] = dimdate + dimensemble + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
#savedict = ncf.standard_var(varname='epsbio_' + qual_short)
#savedict['values'] = epsbiomapped.tolist()
#savedict['dims'] = dimdate + dimgrid
#savedict['count'] = next
#ncf.add_data(savedict)
savedict = ncf.standard_var(varname='bio_c13_flux_' + qual_short)
savedict['values'] = bio13mapped.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var(varname='ocn_c13_flux_' + qual_short)
savedict['values'] = ocean13mapped.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var(varname='gpp_c13_flux_' + qual_short)
savedict['values'] = gpp13mapped.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var(varname='gpp_flux_' + qual_short)
savedict['values'] = gppmapped.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
# End prior/posterior block
savedict = ncf.standard_var(varname='fire_flux_imp')
savedict['values'] = fire.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var(varname='fossil_flux_imp')
savedict['values'] = fossil.tolist()
savedict['dims'] = dimdate + dimgrid
savedict['count'] = next
ncf.add_data(savedict)
area = globarea()
savedict = ncf.standard_var(varname='cell_area')
savedict['values'] = area.tolist()
savedict['dims'] = dimgrid
ncf.add_data(savedict)
#
savedict = ncf.standard_var(varname='date')
savedict['values'] = date2num(startdate) - dectime0 + dt.days / 2.0
savedict['dims'] = dimdate
savedict['count'] = next
ncf.add_data(savedict)
sys.stdout.write('.')
sys.stdout.flush()
#
# Done, close the new NetCDF file
#
ncf.close()
#
# Return the full name of the NetCDF file so it can be processed by the next routine
#
logging.info("Gridded weekly average fluxes now written")
return saveas
def save_weekly_avg_state_data(dacycle, statevector):
"""
Function creates a NetCDF file with output for all parameters. It uses the flux data written by the
:class:`~da.baseclasses.obsoperator.ObsOperator`, and multiplies these with the mapped parameters and
variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`.
:param dacycle: a :class:`~da.tools.initexit.CycleControl` object
:param statevector: a :class:`~da.baseclasses.statevector.StateVector`
:rtype: None
"""
import da.ctc13.io4 as io
dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_state_weekly'))
#
# Some help variables
#
dectime0 = date2num(datetime(2000, 1, 1))
dt = dacycle['cyclelength']
startdate = dacycle['time.start']
enddate = dacycle['time.end']
nlag = statevector.nlag
area = globarea()
vectorarea = statevector.grid2vector(griddata=area, method='sum')
logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
#
# Create or open NetCDF output file
#
saveas = os.path.join(dirname, 'statefluxes.nc')
ncf = io.CT_CDF(saveas, 'write')
#
# Create dimensions and lat/lon grid
#
dimregs = ncf.add_dim('nparameters', statevector.nparams)
dimmembers = ncf.add_dim('nmembers', statevector.nmembers)
dimdate = ncf.add_date_dim()
#
# set title and tell GMT that we are using "pixel registration"
#
setattr(ncf, 'Title', 'CarbonTracker fluxes')
setattr(ncf, 'node_offset', 1)
#
# skip dataset if already in file
#
ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0
skip = ncf.has_date(ncfdate)
if skip:
logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas))
else:
next = ncf.inq_unlimlen()[0]
#
# if not, process this cycle. Start by getting flux input data from CTDAS
#
filename = os.path.join(dacycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H')))
file = io.ct_read(filename, 'read')
bio = np.array(file.get_variable(dacycle.dasystem['background.co2.bio.flux']))
ocean = np.array(file.get_variable(dacycle.dasystem['background.co2.ocean.flux']))
fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux']))
fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux']))
#epsbio = np.array(file.get_variable(dacycle.dasystem['background.c13.epsbio']))
bio13 = np.array(file.get_variable(dacycle.dasystem['background.c13.bio.flux']))
ocean13 = np.array(file.get_variable(dacycle.dasystem['background.c13.ocean.flux']))
gpp13 = np.array(file.get_variable(dacycle.dasystem['background.c13.gpp.flux']))
gpp = np.array(file.get_variable(dacycle.dasystem['background.co2.gpp.flux']))
epsbio = np.ma.masked_where(gpp13==0.,gpp13)/np.ma.masked_where(gpp==0.,gpp) #(np.ma.masked_where(gpp13==0.,gpp13)/np.ma.masked_where(gpp==0.,gpp)/0.011112-1.)*1000.
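# zero-GPP cells are masked to avoid division by zero; the commented-out expression would again convert the ratio to per mil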
#mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1']))
file.close()
next = ncf.inq_unlimlen()[0]
vectorbio = statevector.grid2vector(griddata=bio * area, method='sum')
vectorocn = statevector.grid2vector(griddata=ocean * area, method='sum')
vectorfire = statevector.grid2vector(griddata=fire * area, method='sum')
vectorfossil = statevector.grid2vector(griddata=fossil * area, method='sum')
#vectorepsbio = statevector.grid2vector_eps(griddata=epsbio*area,method='sum')
vectorbio13 = statevector.grid2vector(griddata=bio13 * area, method='sum')
vectorocn13 = statevector.grid2vector(griddata=ocean13 * area, method='sum')
vectorgpp13 = statevector.grid2vector_eps(griddata=gpp13 * area, method='sum')
vectorgpp = statevector.grid2vector_eps(griddata=gpp * area, method='sum')
vectorepsbio = vectorgpp13/vectorgpp #(vectorgpp13/vectorgpp/0.011112-1.)*1000.
# Start adding datasets from here on, both prior and posterior datasets for bio and ocn
for prior in [True, False]:
#
# Now fill the statevector with the prior values for this time step. Note that the prior value for this time step
# occurred nlag time steps ago, so we shift to the corresponding output directory, but only if we are more than nlag cycles away from the start date.
#
if prior:
qual_short = 'prior'
for n in range(nlag, 0, -1):
priordate = enddate - timedelta(dt.days * n)
savedir = dacycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d'))
filename = os.path.join(savedir,'savestate_%s.nc' % priordate.strftime('%Y%m%d'))
if os.path.exists(filename):
statevector.read_from_file(filename, qual=qual_short)
# Replace the mean statevector by all ones (assumed priors)
statemean = np.ones((statevector.nparams,))
choicelag = n
logging.debug('Read prior dataset from file %s, lag %d: ' % (filename, choicelag))
break
else:
qual_short = 'opt'
savedir = dacycle['dir.output']
filename = os.path.join(savedir, 'savestate_%s.nc' % startdate.strftime('%Y%m%d'))
statevector.read_from_file(filename)
choicelag = 1
statemean = statevector.ensemble_members[choicelag - 1][0].param_values
logging.debug('Read posterior dataset from file %s, lag %d: ' % (filename, choicelag))
#
# if prior, do not multiply fluxes with parameters, otherwise do
#
data = statemean * vectorbio # units of mole region-1 s-1
savedict = ncf.standard_var(varname='bio_flux_%s' % qual_short)
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
#
# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to
# ensure they are indeed smaller due to the data assimilation. If they were calculated relative to the mean posterior flux, the
# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want.
#
# The implementation is done by multiplying the ensemble with the vectorbio only, and not with the statemean values
# which are assumed 1.0 in the prior always.
#
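# as a minimal illustration of this convention: with deviations of shape (nmembers, nparams), the covariance can later be reconstructed as np.dot(deviations.transpose(), deviations) / (nmembers - 1); save_weekly_avg_tc_data applies the same construction to the TC-projected members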
members = statevector.ensemble_members[choicelag - 1]
deviations = np.array([mem.param_values * vectorbio for mem in members])
deviations = deviations - deviations[0, :]
savedict = ncf.standard_var(varname='bio_flux_%s_ensemble' % qual_short)
savedict['values'] = deviations.tolist()
savedict['dims'] = dimdate + dimmembers + dimregs
savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var('unknown')
savedict['name'] = 'bio_flux_%s_std' % qual_short
savedict['long_name'] = 'Biosphere flux standard deviation, %s' % qual_short
savedict['values'] = deviations.std(axis=0)
savedict['dims'] = dimdate + dimregs
savedict['comment'] = "This is the standard deviation on each parameter"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
data = statemean * vectorbio13 # units of mole region-1 s-1
savedict = ncf.standard_var(varname='bio_c13_flux_%s' % qual_short)
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
#
# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to
# ensure they are indeed smaller due to the data assimilation. If they were calculated relative to the mean posterior flux, the
# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want.
#
# The implementation is done by multiplying the ensemble with the vectorbio only, and not with the statemean values
# which are assumed 1.0 in the prior always.
#
members = statevector.ensemble_members[choicelag - 1]
deviations = np.array([mem.param_values * vectorbio13 for mem in members])
deviations = deviations - deviations[0, :]
savedict = ncf.standard_var(varname='bio_c13_flux_%s_ensemble' % qual_short)
savedict['values'] = deviations.tolist()
savedict['dims'] = dimdate + dimmembers + dimregs
savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var('unknown')
savedict['name'] = 'bio_c13_flux_%s_std' % qual_short
savedict['long_name'] = 'Biosphere 13C flux standard deviation, %s' % qual_short
savedict['values'] = deviations.std(axis=0)
savedict['dims'] = dimdate + dimregs
savedict['comment'] = "This is the standard deviation on each parameter"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
data = statemean * vectorocn # units of mole region-1 s-1
savedict = ncf.standard_var(varname='ocn_flux_%s' % qual_short)
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
#
# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to
# ensure they are indeed smaller due to the data assimilation. If they were calculated relative to the mean posterior flux, the
# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want.
#
# The implementation is done by multiplying the ensemble with the vectorocn only, and not with the statemean values
# which are assumed 1.0 in the prior always.
#
deviations = np.array([mem.param_values * vectorocn for mem in members])
deviations = deviations - deviations[0, :]
savedict = ncf.standard_var(varname='ocn_flux_%s_ensemble' % qual_short)
savedict['values'] = deviations.tolist()
savedict['dims'] = dimdate + dimmembers + dimregs
savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var('unknown')
savedict['name'] = 'ocn_flux_%s_std' % qual_short
savedict['long_name'] = 'Ocean flux standard deviation, %s' % qual_short
savedict['values'] = deviations.std(axis=0)
savedict['dims'] = dimdate + dimregs
savedict['comment'] = "This is the standard deviation on each parameter"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
data = statemean * vectorocn13 # units of mole region-1 s-1
savedict = ncf.standard_var(varname='ocn_c13_flux_%s' % qual_short)
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
#
# Here comes a special provision for the posterior flux covariances: these are calculated relative to the prior flux covariance to
# ensure they are indeed smaller due to the data assimilation. If they were calculated relative to the mean posterior flux, the
# uncertainties would shift just because the mean flux had increased or decreased, which is not what we want.
#
# The implementation is done by multiplying the ensemble with the vectorocn only, and not with the statemean values
# which are assumed 1.0 in the prior always.
#
deviations = np.array([mem.param_values * vectorocn13 for mem in members])
deviations = deviations - deviations[0, :]
savedict = ncf.standard_var(varname='ocn_c13_flux_%s_ensemble' % qual_short)
savedict['values'] = deviations.tolist()
savedict['dims'] = dimdate + dimmembers + dimregs
savedict['comment'] = "This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var('unknown')
savedict['name'] = 'ocn_c13_flux_%s_std' % qual_short
savedict['long_name'] = 'Ocean 13C flux standard deviation, %s' % qual_short
savedict['values'] = deviations.std(axis=0)
savedict['dims'] = dimdate + dimregs
savedict['comment'] = "This is the standard deviation on each parameter"
savedict['units'] = "mol region-1 s-1"
savedict['count'] = next
ncf.add_data(savedict)
#vectorepsbiow=vectorepsbio #/vectorbio
#print "vectorepsbiow",vectorepsbiow.shape,len(vectorepsbiow)
#msg = "vectorepsbiow %s,%s" %(vectorepsbiow.shape,len(vectorepsbiow)) ; logging.debug(msg)
#for i in range(len(vectorepsbiow)):
# if vectorepsbiow[i]!=vectorepsbiow[i]:
# vectorepsbiow[i]=0.
#data = statemean*vectorepsbiow # units of mole region-1 s-1
#print data,data.shape
#savedict=ncf.standard_var(varname='epsbio_%s'%qual_short)
#savedict['values']=data
#savedict['dims']=dimdate+dimregs
#savedict['count']=next
#ncf.add_data(savedict)
#deviations = np.array([mem.param_values*vectorepsbiow for mem in members])
#deviations = deviations-deviations[0,:]
#savedict=ncf.standard_var(varname='epsbio_%s_ensemble'%qual_short)
#savedict['values']=deviations.tolist()
#savedict['dims']=dimdate+dimmembers+dimregs
#savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance"
#savedict['units']="permil"
#savedict['count']=next
#ncf.add_data(savedict)
#savedict=ncf.standard_var('unknown')
#savedict['name'] = 'epsbio_%s_std'%qual_short
#savedict['long_name'] = 'epsilon standard deviation, %s'%qual_short
#savedict['values']=deviations.std(axis=0)
#savedict['dims']=dimdate+dimregs
#savedict['comment']="This is the standard deviation on each parameter"
#savedict['units']="permil"
#savedict['count']=next
#ncf.add_data(savedict)
logging.debug("statemean shape %s, vectorgpp13 shape %s" % (statemean.shape, vectorgpp13.shape))
logging.debug("statemean %s, vectorgpp13 %s" % (statemean[:], vectorgpp13[:]))
data = statemean * vectorgpp13 # units of mole region-1 s-1
savedict = ncf.standard_var(varname='gpp_c13_flux_%s' % qual_short)
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
data = statemean * vectorgpp # units of mole region-1 s-1
savedict = ncf.standard_var(varname='gpp_flux_%s' % qual_short)
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
data = vectorfire
savedict = ncf.standard_var(varname='fire_flux_imp')
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
data = vectorfossil
savedict = ncf.standard_var(varname='fossil_flux_imp')
savedict['values'] = data
savedict['dims'] = dimdate + dimregs
savedict['count'] = next
ncf.add_data(savedict)
savedict = ncf.standard_var(varname='date')
savedict['values'] = ncfdate
savedict['dims'] = dimdate
savedict['count'] = next
ncf.add_data(savedict)
sys.stdout.write('.')
sys.stdout.flush()
#
# Done, close the new NetCDF file
#
ncf.close()
#
# Return the full name of the NetCDF file so it can be processed by the next routine
#
logging.info("Vector weekly average fluxes now written")
return saveas
def save_weekly_avg_tc_data(dacycle, statevector):
"""
Function creates a NetCDF file with output on TransCom regions. It uses the flux input of length `nparameters`
created by the function `save_weekly_avg_state_data`, which is then projected
onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`.
:param dacycle: a :class:`~da.tools.initexit.CycleControl` object
:param statevector: a :class:`~da.baseclasses.statevector.StateVector`
:rtype: None
This function only reads the fluxes from the statefluxes.nc file created before, because we want to convolve
these with the parameters in the statevector. This creates posterior fluxes, and the posterior covariance for the complete
statevector in units of mol/box/s, which we then turn into TC fluxes and covariances.
"""
import da.ctc13.io4 as io
#
dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_tc_weekly'))
#
# Some help variables
#
dectime0 = date2num(datetime(2000, 1, 1))
dt = dacycle['cyclelength']
startdate = dacycle['time.start']
enddate = dacycle['time.end']
ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0
logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
# Write/Create NetCDF output file
#
saveas = os.path.join(dirname, 'tcfluxes.nc')
ncf = io.CT_CDF(saveas, 'write')
dimdate = ncf.add_date_dim()
dimidateformat = ncf.add_date_dim_format()
dimregs = ncf.add_region_dim(type='tc')
#
# set title and tell GMT that we are using "pixel registration"
#
setattr(ncf, 'Title', 'CarbonTracker TransCom fluxes')
setattr(ncf, 'node_offset', 1)
#
skip = ncf.has_date(ncfdate)
if skip:
logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas))
else:
# Get input data
area = globarea()
infile = os.path.join(dacycle['dir.analysis'], 'data_state_weekly', 'statefluxes.nc')
if not os.path.exists(infile):
logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." % infile)
return None
ncf_in = io.ct_read(infile, 'read')
# Transform data one by one
# Get the date variable, and find index corresponding to the dacycle date
try:
dates = ncf_in.variables['date'][:]
except KeyError:
logging.error("The variable date cannot be found in the requested input file (%s) " % infile)
logging.error("Please make sure you create gridded fluxes before making TC fluxes ")
raise KeyError
try:
index = dates.tolist().index(ncfdate)
except ValueError:
logging.error("The requested cycle date is not yet available in file %s " % infile)
logging.error("Please make sure you create state based fluxes before making TC fluxes")
raise ValueError
# First add the date for this cycle to the file, this grows the unlimited dimension
savedict = ncf.standard_var(varname='date')
savedict['values'] = ncfdate
savedict['dims'] = dimdate
savedict['count'] = index
ncf.add_data(savedict)
# Now convert other variables that were inside the flux_1x1 file
vardict = ncf_in.variables
for vname, vprop in vardict.iteritems():
data = ncf_in.get_variable(vname)[index]
if vname in ['latitude','longitude', 'date', 'idate'] or 'std' in vname:
continue
elif 'ensemble' in vname:
tcdata = []
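# GPP-related variables are mapped with the _eps variants of the statevector projections; other fluxes use the standard mapping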
if 'gpp_' in vname:
for member in data:
tcdata.append( statevector.vector2tc_eps(vectordata=member) )
else:
for member in data:
tcdata.append( statevector.vector2tc(vectordata=member) )
tcdata = np.array(tcdata)
try:
cov = tcdata.transpose().dot(tcdata) / (statevector.nmembers - 1)
except:
cov = np.dot(tcdata.transpose(), tcdata) / (statevector.nmembers - 1) # Huygens fix
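# cov now holds the (nregions x nregions) flux covariance in (mol/region/s)**2, built from the per-member TC flux deviations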
#print vname,cov.sum()
tcdata = cov
savedict = ncf.standard_var(varname=vname.replace('ensemble', 'cov'))
savedict['units'] = '[mol/region/s]**2'
savedict['dims'] = dimdate + dimregs + dimregs
else:
if 'gpp_' in vname:
tcdata = statevector.vector2tc_eps(vectordata=data) # vector to TC
else:
tcdata = statevector.vector2tc(vectordata=data) # vector to TC
savedict = ncf.standard_var(varname=vname)
savedict['dims'] = dimdate+dimregs
savedict['units'] = 'mol/region/s'
savedict['count'] = index
savedict['values'] = tcdata
ncf.add_data(savedict)
ncf_in.close()
ncf.close()
logging.info("TransCom weekly average fluxes now written")
return saveas
def save_weekly_avg_ext_tc_data(dacycle):
""" Function SaveTCDataExt saves surface flux data to NetCDF files for extended TransCom regions
*** Inputs ***
rundat : a RunInfo object
*** Outputs ***
NetCDF file containing n-hourly global surface fluxes per TransCom region
*** Example ***
./expand_savestate project=enkf_release sd=20000101 ed=20010101 """
import da.ctc13.io4 as io
#
#
dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_tc_weekly'))
#
# Some help variables
#
dectime0 = date2num(datetime(2000, 1, 1))
dt = dacycle['cyclelength']
startdate = dacycle['time.start']
enddate = dacycle['time.end']
ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0
logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
# Write/Create NetCDF output file
#
saveas = os.path.join(dirname, 'tc_extfluxes.nc')
ncf = io.CT_CDF(saveas, 'write')
dimdate = ncf.add_date_dim()
dimidateformat = ncf.add_date_dim_format()
dimregs = ncf.add_region_dim(type='tc_ext')
#
# set title and tell GMT that we are using "pixel registration"
#
setattr(ncf, 'Title', 'CarbonTracker TransCom fluxes')
setattr(ncf, 'node_offset', 1)
#
skip = ncf.has_date(ncfdate)
if skip:
logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas))
else:
infile = os.path.join(dacycle['dir.analysis'], 'data_tc_weekly', 'tcfluxes.nc')
if not os.path.exists(infile):
logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." % infile)
return None
ncf_in = io.ct_read(infile, 'read')
# Transform data one by one
# Get the date variable, and find index corresponding to the dacycle date
try:
dates = ncf_in.variables['date'][:]
except KeyError:
logging.error("The variable date cannot be found in the requested input file (%s) " % infile)
logging.error("Please make sure you create gridded fluxes before making extended TC fluxes")
raise KeyError
try:
index = dates.tolist().index(ncfdate)
except ValueError:
logging.error("The requested cycle date is not yet available in file %s " % infile)
logging.error("Please make sure you create state based fluxes before making extended TC fluxes ")
raise ValueError
# First add the date for this cycle to the file, this grows the unlimited dimension
savedict = ncf.standard_var(varname='date')
savedict['values'] = ncfdate
savedict['dims'] = dimdate
savedict['count'] = index
ncf.add_data(savedict)
# Now convert other variables that were inside the tcfluxes.nc file
vardict = ncf_in.variables
for vname, vprop in vardict.iteritems():
data = ncf_in.get_variable(vname)[index]
if vname == 'latitude': continue
elif vname == 'longitude': continue
elif vname == 'date': continue
elif vname == 'idate': continue
elif 'cov' in vname:
tcdata = ExtendedTCRegions(data, cov=True)
savedict = ncf.standard_var(varname=vname)
savedict['units'] = '[mol/region/s]**2'
savedict['dims'] = dimdate + dimregs + dimregs
else:
tcdata = ExtendedTCRegions(data, cov=False)
savedict = ncf.standard_var(varname=vname)
savedict['dims'] = dimdate + dimregs
savedict['units'] = 'mol/region/s'
savedict['count'] = index
savedict['values'] = tcdata
ncf.add_data(savedict)
ncf_in.close()
ncf.close()
logging.info("TransCom weekly average extended fluxes now written")
return saveas
def save_weekly_avg_agg_data(dacycle, region_aggregate='olson'):
"""
Function creates a NetCDF file with fluxes aggregated over a chosen set of regions (Olson ecoregions, TransCom
regions, or countries). It uses the gridded flux input created by the function `save_weekly_avg_1x1_data` and
aggregates it to per-region totals using the corresponding region mask.
:param dacycle: a :class:`~da.tools.initexit.CycleControl` object
:param region_aggregate: the region map to aggregate to ('olson', 'transcom', or 'country')
:rtype: None
This function only reads the fluxes from the flux_1x1.nc files created before. Gridded fluxes and flux ensembles
are multiplied by grid-cell area and summed per region, yielding totals in units of mol/region/s.
"""
import da.ctc13.io4 as io
#
dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_%s_weekly' % region_aggregate))
#
# Some help variables
#
dectime0 = date2num(datetime(2000, 1, 1))
dt = dacycle['cyclelength']
startdate = dacycle['time.start']
enddate = dacycle['time.end']
ncfdate = date2num(startdate) - dectime0 + dt.days / 2.0
logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
logging.debug("Aggregating 1x1 fluxes to %s totals" % region_aggregate)
# Write/Create NetCDF output file
#
saveas = os.path.join(dirname, '%s_fluxes.%s.nc' % (region_aggregate, startdate.strftime('%Y-%m-%d')))
ncf = io.CT_CDF(saveas, 'write')
dimdate = ncf.add_date_dim()
dimidateformat = ncf.add_date_dim_format()
dimgrid = ncf.add_latlon_dim() # for mask
#
# Select regions to aggregate to
#
if region_aggregate == "olson":
regionmask = tc.olson240mask
dimname = 'olson'
dimregs = ncf.add_dim(dimname, regionmask.max())
regionnames = []
for i in range(11):
for j in range(19):
regionnames.append("%s_%s" % (tc.transnams[i], tc.olsonnams[j],))
regionnames.extend(tc.oifnams)
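# regionnames now holds each of the 11 TransCom land regions combined with each of the 19 Olson ecosystem classes, followed by the ocean inversion (oif) regions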
for i, name in enumerate(regionnames):
lab = 'Aggregate_Region_%03d' % (i + 1,)
setattr(ncf, lab, name)
elif region_aggregate == "transcom":
regionmask = tc.transcommask
dimname = 'tc'
dimregs = ncf.add_region_dim(type='tc')
elif region_aggregate == "country":
countrydict = ct.get_countrydict()
selected = ['Russia', 'Canada', 'China', 'United States', 'EU27', 'Brazil', 'Australia', 'India'] #,'G8','UNFCCC_annex1','UNFCCC_annex2']
regionmask = np.zeros((180, 360,), 'float')
for i, name in enumerate(selected):
lab = 'Country_%03d' % (i + 1,)
setattr(ncf, lab, name)
if name == 'EU27':
namelist = ct.EU27
elif name == 'EU25':
namelist = ct.EU25
elif name == 'G8':
namelist = ct.G8
elif name == 'UNFCCC_annex1':
namelist = ct.annex1
elif name == 'UNFCCC_annex2':
namelist = ct.annex2
else:
namelist = [name]
for countryname in namelist:
try:
country = countrydict[countryname]
regionmask.put(country.gridnr, i + 1)
except:
continue
dimname = 'country'
dimregs = ncf.add_dim(dimname, regionmask.max())
#
skip = ncf.has_date(ncfdate)
if skip:
logging.warning('Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'), saveas))
else:
#
# set title and tell GMT that we are using "pixel registration"
#
setattr(ncf, 'Title', 'CTDAS Aggregated fluxes')
setattr(ncf, 'node_offset', 1)
savedict = ncf.standard_var('unknown')
savedict['name'] = 'regionmask'
savedict['comment'] = 'numerical mask used to aggregate 1x1 flux fields, each integer 0,...,N is one region aggregated'
savedict['values'] = regionmask.tolist()
savedict['units'] = '-'
savedict['dims'] = dimgrid
savedict['count'] = 0
ncf.add_data(savedict)
# Get input data from 1x1 degree flux files
area = globarea()
infile = os.path.join(dacycle['dir.analysis'], 'data_flux1x1_weekly', 'flux_1x1.%s.nc' % startdate.strftime('%Y-%m-%d'))
if not os.path.exists(infile):
logging.error("Needed input file (%s) does not exist yet, please create file first, returning..." % infile)
return None
ncf_in = io.ct_read(infile, 'read')
# Transform data one by one
# Get the date variable, and find index corresponding to the dacycle date
try:
dates = ncf_in.variables['date'][:]
except KeyError:
logging.error("The variable date cannot be found in the requested input file (%s) " % infile)
logging.error("Please make sure you create gridded fluxes before making TC fluxes ")
raise KeyError
try:
index = dates.tolist().index(ncfdate)
except ValueError:
logging.error("The requested cycle date is not yet available in file %s " % infile)
logging.error("Please make sure you create state based fluxes before making TC fluxes ")
raise ValueError
# First add the date for this cycle to the file, this grows the unlimited dimension
savedict = ncf.standard_var(varname='date')
savedict['values'] = ncfdate
savedict['dims'] = dimdate
savedict['count'] = index
ncf.add_data(savedict)
# Now convert other variables that were inside the statevector file
vardict = ncf_in.variables
for vname, vprop in vardict.iteritems():
if vname == 'latitude': continue
elif vname == 'longitude': continue
elif vname == 'date': continue
elif vname == 'idate': continue
elif 'std' in vname: continue
elif 'ensemble' in vname:
data = ncf_in.get_variable(vname)[index]
dimensemble = ncf.add_dim('members', data.shape[0])
regiondata = []
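# each ensemble member's 1x1 field is multiplied by grid-cell area and aggregated to per-region totals (mol/region/s) using the region mask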
for member in data:
aggdata = state_to_grid(member * area, regionmask, reverse=True, mapname=region_aggregate)
regiondata.append(aggdata)
regiondata = np.array(regiondata)
savedict = ncf.standard_var(varname=vname)
savedict['units'] = 'mol/region/s'
savedict['dims'] = dimdate + dimensemble + dimregs
elif 'flux' in vname:
data = ncf_in.get_variable(vname)[index]
regiondata = state_to_grid(data * area, regionmask, reverse=True, mapname=region_aggregate)
savedict = ncf.standard_var(varname=vname)
savedict['dims'] = dimdate + dimregs
savedict['units'] = 'mol/region/s'
else:
data = ncf_in.get_variable(vname)[:]
regiondata = state_to_grid(data, regionmask, reverse=True, mapname=region_aggregate)
savedict = ncf.standard_var(varname=vname)
savedict['dims'] = dimdate + dimregs
savedict['count'] = index
savedict['values'] = regiondata
ncf.add_data(savedict)
ncf_in.close()
ncf.close()
logging.info("%s aggregated weekly average fluxes now written" % dimname)
return saveas
def save_time_avg_data(dacycle, infile, avg='monthly'):
""" Function saves time mean surface flux data to NetCDF files
*** Inputs ***
rundat : a RunInfo object
*** Outputs ***
daily NetCDF file containing 1-hourly global surface fluxes at 1x1 degree
*** Example ***
./expand_savestate project=enkf_release sd=20000101 ed=20010101 """
import da.ctc13.io4 as io
if 'weekly' in infile:
intime = 'weekly'
if 'monthly' in infile:
intime = 'monthly'
if 'yearly' in infile:
intime = 'yearly'
dirname, filename = os.path.split(infile)
outdir = create_dirs(os.path.join(dacycle['dir.analysis'], dirname.replace(intime,avg)))
dectime0 = date2num(datetime(2000, 1, 1))
# Create NetCDF output file
#
saveas = os.path.join(outdir, filename)
ncf = io.CT_CDF(saveas, 'create')
dimdate = ncf.add_date_dim()
#
# Open input file specified from the command line
#
if not os.path.exists(infile):
logging.error("Needed input file (%s) not found. Please create this first:" % infile)
logging.error("returning...")
return None
else:
pass
file = io.ct_read(infile, 'read')
datasets = file.variables.keys()
date = file.get_variable('date')
globatts = file.ncattrs()
for att in globatts:
attval = file.getncattr(att)
if not att in ncf.ncattrs():
ncf.setncattr(att, attval)
time = [datetime(2000, 1, 1) + timedelta(days=d) for d in date]
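# convert the decimal-day axis (days since 2000-01-01) back into datetime objects for the averaging routines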
# loop over datasets in infile, skip idate and date as we will make new time axis for the averaged data
for sds in ['date'] + datasets:
# get original data
data = file.get_variable(sds)
varatts = file.variables[sds].ncattrs()
vardims = file.variables[sds].dimensions
#
# Depending on dims of input dataset, create dims for output dataset. Note that we add the new dimdate now.
#
for d in vardims:
if 'date' in d:
continue
if d in ncf.dimensions.keys():
pass
else:
dim = ncf.createDimension(d, size=len(file.dimensions[d]))
savedict = ncf.standard_var(sds)
savedict['name'] = sds
savedict['dims'] = vardims
savedict['units'] = file.variables[sds].units
savedict['long_name'] = file.variables[sds].long_name
savedict['comment'] = file.variables[sds].comment
savedict['standard_name'] = file.variables[sds].standard_name
savedict['count'] = 0
if not 'date' in vardims:
savedict['values'] = data
ncf.add_data(savedict)
else:
if avg == 'monthly':
time_avg, data_avg = timetools.monthly_avg(time, data)
elif avg == 'seasonal':
time_avg, data_avg = timetools.season_avg(time, data)
elif avg == 'yearly':
time_avg, data_avg = timetools.yearly_avg(time, data)
elif avg == 'longterm':
time_avg, data_avg = timetools.longterm_avg(time, data)
time_avg = [time_avg]
data_avg = [data_avg]
else:
raise ValueError, 'Averaging (%s) does not exist' % avg
count = -1
for dd, data in zip(time_avg, data_avg):
count = count + 1
if sds == 'date':
savedict['values'] = date2num(dd) - dectime0
else:
savedict['values'] = data
savedict['count'] = count
ncf.add_data(savedict, silent=True)
sys.stdout.write('.')
sys.stdout.write('\n')
sys.stdout.flush()
# end NetCDF file access
file.close()
ncf.close()
logging.info("------------------- Finished time averaging---------------------------------")
return saveas
if __name__ == "__main__":
#from da.ctc13.initexit_offline import CycleControl
from da.tools.initexit import CycleControl
from da.ctc13.dasystem import CO2C13DaSystem
from da.ctc13.statevector_co2c13_coveps import CO2C13StateVector
#from da.ctc13.statevector_co2c13 import CO2C13StateVector
from da.analysis.time_avg_fluxes import time_avg
sys.path.append('../../')
logging.root.setLevel(logging.DEBUG)
dacycle = CycleControl(args={'rc':'../../ctdas-co2c13eps-inv-6x4-ei-sibasa-gfed4-weekly.rc'})
dacycle.setup()
dacycle.parse_times()
dasystem = CO2C13DaSystem('../rc/carbontracker_co2c13eps.rc')
dacycle.dasystem = dasystem
statevector = CO2C13StateVector()
#statevector = CO2StateVector()
statevector.setup(dacycle)
while dacycle['time.end'] < dacycle['time.finish']:
save_weekly_avg_1x1_data(dacycle, statevector)
save_weekly_avg_state_data(dacycle, statevector)
save_weekly_avg_tc_data(dacycle, statevector)
save_weekly_avg_ext_tc_data(dacycle)
save_weekly_avg_agg_data(dacycle, region_aggregate='olson')
save_weekly_avg_agg_data(dacycle, region_aggregate='transcom')
# save_weekly_avg_agg_data(dacycle, region_aggregate='country')
time_avg(dacycle,'flux1x1')
time_avg(dacycle,'transcom')
time_avg(dacycle,'olson')
#time_avg(dacycle,'country')
dacycle.advance_cycle_times()
statevector = None # free memory
sys.exit(0)
#!/usr/bin/env python
# expand_fluxes.py
import sys
import os
import getopt
import shutil
import logging
import netCDF4
import numpy as np
from string import join
from datetime import datetime, timedelta
sys.path.append('../../')
from da.tools.general import date2num, num2date
from da.tools.general import create_dirs
import da.tools.io4 as io
import matplotlib as mpl #zeus
mpl.use('Agg')
from pylab import date2num, num2date
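# note: pylab's date2num/num2date shadow the versions imported from da.tools.general above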
"""
Author: Wouter Peters (Wouter.Peters@wur.nl)
Revision History:
File created on 11 May 2012.
"""
def write_mole_fractions(dacycle):
"""
Write Sample information to NetCDF files. These files are organized by site and
have an unlimited time axis to which data is appended each cycle.
The needed information is obtained from the sample_auxiliary.nc files and the original input data files from ObsPack.
The steps are:
(1) Create a directory to hold timeseries output files
(2) Read the sample_auxiliary.nc file for this cycle and get a list of the original ObsPack files the samples were obtained from
(3) For each file, copy the original data file from ObsPack (if not yet present)
(4) Open the copied file, find the index of each observation, fill in the simulated data
"""
dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_molefractions'))
#
# Some help variables
#
dectime0 = date2num(datetime(2000, 1, 1))
dt = dacycle['cyclelength']
startdate = dacycle['time.start']
enddate = dacycle['time.end']
logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"),)
# Step (1): Get the posterior sample output data file for this cycle
infile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
ncf_in = io.ct_read(infile,'read')
obs_num = ncf_in.get_variable('obs_num')
obs_val = ncf_in.get_variable('observed')
simulated = ncf_in.get_variable('modelsamples')
infilename= ncf_in.get_variable('inputfilename')
infiles = netCDF4.chartostring(infilename).tolist()
#infiles = [join(s.compressed(),'') for s in infilename]
ncf_in.close()
# Step (2): Get the prior sample output data file for this cycle
infile = os.path.join(dacycle['dir.output'],'optimizer.%s.nc'% startdate.strftime('%Y%m%d') )
if os.path.exists(infile):
optimized_present = True
else:
optimized_present = False
if optimized_present:
ncf_fc_in = io.ct_read(infile, 'read')
fc_obs_num = ncf_fc_in.get_variable('obspack_num')
fc_obs_val = ncf_fc_in.get_variable('observed')
fc_simulated = ncf_fc_in.get_variable('modelsamplesmean_prior')
fc_simulated_ens = ncf_fc_in.get_variable('modelsamplesdeviations_prior')
fc_flag = ncf_fc_in.get_variable('flag')
if not dacycle.dasystem.has_key('opt.algorithm'):
fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance')
fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance')
elif dacycle.dasystem['opt.algorithm'] == 'serial':
fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance')
fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance')
elif dacycle.dasystem['opt.algorithm'] == 'bulk':
fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance').diagonal()
fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance').diagonal()
filesitecode = ncf_fc_in.get_variable('sitecode')
fc_sitecodes = netCDF4.chartostring(filesitecode).tolist()
#fc_sitecodes = [join(s.compressed(),'') for s in filesitecode]
fc_sitecodes_co2 = [f for f in fc_sitecodes if f.find('co2_') == 0]
fc_sitecodes_co2c13 = [ff for ff in fc_sitecodes if ff.find('co2c13_') == 0]
ncf_fc_in.close()
# Expand the list of input files with those available from the forecast list
infiles_rootdir_co2 = [f for f in infiles if f.find('co2_') > 0]
infiles_rootdir_co2 = os.path.split(infiles_rootdir_co2[0])[0]
infiles_rootdir_co2c13 = [f for f in infiles if f.find('co2c13_') > 0]
infiles_rootdir_co2c13 = os.path.split(infiles_rootdir_co2c13[0])[0]
infiles.extend(os.path.join(infiles_rootdir_co2,f+'.nc') for f in fc_sitecodes_co2)
infiles.extend(os.path.join(infiles_rootdir_co2c13,f+'.nc') for f in fc_sitecodes_co2c13)
# Step (3): For each observation timeseries we now have data for, open it and fill with data
for orig_file in set(infiles):
#print orig_file
if not os.path.exists(orig_file):
msg = "The original input file (%s) could not be found, continuing to next file..."%orig_file ; logging.error(msg)
continue
if orig_file.endswith(('co2_ljo_surface-flask_4_representative.nc', 'co2_azr_surface-flask_1_representative.nc', 'co2c13_wlg_surface-flask_7_representative.nc')):
continue
copy_file = os.path.join(dirname,os.path.split(orig_file)[-1])
if not os.path.exists(copy_file):
shutil.copy(orig_file, copy_file)
logging.debug("Copied a new original file (%s) to the analysis directory" % orig_file)
ncf_out = io.CT_CDF(copy_file,'write')
# Modify the attributes of the file to reflect added data from CTDAS properly
try:
host=os.environ['HOSTNAME']
except:
host='unknown'
ncf_out.Caution = '==================================================================================='
try:
ncf_out.History += '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)
except:
ncf_out.History = '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)
ncf_out.CTDAS_info = 'Simulated values added from a CTDAS run by %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)\
+ '\nCTDAS was run on platform %s' % (host,)\
+ '\nCTDAS job directory was %s' % (dacycle['dir.da_run'],)\
+ '\nCTDAS Da System was %s' % (dacycle['da.system'],)\
+ '\nCTDAS Da ObsOperator was %s' % (dacycle['da.obsoperator'],)
ncf_out.CTDAS_startdate = dacycle['time.start'].strftime('%F')
ncf_out.CTDAS_enddate = dacycle['time.finish'].strftime("%F")
ncf_out.original_file = orig_file
# get nobs dimension
if ncf_out.dimensions.has_key('id'):
dimidob = ncf_out.dimensions['id']
dimid = ('id',)
elif ncf_out.dimensions.has_key('obs'):
dimidob = ncf_out.dimensions['obs']
dimid = ('obs',)
if dimidob.isunlimited():
nobs = ncf_out.inq_unlimlen()
else:
nobs = len(dimidob)
# add nmembers dimension
dimmembersob = ncf_out.createDimension('nmembers',size=simulated.shape[1])
dimmembers = ('nmembers',)
nmembers = len(dimmembersob)
# Create empty arrays for posterior samples, as well as for forecast sample statistics
savedict = io.std_savedict.copy()
savedict['name'] = "flag_forecast"
savedict['long_name'] = "flag_for_obs_model in forecast"
savedict['units'] = "None"
savedict['dims'] = dimid
savedict['comment'] = 'Flag (0/1/2/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 99 means not sampled'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]^2"
savedict['dims'] = dimid
savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "totalmolefractionvariance_forecast"
savedict['long_name'] = "totalmolefractionvariance of forecast"
savedict['units'] = "[mol mol-1]^2"
savedict['dims'] = dimid
savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modelsamplesmean"
savedict['long_name'] = "mean modelsamples"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['comment'] = 'simulated mixing ratios based on optimized state vector'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modelsamplesmean_forecast"
savedict['long_name'] = "mean modelsamples from forecast"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['comment'] = 'simulated mixing ratios based on prior state vector'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modelsamplesstandarddeviation"
savedict['long_name'] = "standard deviaton of modelsamples over all ensemble members"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['comment'] = 'std dev of simulated mixing ratios based on optimized state vector'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modelsamplesstandarddeviation_forecast"
savedict['long_name'] = "standard deviaton of modelsamples from forecast over all ensemble members"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['comment'] = 'std dev of simulated mixing ratios based on prior state vector'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modelsamplesensemble"
savedict['long_name'] = "modelsamples over all ensemble members"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid+dimmembers
savedict['comment'] = 'ensemble of simulated mixing ratios based on optimized state vector'
ncf_out.add_variable(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modelsamplesensemble_forecast"
savedict['long_name'] = "modelsamples from forecast over all ensemble members"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid+dimmembers
savedict['comment'] = 'ensemble of simulated mixing ratios based on prior state vector'
ncf_out.add_variable(savedict)
else:
logging.debug("Modifying existing file (%s) in the analysis directory" % copy_file)
ncf_out = io.CT_CDF(copy_file,'write')
# Get existing file obs_nums to determine match to local obs_nums
if ncf_out.variables.has_key('id'):
file_obs_nums = ncf_out.get_variable('id')
elif ncf_out.variables.has_key('obspack_num'):
file_obs_nums = ncf_out.get_variable('obspack_num')
# Get all obs_nums related to this file, determine their indices in the local arrays
selected_obs_nums = [num for infile,num in zip(infiles,obs_num) if infile == orig_file]
# Optimized data 1st: For each index, get the data and add to the file in the proper file index location
for num in selected_obs_nums:
model_index = obs_num.tolist().index(num)
file_index = file_obs_nums.tolist().index(num)
#var = ncf_out.variables['modeldatamismatch'] # Take from optimizer.yyyymmdd.nc file instead
#var[file_index] = mdm[model_index]
var = ncf_out.variables['modelsamplesmean']
var[file_index] = simulated[model_index,0]
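# member 0 of the simulated samples holds the ensemble mean; the spread below is computed over the remaining members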
var = ncf_out.variables['modelsamplesstandarddeviation']
var[file_index] = simulated[model_index,1:].std()
var = ncf_out.variables['modelsamplesensemble']
var[file_index] = simulated[model_index,:]
# Now forecast data too: For each index, get the data and add to the file in the proper file index location
if optimized_present:
selected_fc_obs_nums = [num for sitecode,num in zip(fc_sitecodes,fc_obs_num) if sitecode in orig_file]
for num in selected_fc_obs_nums:
model_index = fc_obs_num.tolist().index(num)
file_index = file_obs_nums.tolist().index(num)
var = ncf_out.variables['modeldatamismatch']
var[file_index] = np.sqrt(fc_r[model_index])
var = ncf_out.variables['modelsamplesmean_forecast']
var[file_index] = fc_simulated[model_index]
var = ncf_out.variables['modelsamplesstandarddeviation_forecast']
var[file_index] = fc_simulated_ens[model_index,1:].std()
var = ncf_out.variables['modelsamplesensemble_forecast']
var[file_index] = fc_simulated_ens[model_index,:]
var = ncf_out.variables['totalmolefractionvariance_forecast']
var[file_index] = fc_hphtr[model_index]
var = ncf_out.variables['flag_forecast']
var[file_index] = fc_flag[model_index]
# close the file
status = ncf_out.close()
return None
if __name__ == "__main__":
import logging
from da.tools.initexit import CycleControl
from da.ctc13.dasystem import CO2C13DaSystem
sys.path.append('../../')
logging.root.setLevel(logging.DEBUG)
dacycle = CycleControl(args={'rc':'../../ctdas-co2c13eps-inv-6x4-ei-sibasa-gfed4-weekly.rc'})
dacycle.setup()
dacycle.parse_times()
dasystem = CO2C13DaSystem('../rc/carbontracker_co2c13eps.rc')
dacycle.dasystem = dasystem
while dacycle['time.start'] < dacycle['time.finish']:
outfile = write_mole_fractions(dacycle)
dacycle.advance_cycle_times()
sys.exit(0)
#!/usr/bin/env python
# pipeline.py
"""
.. module:: pipeline
.. moduleauthor:: Wouter Peters
Revision History:
File created on 06 Sep 2010.
The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system.
"""
import logging
import os
import sys
import datetime
import copy
header = '\n\n *************************************** '
footer = ' *************************************** \n '
def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
logging.info(header + "Initializing current cycle" + footer)
start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
prepare_state(dacycle, statevector)
sample_state(dacycle, samples, statevector, obsoperator)
invert(dacycle, statevector, optimizer)
advance(dacycle, samples, statevector, obsoperator)
save_and_submit(dacycle, statevector)
logging.info("Cycle finished...exiting pipeline")
def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
logging.info(header + "Initializing current cycle" + footer)
start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
if dacycle.has_key('forward.savestate.dir'):
fwddir = dacycle['forward.savestate.dir']
else:
logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters")
fwddir = False
if dacycle.has_key('forward.savestate.legacy'):
legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"])
else:
legacy = False
logging.debug("No forward.savestate.legacy key found in rc-file")
if not fwddir:
# Simply make a prior statevector using the normal method
prepare_state(dacycle, statevector) # LU: instead of this, rather create the new covariance and ensemble members here, since the remaining steps are handled above and below.
else:
# Read prior information from another simulation into the statevector.
# This loads the results from another assimilation experiment into the current statevector
if not legacy:
filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
statevector.read_from_file(filename, 'prior')
else:
filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
statevector.read_from_legacy_file(filename, 'prior')
# We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector
# Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current
# experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit.
# This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into
# the current format through the "write_to_file" method.
savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
statevector.write_to_file(savefilename, 'prior')
# Now read optimized fluxes which we will actually use to propagate through the system
if not fwddir:
# if there is no forward dir specified, we simply run forward with the unoptimized prior fluxes in the statevector
savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))  # prior written by prepare_state above
logging.info("Running forward with prior savestate from: %s" % savefilename)
else:
# Read posterior information from another simulation into the statevector.
# This loads the results from another assimilation experiment into the current statevector
if not legacy:
statevector.read_from_file(filename, 'opt')
else:
statevector.read_from_legacy_file(filename, 'opt')
logging.info("Running forward with optimized savestate from: %s"%filename)
# Finally, we run forward with these parameters
advance(dacycle, samples, statevector, obsoperator)
# In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list.
# This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis.
save_and_submit(dacycle, statevector)
logging.info("Cycle finished...exiting pipeline")
####################################################################################################
def analysis_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
""" Main entry point for analysis of ctdas results """
#from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
#from da.analysis.expand_molefractions import write_mole_fractions
#from da.ctc13.expand_fluxes_co2c13 import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
from da.ctc13.expand_fluxes_co2c13eps import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
from da.ctc13.expand_molefractions_co2c13 import write_mole_fractions
from da.analysis.summarize_obs import summarize_obs
from da.analysis.time_avg_fluxes import time_avg
logging.info(header + "Starting analysis" + footer)
try:
save_weekly_avg_1x1_data(dacycle, statevector)
save_weekly_avg_state_data(dacycle, statevector)
except Exception:
logging.error("Error in analysis of weekly 1x1 fluxes and/or statevector, continuing analysis...")
try:
save_weekly_avg_tc_data(dacycle, statevector)
save_weekly_avg_ext_tc_data(dacycle)
except Exception:
logging.error("Error in analysis of weekly tc fluxes, continuing analysis...")
try:
save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
save_weekly_avg_agg_data(dacycle,region_aggregate='country')
except Exception:
logging.error("Error in analysis of weekly aggregated fluxes, continuing analysis...")
try:
time_avg(dacycle,'flux1x1')
time_avg(dacycle,'transcom')
time_avg(dacycle,'olson')
time_avg(dacycle,'country')
except Exception:
logging.error("Error in analysis of time averaged fluxes, continuing analysis...")
try:
write_mole_fractions(dacycle)
#summarize_obs(dacycle)
except Exception:
logging.error("Error in analysis of mole fractions, continuing analysis...")
def archive_pipeline(dacycle, platform, dasystem):
""" Main entry point for archiving of output from one disk/system to another """
if not dacycle.has_key('task.rsync'):
logging.info('rsync task not found, not starting automatic backup...')
return
else:
logging.info('rsync task found, starting automatic backup...')
for task in dacycle['task.rsync'].split():
sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task]
destdir = dacycle['task.rsync.%s.destinationdir'%task]
rsyncflags = dacycle['task.rsync.%s.flags'%task]
# file ID and names
jobid = dacycle['time.end'].strftime('%Y%m%d')
targetdir = dacycle['dir.exec']
jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) )
logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) )
# Template and commands for job
jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile}
if platform.ID == 'cartesius':
jobparams['jobqueue'] = 'staging'
template = platform.get_job_template(jobparams)
for sourcedir in sourcedirs.split():
execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir, destdir)
template += execcommand
# write and submit
platform.write_job(jobfile, template, jobid)
jobid = platform.submit_job(jobfile, joblog=logfile)
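# Hedged sketch of the rc-file entries archive_pipeline reads for one rsync task; the
# task name 'output' and the paths/flags below are made-up placeholders.
def _example_rsync_rc_keys():
    return {
        'task.rsync': 'output',
        'task.rsync.output.sourcedirs': '/scratch/ctdas/exp1/output /scratch/ctdas/exp1/analysis',
        'task.rsync.output.destinationdir': 'archive:/archive/ctdas/exp1/',
        'task.rsync.output.flags': '-auv',
    }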
def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator):
""" Set up the job specific directory structure and create an expanded rc-file """
dasystem.validate()
dacycle.dasystem = dasystem
dacycle.daplatform = platform
dacycle.setup()
#statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc
#samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc
#obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc
obsoperator.setup(dacycle) # Setup Observation Operator
statevector.setup(dacycle)
def prepare_state(dacycle, statevector):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
# We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
# the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector
# with new values for each week. After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used
# in the current cycle
logging.info(header + "starting prepare_state" + footer)
if not dacycle['time.restart']:
# Fill each week from n=1 to n=nlag with a new ensemble
for n in range(statevector.nlag):
date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle']))
cov = statevector.get_covariance(date, dacycle)
statevector.make_new_ensemble(n, cov)
else:
# Read the statevector data from file
#saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc')
saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d'))
statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate
# Now propagate the ensemble by one cycle to prepare for the current cycle
statevector.propagate(dacycle)
# Finally, also write the statevector to a file so that we can always access the a-priori information
current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
statevector.write_to_file(current_sv, 'prior') # write prior info
def sample_state(dacycle, samples, statevector, obsoperator):
""" Sample the filter state for the inversion """
# Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step.
# This is especially important for:
# (i) The transport model restart data which holds the background mole fractions. This is needed to run the model one cycle forward
# (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed
#status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
#msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
logging.info(header + "starting sample_state" + footer)
nlag = int(dacycle['time.nlag'])
logging.info("Sampling model will be run over %d cycles" % nlag)
obsoperator.get_initial_data()
for lag in range(nlag):
logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
############# Perform the actual sampling loop #####################
sample_step(dacycle, samples, statevector, obsoperator, lag)
logging.debug("statevector now carries %d samples" % statevector.nobs)
def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
""" Perform all actions needed to sample one cycle """
# First set up the information for time start and time end of this sample
dacycle.set_sample_times(lag)
startdate = dacycle['time.sample.start']
enddate = dacycle['time.sample.end']
dacycle['time.sample.window'] = lag
dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
logging.info("New simulation interval set : ")
logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
logging.info(" file stamp: %s " % dacycle['time.sample.stamp'])
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model
statevector.write_members_to_file(lag, dacycle['dir.input'])
samples.setup(dacycle)
samples.add_observations()
samples.add_observations_c13()
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
samples.add_model_data_mismatch_c13(dacycle.dasystem['obs.c13.sites.rc'])
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_coords(sampling_coords_file)
# Write filename to dacycle, and to output collection list
dacycle['ObsOperator.inputfile'] = sampling_coords_file
# Run the observation operator
obsoperator.run_forecast_model()
# Read forecast model samples that were written to NetCDF files by each member. Add them to the existing
# Observation object for each sample loop. These data will be written to file in the output folder for each sample cycle.
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
simulated_file = os.path.join(obsoperator.outputdir, 'flask_output.%s.nc' % dacycle['time.sample.stamp'])
samples.add_simulations(simulated_file)
samples.add_simulations_c13(simulated_file)
# Now write a small file that holds for each observation a number of values that we need in postprocessing
#filename = samples.write_sample_coords()
# Now add the observations that need to be assimilated to the statevector.
# Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag is the
# last lag of the filter (lag == nlag-1). This is to make sure that the first step of the system uses all observations
# available, while the subsequent steps only optimize against the data at the front (lag == nlag-1) of the filter.
# This way, each observation is used only (and at least) once in the assimilation.
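# Worked example of this rule (a sketch, assuming nlag = 3):
# - very first cycle (time.restart == False): lags 0, 1 and 2 all append their samples, so every
#   available observation enters the first optimization;
# - any later cycle (time.restart == True): only the newest lag (lag == nlag-1 == 2) appends samples,
#   so each observation is assimilated exactly once, at the front of the filter.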
if not advance:
if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1:
statevector.obs_to_assimilate += (copy.deepcopy(samples),)
statevector.nobs += samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")
else:
statevector.obs_to_assimilate += (None,)
def invert(dacycle, statevector, optimizer):
""" Perform the inverse calculation """
logging.info(header + "starting invert" + footer)
dims = (int(dacycle['time.nlag']),
int(dacycle['da.optimizer.nmembers']),
int(dacycle.dasystem['nparameters']),
statevector.nobs)
if not dacycle.dasystem.has_key('opt.algorithm'):
logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
logging.info("...using serial algorithm as default...")
optimizer.set_algorithm('Serial')
elif dacycle.dasystem['opt.algorithm'] == 'serial':
logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
optimizer.set_algorithm('Serial')
elif dacycle.dasystem['opt.algorithm'] == 'bulk':
logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
optimizer.set_algorithm('Bulk')
optimizer.setup(dims)
optimizer.state_to_matrix(statevector)
diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
optimizer.write_diagnostics(diagnostics_file, 'prior')
optimizer.set_localization(dacycle['da.system.localization'])
if optimizer.algorithm == 'Serial':
optimizer.serial_minimum_least_squares()
else:
optimizer.bulk_minimum_least_squares()
optimizer.matrix_to_state(statevector)
optimizer.write_diagnostics(diagnostics_file, 'optimized')
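# Hedged example of the DA System rc key that selects the solver branch in invert();
# 'serial' is also the fallback when the key is absent.
def _example_optimizer_rc_key(use_bulk=False):
    return {'opt.algorithm': 'bulk' if use_bulk else 'serial'}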
def advance(dacycle, samples, statevector, obsoperator):
""" Advance the filter state to the next step """
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
obsoperator.get_initial_data()
sample_step(dacycle, samples, statevector, obsoperator, 0, True)
dacycle.restart_filelist.extend(obsoperator.restart_filelist)
dacycle.output_filelist.extend(obsoperator.output_filelist)
logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to dacycle for collection ")
outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_auxiliary(outfile)
def save_and_submit(dacycle, statevector):
""" Save the model state and submit the next job """
logging.info(header + "starting save_and_submit" + footer)
filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
statevector.write_to_file(filename, 'opt')
dacycle.output_filelist.append(filename)
dacycle.finalize()