From 20d22bd7eaabfd3939c677353a781acafd7da149 Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Thu, 26 Jan 2012 10:11:42 +0000 Subject: [PATCH] latest version uses new StateVector structure and analysis suite --- da/analysis/expand_fluxes.py | 383 ++++++++++++++++++++++++++++------ da/baseclasses/optimizer.py | 2 + da/baseclasses/statevector.py | 175 ++++++++++------ da/ctgridded/statevector.py | 7 +- da/examples/das.py | 7 +- da/examples/dasgridded.py | 6 +- da/tools/pipeline.py | 23 +- da/tools/standardvariables.py | 44 +++- 8 files changed, 504 insertions(+), 143 deletions(-) diff --git a/da/analysis/expand_fluxes.py b/da/analysis/expand_fluxes.py index 0c4fab1e..b00c0981 100755 --- a/da/analysis/expand_fluxes.py +++ b/da/analysis/expand_fluxes.py @@ -4,10 +4,10 @@ import sys sys.path.append('../../') import os import getopt -from pylab import array, arange, transpose, date2num -from datetime import datetime +from datetime import datetime, timedelta from da.tools.general import CreateDirs -from da.tools.io4 import CT_Read, CT_CDF, std_savedict, GetVariable +import numpy as np +from pylab import date2num, num2date """ Author: Wouter Peters (Wouter.Peters@noaa.gov) @@ -26,17 +26,16 @@ def proceed_dialog(txt, yes=['y','yes'], all=['a','all', 'yes-to-all']): return 2 return 0 -def SaveWeeklyAvg1x1Data(DaCycle): - """ Function SaveFlux1x1Data saves 3-hourly surface flux data to NetCDF files +def SaveWeeklyAvg1x1Data(DaCycle, StateVector): + """ + Function creates a NetCDF file with output on 1x1 degree grid. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. - *** Inputs *** - rundat : a RunInfo object - - *** Outputs *** - daily NetCDF file containing 1-hourly global surface fluxes at 1x1 degree - - *** Example *** - ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """ + :param DaCycle: a :class:`~da.tools.initexit.CycleControl` object + :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ import da.tools.io4 as io import logging @@ -51,9 +50,10 @@ def SaveWeeklyAvg1x1Data(DaCycle): dt = DaCycle['cyclelength'] startdate = DaCycle['time.start'] enddate = DaCycle['time.end'] + nlag = StateVector.nlag - msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) - msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) + msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) # # Create or open NetCDF output file @@ -81,34 +81,58 @@ def SaveWeeklyAvg1x1Data(DaCycle): else: # -# if not, process this cycle. Start by getting input data from CTDAS +# if not, process this cycle. Start by getting flux input data from CTDAS # filename = os.path.join(DaCycle['dir.output'],'flux1x1_%s_%s.nc'%(startdate.strftime('%Y%m%d%H'),enddate.strftime('%Y%m%d%H'),) ) file = io.CT_Read(filename,'read') - bio = array(file.GetVariable(DaCycle.DaSystem['background.co2.bio.flux'])) - ocean = array(file.GetVariable(DaCycle.DaSystem['background.co2.ocean.flux'])) - fire = array(file.GetVariable(DaCycle.DaSystem['background.co2.fires.flux'])) - fossil = array(file.GetVariable(DaCycle.DaSystem['background.co2.fossil.flux'])) - mapped_parameters = array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1'])) + bio = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.bio.flux'])) + ocean = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.ocean.flux'])) + fire = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fires.flux'])) + fossil = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fossil.flux'])) + #mapped_parameters = np.array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1'])) file.close() next=ncf.inq_unlimlen()[0] + # Start adding datasets from here on, both prior and posterior datasets for bio and ocn - for prior in [False, True]: + for prior in [True, False]: # -# if prior, do not multiply fluxes with parameters, otherwise do +# Now fill the StateVector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. # + if prior: qual_short='prior' - biomapped=bio - oceanmapped=ocean + for n in range(nlag,0,-1): + priordate = enddate - timedelta(dt.days*n) + savedir = DaCycle['dir.output'].replace(startdate.strftime('%Y%m%d'),priordate.strftime('%Y%m%d') ) + filename = os.path.join(savedir,'savestate.nc') + if os.path.exists(filename): + dummy = StateVector.ReadFromFile(filename) + gridmean,gridvariance = StateVector.StateToGrid(lag=n) + + msg = 'Read prior dataset from file %s, sds %d: '%(filename,n) ; logging.debug(msg) + break else: qual_short='opt' - biomapped=bio*mapped_parameters - oceanmapped=ocean*mapped_parameters + savedir = DaCycle['dir.output'] + filename = os.path.join(savedir,'savestate.nc') + dummy = StateVector.ReadFromFile(filename) + gridmean,gridvariance = StateVector.StateToGrid(lag=nlag) + + msg = 'Read posterior dataset from file %s, sds %d: '%(filename,nlag) ; logging.debug(msg) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + biomapped=bio*gridmean + oceanmapped=ocean*gridmean + biovarmapped=bio*gridvariance + oceanvarmapped=ocean*gridvariance + +# # # For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write # @@ -116,16 +140,28 @@ def SaveWeeklyAvg1x1Data(DaCycle): savedict['values']=biomapped.tolist() savedict['dims']=dimdate+dimgrid savedict['count']=next - savedict['nsets']=1 ncf.AddData(savedict) # savedict=ncf.StandardVar(varname='ocn_flux_'+qual_short) - savedict['name']='ocn_flux_'+qual_short savedict['values']=oceanmapped.tolist() savedict['dims']=dimdate+dimgrid savedict['count']=next ncf.AddData(savedict) + + savedict=ncf.StandardVar(varname='bio_flux_%s_cov'%qual_short) + savedict['values']=biovarmapped.tolist() + savedict['dims']=dimdate+dimgrid + savedict['count']=next + ncf.AddData(savedict) # + savedict=ncf.StandardVar(varname='ocn_flux_%s_cov'%qual_short) + savedict['values']=oceanvarmapped.tolist() + savedict['dims']=dimdate+dimgrid + savedict['count']=next + ncf.AddData(savedict) + + # End prior/posterior block + savedict=ncf.StandardVar(varname='fire_flux_imp') savedict['values']=fire.tolist() savedict['dims']=dimdate+dimgrid @@ -157,16 +193,231 @@ def SaveWeeklyAvg1x1Data(DaCycle): return saveas -def SaveWeeklyAvgTCData(DaCycle): - """ Function SaveEcoData saves weekly surface flux data to NetCDF files +def SaveWeeklyAvgStateData(DaCycle, StateVector): + """ + Function creates a NetCDF file with output for all parameters. It uses the flux data written by the + :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and + variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. - *** Inputs *** - DaCycle : a DaCycle object + :param DaCycle: a :class:`~da.tools.initexit.CycleControl` object + :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + """ + + import da.tools.io4 as io + import logging + from da.analysis.tools_regions import globarea + +# + dirname = 'data_state_weekly' +# + dirname = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname)) +# +# Some help variables +# + dectime0 = date2num(datetime(2000,1,1)) + dt = DaCycle['cyclelength'] + startdate = DaCycle['time.start'] + enddate = DaCycle['time.end'] + nlag = StateVector.nlag + + area = globarea() + vectorarea = StateVector.GridToVector(griddata=area,method='sum') + + msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) + msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) + +# +# Create or open NetCDF output file +# + saveas = os.path.join(dirname,'statefluxes.nc') + ncf = io.CT_CDF(saveas,'write') + +# +# Create dimensions and lat/lon grid +# + dimregs = ncf.AddDim('nparameters',StateVector.nparams) + dimmembers = ncf.AddDim('nmembers',StateVector.nmembers) + dimdate = ncf.AddDateDim() +# +# set title and tell GMT that we are using "pixel registration" +# + setattr(ncf,'Title','CarbonTracker fluxes') + setattr(ncf,'node_offset',1) +# +# skip dataset if already in file +# + ncfdate = date2num(startdate)-dectime0+dt.days/2.0 + skip = ncf.has_date(ncfdate) + if skip: + msg = 'Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'),saveas) ; logging.warning(msg) + else: + + next=ncf.inq_unlimlen()[0] + +# +# if not, process this cycle. Start by getting flux input data from CTDAS +# + filename = os.path.join(DaCycle['dir.output'],'flux1x1_%s_%s.nc'%(startdate.strftime('%Y%m%d%H'),enddate.strftime('%Y%m%d%H'),) ) + + file = io.CT_Read(filename,'read') + bio = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.bio.flux'])) + ocean = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.ocean.flux'])) + fire = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fires.flux'])) + fossil = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fossil.flux'])) + #mapped_parameters = np.array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1'])) + file.close() + + next=ncf.inq_unlimlen()[0] + + vectorbio = StateVector.GridToVector(griddata=bio*area,method='sum') + vectorocn = StateVector.GridToVector(griddata=ocean*area,method='sum') + vectorfire = StateVector.GridToVector(griddata=fire*area,method='sum') + vectorfossil = StateVector.GridToVector(griddata=fossil*area,method='sum') + + +# Start adding datasets from here on, both prior and posterior datasets for bio and ocn + + for prior in [True, False]: +# +# Now fill the StateVector with the prior values for this time step. Note that the prior value for this time step +# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. +# + + if prior: + qual_short='prior' + for n in range(nlag,0,-1): + priordate = enddate - timedelta(dt.days*n) + savedir = DaCycle['dir.output'].replace(startdate.strftime('%Y%m%d'),priordate.strftime('%Y%m%d') ) + filename = os.path.join(savedir,'savestate.nc') + if os.path.exists(filename): + dummy = StateVector.ReadFromFile(filename) + choicelag = n + + msg = 'Read prior dataset from file %s, sds %d: '%(filename,n) ; logging.debug(msg) + break + else: + qual_short='opt' + savedir = DaCycle['dir.output'] + filename = os.path.join(savedir,'savestate.nc') + dummy = StateVector.ReadFromFile(filename) + choicelag = nlag + + msg = 'Read posterior dataset from file %s, sds %d: '%(filename,nlag) ; logging.debug(msg) +# +# if prior, do not multiply fluxes with parameters, otherwise do +# + data = StateVector.EnsembleMembers[choicelag-1][0].ParameterValues*vectorbio # units of mole region-1 s-1 + + savedict = ncf.StandardVar(varname='bio_flux_%s'%qual_short) + savedict['values'] = data + savedict['dims'] = dimdate+dimregs + savedict['count'] = next + ncf.AddData(savedict) + + members = StateVector.EnsembleMembers[choicelag-1] + deviations = np.array([mem.ParameterValues*data for mem in members]) + + savedict=ncf.StandardVar(varname='bio_flux_%s_cov'%qual_short) + + savedict['values']=deviations.tolist() + savedict['dims']=dimdate+dimmembers+dimregs + savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.AddData(savedict) + + savedict=ncf.StandardVar('unknown') + savedict['name'] = 'bio_flux_%s_std'%qual_short + savedict['long_name'] = 'Biosphere flux standard deviation, %s'%qual_short + savedict['values']=deviations.std(axis=0) + savedict['dims']=dimdate+dimregs + savedict['comment']="This is the standard deviation on each parameter" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.AddData(savedict) + + data = StateVector.EnsembleMembers[choicelag-1][0].ParameterValues*vectorocn # units of mole region-1 s-1 + + savedict=ncf.StandardVar(varname='ocn_flux_%s'%qual_short) + savedict['values']=data + savedict['dims']=dimdate+dimregs + savedict['count']=next + ncf.AddData(savedict) + + deviations = np.array([mem.ParameterValues*data for mem in members]) + + savedict=ncf.StandardVar(varname='ocn_flux_%s_cov'%qual_short) + savedict['values']=deviations.tolist() + savedict['dims']=dimdate+dimmembers+dimregs + savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.AddData(savedict) + + savedict=ncf.StandardVar('unknown') + savedict['name'] = 'ocn_flux_%s_std'%qual_short + savedict['long_name'] = 'Ocean flux standard deviation, %s'%qual_short + savedict['values']=deviations.std(axis=0) + savedict['dims']=dimdate+dimregs + savedict['comment']="This is the standard deviation on each parameter" + savedict['units']="mol region-1 s-1" + savedict['count']=next + ncf.AddData(savedict) + + data = vectorfire + + savedict=ncf.StandardVar(varname='fire_flux_imp') + savedict['values']=data + savedict['dims']=dimdate+dimregs + savedict['count']=next + ncf.AddData(savedict) + + data = vectorfossil + + savedict=ncf.StandardVar(varname='fossil_flux_imp') + savedict['values']=data + savedict['dims']=dimdate+dimregs + savedict['count']=next + ncf.AddData(savedict) + + savedict=ncf.StandardVar(varname='date') + savedict['values'] = ncfdate + savedict['dims'] = dimdate + savedict['count'] = next + ncf.AddData(savedict) + + sys.stdout.write('.') + sys.stdout.flush() +# +# Done, close the new NetCDF file +# + ncf.close() +# +# Return the full name of the NetCDF file so it can be processed by the next routine +# + msg="Vector weekly average fluxes now written" ; logging.info(msg) + + return saveas + + +def SaveWeeklyAvgTCData(DaCycle, StateVector): + """ + Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the + function `SaveWeeklyAvg1x1Data` to create fluxes of length `nparameters`, which are then projected + onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`. + + :param DaCycle: a :class:`~da.tools.initexit.CycleControl` object + :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` + :rtype: None + + This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve + these with the parameters in the StateVector. This creates posterior fluxes, and the posterior covariance for the complete + StateVector in units of mol/box/s which we then turn into TC fluxes and covariances. """ from da.analysis.tools_regions import globarea, StateToGrid from da.analysis.tools_transcom import StateToTranscom, StateCovToTranscom, transcommask import da.tools.io4 as io - from numpy import dot import logging # dirname = 'data_tc_weekly' @@ -181,8 +432,8 @@ def SaveWeeklyAvgTCData(DaCycle): enddate = DaCycle['time.end'] ncfdate = date2num(startdate)-dectime0+dt.days/2.0 - msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) - msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg) + msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) + msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) # Write/Create NetCDF output file # @@ -207,9 +458,9 @@ def SaveWeeklyAvgTCData(DaCycle): area=globarea() - infile=os.path.join(DaCycle['dir.analysis'],'data_flux1x1_weekly','flux_1x1.nc') + infile=os.path.join(DaCycle['dir.analysis'],'data_state_weekly','statefluxes.nc') if not os.path.exists(infile): - msg="Needed input file (%s) does not exist yet, please create weekly eco flux files first, returning..."%infile ; logging.error(msg) + msg="Needed input file (%s) does not exist yet, please create file first, returning..."%infile ; logging.error(msg) return None ncf_in = io.CT_Read(infile,'read') @@ -228,8 +479,8 @@ def SaveWeeklyAvgTCData(DaCycle): try: index = dates.tolist().index(ncfdate) except ValueError: - msg = "The requested cycle date is not yet available as gridded flux in file %s "% infile ; logging.error(msg) - msg = "Please make sure you create gridded fluxes before making TC fluxes " ; logging.error(msg) + msg = "The requested cycle date is not yet available in file %s "% infile ; logging.error(msg) + msg = "Please make sure you create state based fluxes before making TC fluxes " ; logging.error(msg) raise ValueError # First add the date for this cycle to the file, this grows the unlimited dimension @@ -247,28 +498,30 @@ def SaveWeeklyAvgTCData(DaCycle): data = ncf_in.GetVariable(vname)[index] + savedict = ncf.StandardVar(varname=vname) + if vname=='latitude': continue elif vname=='longitude': continue + elif vname=='date': continue + elif vname=='idate': continue + elif 'cov' in vname: + + cov = data.transpose().dot(data) + tcdata = StateVector.VectorToTC(vectordata=cov,cov=True) # vector to TC + + savedict['units'] = '[mol/region/s]**2' + savedict['dims'] = dimdate+dimregs+dimregs + + else: - if vname not in ['date','idate']: - - if 'cov' in vname: - alldata=[] - dd=StateCovToGrid(data*area,transcommask,reverse=True) - data=array(dd) - dims=dimdate+dimregs+dimregs - else: - dd=StateToGrid(data*area,transcommask,reverse=True) - data=array(dd) - dims=dimdate+dimregs + tcdata = StateVector.VectorToTC(vectordata=data) # vector to TC - savedict = ncf.StandardVar(varname=vname) - savedict['values'] = data.tolist() - savedict['dims'] = dims + savedict['dims'] = dimdate+dimregs savedict['units'] = 'mol/region/s' - savedict['count'] = index - ncf.AddData(savedict) - + + savedict['values'] = tcdata + ncf.AddData(savedict) + ncf_in.close() ncf.close() @@ -548,20 +801,25 @@ if __name__ == "__main__": import logging from da.tools.initexit import CycleControl from da.ct.dasystem import CtDaSystem + from da.ctgridded.statevector import CtGriddedStateVector sys.path.append('../../') - logging.root.setLevel(logging.INFO) + logging.root.setLevel(logging.DEBUG) - DaCycle = CycleControl(args={'rc':'../../da.rc'}) + DaCycle = CycleControl(args={'rc':'../../dagridded.rc'}) DaCycle.Initialize() DaCycle.ParseTimes() - DaSystem = CtDaSystem('../rc/carbontracker.rc') + DaSystem = CtDaSystem('../rc/carbontrackergridded.rc') DaSystem.Initialize() DaCycle.DaSystem = DaSystem + StateVector = CtGriddedStateVector(DaCycle) + dummy = StateVector.Initialize() + + no, yes, all = range(3) proceed = no @@ -572,8 +830,9 @@ if __name__ == "__main__": if proceed != no: while DaCycle['time.start'] < DaCycle['time.finish']: - savedas=SaveWeeklyAvg1x1Data(DaCycle) - savedas=SaveWeeklyAvgTCData(DaCycle) + savedas=SaveWeeklyAvg1x1Data(DaCycle, StateVector) + savedas=SaveWeeklyAvgStateData(DaCycle, StateVector) + savedas=SaveWeeklyAvgTCData(DaCycle, StateVector) DaCycle.AdvanceCycleTimes() diff --git a/da/baseclasses/optimizer.py b/da/baseclasses/optimizer.py index 4601447a..c8fa4b33 100755 --- a/da/baseclasses/optimizer.py +++ b/da/baseclasses/optimizer.py @@ -150,6 +150,8 @@ class Optimizer(object): for m,mem in enumerate(members): members[m].ParameterValues[:] = self.X_prime[n*self.nparams:(n+1)*self.nparams,m] + self.x[n*self.nparams:(n+1)*self.nparams] + StateVector.isOptimized = True + msg = 'Returning optimized data to the StateVector, setting "StateVector.isOptimized = True" ' ; logging.debug(msg) return None def WriteDiagnostics(self,DaCycle, StateVector,type='prior'): diff --git a/da/baseclasses/statevector.py b/da/baseclasses/statevector.py index 9806b749..a8ac4d0e 100755 --- a/da/baseclasses/statevector.py +++ b/da/baseclasses/statevector.py @@ -117,8 +117,10 @@ class StateVector(object): Finally, the StateVector can be mapped to a gridded array, or to a vector of TransCom regions, using: + .. automethod:: da.baseclasses.statevector.StateVector.GridToVector .. automethod:: da.baseclasses.statevector.StateVector.VectorToGrid .. automethod:: da.baseclasses.statevector.StateVector.VectorToTC + .. automethod:: da.baseclasses.statevector.StateVector.StateToTC """ @@ -247,9 +249,7 @@ class StateVector(object): dof = np.sum(s)**2/sum(s**2) C = np.linalg.cholesky(covariancematrix) - dummy = logging.debug("Covariance matrix visualized for inspection") - - msg = 'Cholesky decomposition has succeeded offline' ; logging.debug(msg) + msg = 'Cholesky decomposition has succeeded ' ; logging.debug(msg) msg = 'Appr. degrees of freedom in covariance matrix is %s'%(int(dof)) ; logging.info(msg) @@ -342,7 +342,15 @@ class StateVector(object): #import da.tools.io as io import numpy as np - f = io.CT_CDF(filename,method='create') + if not self.isOptimized: + f = io.CT_CDF(filename,method='create') + msg = 'Creating new StateVector output file (%s)' % filename ; logging.debug(msg) + qual = 'prior' + else: + f = io.CT_CDF(filename,method='write') + msg = 'Opening existing StateVector output file (%s)' % filename ; logging.debug(msg) + qual = 'opt' + dimparams = f.AddParamsDim(self.nparams) dimmembers = f.AddMembersDim(self.nmembers) dimlag = f.AddLagDim(self.nlag,unlimited=True) @@ -352,7 +360,7 @@ class StateVector(object): members = self.EnsembleMembers[n] MeanState = members[0].ParameterValues - savedict = f.StandardVar(varname='meanstate') + savedict = f.StandardVar(varname='meanstate_%s'%qual) savedict['dims'] = dimlag+dimparams savedict['values'] = MeanState savedict['count'] = n @@ -363,7 +371,7 @@ class StateVector(object): devs = np.asarray([m.ParameterValues.flatten() for m in members]) data = devs- np.asarray(MeanState) - savedict = f.StandardVar(varname='ensemblestate') + savedict = f.StandardVar(varname='ensemblestate_%s'%qual) savedict['dims'] = dimlag+dimmembers+dimparams savedict['values'] = data savedict['count'] = n @@ -374,16 +382,19 @@ class StateVector(object): msg = 'Successfully wrote the State Vector to file (%s) ' % (filename,) ; logging.info(msg) - def ReadFromFile(self,filename): + def ReadFromFile(self,filename,qual='opt'): """ :param filename: the full filename for the input NetCDF file + :param qual: a string indicating whether to read the 'prior' or 'opt'(imized) StateVector from file :rtype: None Read the StateVector information from a NetCDF file and put in a StateVector object - In principle the input file will have only one two datasets inside + In principle the input file will have only one four datasets inside called: - * `meanstate`, dimensions [nlag, nparamaters] - * `ensemblestate`, dimensions [nlag,nmembers, nparameters] + * `meanstate_prior`, dimensions [nlag, nparamaters] + * `ensemblestate_prior`, dimensions [nlag,nmembers, nparameters] + * `meanstate_opt`, dimensions [nlag, nparamaters] + * `ensemblestate_opt`, dimensions [nlag,nmembers, nparameters] This NetCDF information can be written to file using :meth:`~da.baseclasses.statevector.StateVector.WriteToFile` @@ -394,11 +405,16 @@ class StateVector(object): import numpy as np f = io.CT_Read(filename,'read') - MeanState = f.GetVariable('statevectormean') - EnsembleMembers = f.GetVariable('statevectorensemble') + MeanState = f.GetVariable('statevectormean_'+qual) + EnsembleMembers = f.GetVariable('statevectorensemble_'+qual) dummy = f.close() + for n in range(self.nlag): + if not self.EnsembleMembers[n] == []: + self.EnsembleMembers[n] = [] + msg = 'Existing ensemble for lag=%d was removed to make place for newly read data'%(n+1,) ; logging.warning(msg) + for m in range(self.nmembers): NewMember = self.GetNewMember(m) NewMember.ParameterValues = EnsembleMembers[n,m,:].flatten() + MeanState[n] # add the mean to the deviations to hold the full parameter values @@ -406,8 +422,9 @@ class StateVector(object): msg = 'Successfully read the State Vector from file (%s) ' % (filename,) ; logging.info(msg) - def WriteMembersToFile(self, DaCyle): + def WriteMembersToFile(self, lag): """ + :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag] :rtype: None Write ensemble member information to a NetCDF file for later use. The standard output filename is @@ -427,14 +444,16 @@ class StateVector(object): #import da.tools.io as io #import da.tools.io4 as io - outdir = DaCycle['dir.input'] + outdir = self.DaCycle['dir.input'] + + members = self.EnsembleMembers[lag-1] - for mem in self.members: + for mem in members: - filename = os.path.join(outdir,'parameters.%03d.nc'% self.membernumber) + filename = os.path.join(outdir,'parameters.%03d.nc'% mem.membernumber) ncf = io.CT_CDF(filename,method='create') dimparams = ncf.AddParamsDim(self.nparams) - dimgrid = filehandle.AddLatLonDim() + dimgrid = ncf.AddLatLonDim() data = mem.ParameterValues @@ -452,77 +471,82 @@ class StateVector(object): savedict = io.std_savedict.copy() savedict['name'] = "parametermap" - savedict['long_name'] = "parametermap_for_member_%d"%self.membernumber + savedict['long_name'] = "parametermap_for_member_%d"%mem.membernumber savedict['units'] = "unitless" savedict['dims'] = dimgrid - savedict['values'] = data.tolist() - savedict['comment'] = 'These are gridded parameter values to use for member %d'%self.membernumber - dummy = filehandle.AddData(savedict) + savedict['values'] = griddata.tolist() + savedict['comment'] = 'These are gridded parameter values to use for member %d'%mem.membernumber + dummy = ncf.AddData(savedict) dummy = ncf.close() - msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (self.membernumber,filename,) ; logging.debug(msg) + msg = 'Successfully wrote data from ensemble member %d to file (%s) ' % (mem.membernumber,filename,) ; logging.debug(msg) - - def VectorToGrid(self,vectordata=None,griddata=None,reverse=False, method = 'avg'): + def GridToVector(self,griddata=None, method = 'avg'): """ - Map vector elements to a map or vice cersa + Map gridded data onto a vector of length (nparams,) - :param vectordata: a vector dataset to use in case `reverse = False`. This dataset is mapped onto a 1x1 grid and must be of length `nparams` - :param griddata: a dataset to use in case `reverse = True`. This dataset is mapped onto a vector of length `nparams` - :param reverse: a Boolean indicating whether to map from vector to grid (F) or grid to vector (T) + :param griddata: a gridded dataset to use. This dataset is mapped onto a vector of length `nparams` :param method: a string that specifies the method to combine grid boxes in case reverse=True. Must be either ['avg','sum','minval'] - :rtype: ndarray: an array of size (360,180,) (reverse:F) or of size (nparameters,) (reverse:T) + :rtype: ndarray: size (nparameters,) This method makes use of a dictionary that links every parameter number [1,...,nparams] to a series of gridindices. These indices specify a location on a 360x180 array, stretched into a vector using `array.flat`. There are multiple ways of calling this method:: - griddedarray = self.VectorToGrid(vectordata=ParameterValues) # simply puts the ParameterValues onto a (180,360,) array - - values = self.VectorToGrid(griddata=mygriddeddata,reverse=True,method='minval') # does the reverse for a (360,180,) array, + values = self.GridToVector(griddata=mygriddeddata,method='minval') # using the minimum value of all datapoints covered by that parameter index - values = self.VectorToGrid(griddata=mygriddeddata,reverse=True,method='avg') # does the reverse for a (360,180,) array, + values = self.GridToVector(griddata=mygriddeddata,method='avg') # using the average value of all datapoints covered by that parameter index - values = self.VectorToGrid(griddata=mygriddeddata,reverse=True,method='sum') # does the reverse for a (360,180,) array, + values = self.GridToVector(griddata=mygriddeddata,method='sum') # using the sum of values of all datapoints covered by that parameter index - .. note:: This method uses a DaSystem object that must be initialzied with a proper parameter map. See :class:`~da.baseclasses.dasystem` for details + .. note:: This method uses a DaSystem object that must be initialized with a proper parameter map. See :class:`~da.baseclasses.dasystem` for details """ - if reverse: - """ project 1x1 degree map onto ecoregions """ + methods = ['avg','sum','minval'] + if method not in methods: + msg = "To put data from a map into the statevector, please specify the method to use (%s)" % methods ; logging.error(msg) + raise ValueError - methods = ['avg','sum','minval'] - if method not in methods: - msg = "To put data from a map into the statevector, please specify the method to use (%s)" % methods ; logging.error(msg) - raise ValueError + result=np.zeros((self.nparams,),float) + for k,v in self.griddict.iteritems(): + #print k,k-1,result.shape, v + if method == "avg": + result[k-1]=griddata.take(v).mean() + elif method == "sum" : + result[k-1]=griddata.take(v).sum() + elif method == "minval" : + result[k-1]=griddata.take(v).min() + return result # Note that the result is returned, but not yet placed in the member.ParameterValues attrtibute! - result=np.zeros((self.nparams,),float) - for k,v in self.griddict.iteritems(): - #print k,k-1,result.shape, v - if method == "avg": - result[k-1]=griddata.take(v).mean() - elif method == "sum" : - result[k-1]=griddata.take(v).sum() - elif method == "minval" : - result[k-1]=griddata.take(v).min() - return result # Note that the result is returned, but not yet placed in the member.ParameterValues attrtibute! + def VectorToGrid(self,vectordata=None): + """ + Map vector elements to a map or vice cersa - else: + :param vectordata: a vector dataset to use in case `reverse = False`. This dataset is mapped onto a 1x1 grid and must be of length `nparams` + :rtype: ndarray: an array of size (360,180,) + + This method makes use of a dictionary that links every parameter number [1,...,nparams] to a series of gridindices. These + indices specify a location on a 360x180 array, stretched into a vector using `array.flat`. There are multiple ways of calling + this method:: + + griddedarray = self.VectorToGrid(vectordata=ParameterValues) # simply puts the ParameterValues onto a (180,360,) array + + .. note:: This method uses a DaSystem object that must be initialzied with a proper parameter map. See :class:`~da.baseclasses.dasystem` for details - """ project ecoregion properties onto 1x1 degree map """ + """ - result=np.zeros(self.gridmap.shape,float) - for k,v in self.griddict.iteritems(): - #print k,v - result.put(v,vectordata[k-1]) + result=np.zeros(self.gridmap.shape,float) + for k,v in self.griddict.iteritems(): + #print k,v + result.put(v,vectordata[k-1]) - return result + return result def VectorToTC(self,vectordata,cov=False): """ @@ -539,6 +563,39 @@ class StateVector(object): else: return np.dot(vectordata.squeeze(),M) + def StateToGrid(self, fluxvector = None, lag=1): + """ + Transforms the StateVector information (mean + covariance) to a 1x1 degree grid. + + :param: fluxvector: a vector of length (nparams,) that holds the fluxes associated with each parameter in the StateVector + :param: lag: the lag at which to evaluate the StateVector + :rtype: a tuple of two arrays (gridmean,gridvariance) with dimensions (180,360,) + + If the attribute `fluxvector` is not passed, the function will return the mean parameter value and its variance on a 1x1 map. + + ..note:: Although we can return the variance information for each gridbox, the covariance information contained in the original ensemble is lost when mapping to 1x1 degree! + + """ + + if fluxvector == None: + fluxvector = np.ones(self.nparams) + + ensemble = self.EnsembleMembers[lag-1] + + ensemblemean =ensemble[0].ParameterValues + + # First transform the mean + + gridmean = self.VectorToGrid(vectordata = ensemblemean*fluxvector) + + # And now the covariance, first create covariance matrix (!), and then multiply + + deviations = np.array([mem.ParameterValues*fluxvector-ensemblemean for mem in ensemble]) + variance = deviations.std(axis=0)**2 + gridvar = self.VectorToGrid(variance) + + return (gridmean,gridvar,) + def StateToTC(self, fluxvector = None, lag=1): """ Transforms the StateVector information (mean + covariance) to the TransCom regions. @@ -560,7 +617,7 @@ class StateVector(object): # And now the covariance, first create covariance matrix (!), and then multiply deviations = np.array([mem.ParameterValues*fluxvector-ensemblemean for mem in ensemble]) - covariance = np.dot(np.transpose(deviations),deviations)/(self.nparams-1) + covariance = np.dot(np.transpose(deviations),deviations)/(self.nmembers-1) cov = self.VectorToTC(covariance,cov=True) return (mean,cov,) diff --git a/da/ctgridded/statevector.py b/da/ctgridded/statevector.py index be760e88..651295e2 100755 --- a/da/ctgridded/statevector.py +++ b/da/ctgridded/statevector.py @@ -131,6 +131,7 @@ class CtGriddedStateVector(StateVector): istop = 0 dof = 0.0 dev_matrix = np.zeros((self.nparams,self.nmembers,),'float') + randstate = np.random.get_state() for matrix in covariancematrixlist: @@ -151,7 +152,6 @@ class CtGriddedStateVector(StateVector): # Draw nmembers instances of this distribution npoints = matrix.shape[0] - randstate = np.random.get_state() istop = istop+npoints @@ -161,6 +161,9 @@ class CtGriddedStateVector(StateVector): dev_matrix[istart:istop,member-1] = deviations dev_matrix[istop,member-1] = 1.e-10*np.random.randn() + #cov2 = np.dot(dev_matrix[istart:istop,:],np.transpose(dev_matrix[istart:istop,:])) / (self.nmembers-1) + #print matrix.sum(),cov2.sum(),abs(matrix.diagonal()-cov2.diagonal()).max(), matrix.shape,cov2.shape + istart = istart + npoints msg = 'Successfully constructed a deviation matrix from covariance structure' ; logging.debug(msg) @@ -217,7 +220,7 @@ if __name__ == "__main__": sys.path.append('../../') opts = ['-v'] - args = {'rc':'../../da.rc'} + args = {'rc':'../../dagridded.rc'} StartLogger(level = logging.DEBUG) diff --git a/da/examples/das.py b/da/examples/das.py index adcc4954..59fcdcff 100755 --- a/da/examples/das.py +++ b/da/examples/das.py @@ -69,11 +69,14 @@ EnsembleSmootherPipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOper msg = header+"Starting analysis"+footer ; logging.info(msg) + from da.analysis.expand_fluxes import SaveWeeklyAvg1x1Data +from da.analysis.expand_fluxes import SaveWeeklyAvgStateData from da.analysis.expand_fluxes import SaveWeeklyAvgTCData -savedas = SaveWeeklyAvg1x1Data(DaCycle) -savedas = SaveWeeklyAvgTCData(DaCycle) +savedas = SaveWeeklyAvg1x1Data(DaCycle, StateVector) +savedas = SaveWeeklyAvgStateData(DaCycle, StateVector) +savedas = SaveWeeklyAvgTCData(DaCycle, StateVector) sys.exit(0) diff --git a/da/examples/dasgridded.py b/da/examples/dasgridded.py index c9c5eb86..5649b45c 100755 --- a/da/examples/dasgridded.py +++ b/da/examples/dasgridded.py @@ -72,10 +72,12 @@ EnsembleSmootherPipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOper msg = header+"Starting analysis"+footer ; logging.info(msg) from da.analysis.expand_fluxes import SaveWeeklyAvg1x1Data +from da.analysis.expand_fluxes import SaveWeeklyAvgStateData from da.analysis.expand_fluxes import SaveWeeklyAvgTCData -savedas = SaveWeeklyAvg1x1Data(DaCycle) -savedas = SaveWeeklyAvgTCData(DaCycle) +savedas = SaveWeeklyAvg1x1Data(DaCycle, StateVector) +savedas = SaveWeeklyAvgStateData(DaCycle, StateVector) +savedas = SaveWeeklyAvgTCData(DaCycle, StateVector) sys.exit(0) diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 65f58a9b..eec4daf5 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -110,12 +110,19 @@ def PrepareState(DaCycle, StateVector): filtertime = DaCycle['time.start'].strftime('%Y%m%d') filename = os.path.join(savedir,'savestate.nc') - StateVector.ReadFromFile(filename) + StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate # Now propagate the ensemble by one cycle to prepare for the current cycle dummy = StateVector.Propagate() + # Finally, also write the StateVector to a file so that we can always access the a-priori information + + savedir = DaCycle['dir.output'] + filename = os.path.join(savedir,'savestate.nc') + + dummy = StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now + return None def SampleState(DaCycle,Samples,StateVector, ObservationOperator): @@ -171,7 +178,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the # type of info needed in your transport model - dummy = WriteEnsembleData(DaCycle, StateVector, lag) + dummy = StateVector.WriteMembersToFile(lag+1) dummy = Samples.Initialize() @@ -294,7 +301,8 @@ def SaveAndSubmit( DaCycle, StateVector): dummy = StateVector.WriteToFile(filename) - DaCycle.RestartFileList.extend( [os.path.join(DaCycle['dir.output'],'savestate.nc') ] ) + DaCycle.RestartFileList.extend( [filename] ) # write optimized info because StateVector.Isoptimized == False for now + dummy = DaCycle.Finalize() @@ -326,14 +334,5 @@ def RunForecastModel(DaCycle,ObsOperator): return status -def WriteEnsembleData(DaCycle, StateVector, lag): - """ Write the data needed by each model ensemble member to disk""" - members = StateVector.EnsembleMembers[lag] - for mem in members: - - mem.WriteToFile(DaCycle) - - msg = "Ensemble member parameter files were written to disk for lag = %d" % lag ; logging.info(msg) - return None diff --git a/da/tools/standardvariables.py b/da/tools/standardvariables.py index 9eac8f64..36f5fbce 100755 --- a/da/tools/standardvariables.py +++ b/da/tools/standardvariables.py @@ -53,7 +53,7 @@ standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\ 'count' : 0 \ } , \ 'bio_flux_prior_cov' : {'name' : 'bio_flux_prior_cov',\ - 'units' : 'mol2 region-2 s-2' ,\ + 'units' : '[mol region-1 s-1]^2' ,\ 'long_name' : 'Covariance of surface flux of carbon dioxide, terrestrial vegetation , not optimized ', \ 'comment' : 'time-interval average, centered on times in the date axis', \ 'standard_name' : '', \ @@ -62,7 +62,7 @@ standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\ 'count' : 0 \ } , \ 'bio_flux_opt_cov' : {'name' : 'bio_flux_opt_cov',\ - 'units' : 'mol2 region-2 s-2' ,\ + 'units' : '[mol region-1 s-1]^2' ,\ 'long_name' : 'Covariance of surface flux of carbon dioxide, terrestrial vegetation , optimized ', \ 'comment' : 'time-interval average, centered on times in the date axis', \ 'standard_name' : '', \ @@ -71,7 +71,7 @@ standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\ 'count' : 0 \ } , \ 'ocn_flux_prior_cov' : {'name' : 'ocn_flux_prior_cov',\ - 'units' : 'mol2 region-2 s-2' ,\ + 'units' : '[mol region-1 s-1]^2' ,\ 'long_name' : 'Covariance of surface flux of carbon dioxide, open ocean , not optimized ', \ 'comment' : 'time-interval average, centered on times in the date axis', \ 'standard_name' : '', \ @@ -80,7 +80,7 @@ standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\ 'count' : 0 \ } , \ 'ocn_flux_opt_cov' : {'name' : 'ocn_flux_opt_cov',\ - 'units' : 'mol2 region-2 s-2' ,\ + 'units' : '[mol region-1 s-1]^2' ,\ 'long_name' : 'Covariance of surface flux of carbon dioxide, open ocean , optimized ', \ 'comment' : 'time-interval average, centered on times in the date axis', \ 'standard_name' : '', \ @@ -172,6 +172,42 @@ standard_variables = { 'bio_flux_prior' : {'name' : 'bio_flux_prior',\ 'values' : [], \ 'count' : 0 \ } , \ + 'meanstate_prior' : {'name' : 'statevectormean_prior',\ + 'units' : 'unitless' ,\ + 'long_name' : 'mean_value_of_state_vector_prior', \ + 'standard_name' : 'mean_value_of_state_vector_prior', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ensemblestate_prior': {'name' : 'statevectorensemble_prior',\ + 'units' : 'unitless' ,\ + 'long_name' : 'ensemble_value_of_state_vector_prior', \ + 'standard_name' : 'ensemble_value_of_state_vector_prior', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'meanstate_opt' : {'name' : 'statevectormean_opt',\ + 'units' : 'unitless' ,\ + 'long_name' : 'mean_value_of_state_vector_optimized', \ + 'standard_name' : 'mean_value_of_state_vector_opt', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ + 'ensemblestate_opt': {'name' : 'statevectorensemble_opt',\ + 'units' : 'unitless' ,\ + 'long_name' : 'ensemble_value_of_state_vector_optimized', \ + 'standard_name' : 'ensemble_value_of_state_vector_opt', \ + 'comment' : '',\ + 'dims' : (), \ + 'values' : [], \ + 'count' : 0 \ + } , \ 'unknown' : {'name' : '',\ 'units' : '' ,\ 'long_name' : '', \ -- GitLab