diff --git a/da/analysis/expand_mixingratios.py b/da/analysis/expand_mixingratios.py index 4c42be2eae210b8e7587ceb5679340edae81d269..21492ab6757b68699ec66923f34eab1f79df51c3 100755 --- a/da/analysis/expand_mixingratios.py +++ b/da/analysis/expand_mixingratios.py @@ -10,427 +10,36 @@ import numpy as np from pylab import date2num, num2date """ -Author: Wouter Peters (Wouter.Peters@noaa.gov) +Author: Wouter Peters (Wouter.Peters@wur.nl) Revision History: -File created on 21 Ocotber 2008. +File created on 11 May 2012. """ -def proceed_dialog(txt, yes=['y','yes'], all=['a','all', 'yes-to-all']): - """ function to ask whether to proceed or not """ - response=raw_input(txt) - if response.lower() in yes: - return 1 - if response.lower() in all: - return 2 - return 0 - -def SaveWeeklyAvg1x1Data(DaCycle, StateVector): - """ - Function creates a NetCDF file with output on 1x1 degree grid. It uses the flux data written by the - :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and - variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. - - :param DaCycle: a :class:`~da.tools.initexit.CycleControl` object - :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` - :rtype: None - """ - - import da.tools.io4 as io - import logging -# - dirname = 'data_flux1x1_weekly' -# - dirname = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname)) -# -# Some help variables -# - dectime0 = date2num(datetime(2000,1,1)) - dt = DaCycle['cyclelength'] - startdate = DaCycle['time.start'] - enddate = DaCycle['time.end'] - nlag = StateVector.nlag - - msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) - msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) - -# -# Create or open NetCDF output file -# - saveas = os.path.join(dirname,'flux_1x1.nc') - ncf = io.CT_CDF(saveas,'write') - -# -# Create dimensions and lat/lon grid -# - dimgrid = ncf.AddLatLonDim() - dimdate = ncf.AddDateDim() -# -# set title and tell GMT that we are using "pixel registration" -# - setattr(ncf,'Title','CarbonTracker fluxes') - setattr(ncf,'node_offset',1) -# -# skip dataset if already in file -# - ncfdate = date2num(startdate)-dectime0+dt.days/2.0 - skip = ncf.has_date(ncfdate) - if skip: - msg = 'Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'),saveas) ; logging.warning(msg) - else: - -# -# if not, process this cycle. Start by getting flux input data from CTDAS -# - filename = os.path.join(DaCycle['dir.output'],'flux1x1_%s_%s.nc'%(startdate.strftime('%Y%m%d%H'),enddate.strftime('%Y%m%d%H'),) ) - - file = io.CT_Read(filename,'read') - bio = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.bio.flux'])) - ocean = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.ocean.flux'])) - fire = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fires.flux'])) - fossil = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fossil.flux'])) - #mapped_parameters = np.array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1'])) - file.close() - - next=ncf.inq_unlimlen()[0] - - -# Start adding datasets from here on, both prior and posterior datasets for bio and ocn - - for prior in [True, False]: -# -# Now fill the StateVector with the prior values for this time step. 
Note that the prior value for this time step -# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. -# - - if prior: - qual_short='prior' - for n in range(nlag,0,-1): - priordate = enddate - timedelta(dt.days*n) - savedir = DaCycle['dir.output'].replace(startdate.strftime('%Y%m%d'),priordate.strftime('%Y%m%d') ) - filename = os.path.join(savedir,'savestate.nc') - if os.path.exists(filename): - dummy = StateVector.ReadFromFile(filename,qual=qual_short) - gridmean,gridvariance = StateVector.StateToGrid(lag=n) - -# Replace the mean statevector by all ones (assumed priors) - - gridmean = StateVector.VectorToGrid(vectordata = np.ones(StateVector.nparams,)) - - msg = 'Read prior dataset from file %s, sds %d: '%(filename,n) ; logging.debug(msg) - break - else: - qual_short='opt' - savedir = DaCycle['dir.output'] - filename = os.path.join(savedir,'savestate.nc') - dummy = StateVector.ReadFromFile(filename,qual=qual_short) - gridmean,gridvariance = StateVector.StateToGrid(lag=nlag) - - msg = 'Read posterior dataset from file %s, sds %d: '%(filename,nlag) ; logging.debug(msg) -# -# if prior, do not multiply fluxes with parameters, otherwise do -# - biomapped=bio*gridmean - oceanmapped=ocean*gridmean - biovarmapped=bio*gridvariance - oceanvarmapped=ocean*gridvariance - -# -# -# For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write -# - savedict=ncf.StandardVar(varname='bio_flux_'+qual_short) - savedict['values']=biomapped.tolist() - savedict['dims']=dimdate+dimgrid - savedict['count']=next - ncf.AddData(savedict) -# - savedict=ncf.StandardVar(varname='ocn_flux_'+qual_short) - savedict['values']=oceanmapped.tolist() - savedict['dims']=dimdate+dimgrid - savedict['count']=next - ncf.AddData(savedict) - - savedict=ncf.StandardVar(varname='bio_flux_%s_cov'%qual_short) - savedict['values']=biovarmapped.tolist() - savedict['dims']=dimdate+dimgrid - savedict['count']=next - ncf.AddData(savedict) -# - savedict=ncf.StandardVar(varname='ocn_flux_%s_cov'%qual_short) - savedict['values']=oceanvarmapped.tolist() - savedict['dims']=dimdate+dimgrid - savedict['count']=next - ncf.AddData(savedict) - - # End prior/posterior block - - savedict=ncf.StandardVar(varname='fire_flux_imp') - savedict['values']=fire.tolist() - savedict['dims']=dimdate+dimgrid - savedict['count']=next - ncf.AddData(savedict) -# - savedict=ncf.StandardVar(varname='fossil_flux_imp') - savedict['values']=fossil.tolist() - savedict['dims']=dimdate+dimgrid - savedict['count']=next - ncf.AddData(savedict) -# - savedict=ncf.StandardVar(varname='date') - savedict['values']=date2num(startdate)-dectime0+dt.days/2.0 - savedict['dims']=dimdate - savedict['count']=next - ncf.AddData(savedict) - - sys.stdout.write('.') - sys.stdout.flush() -# -# Done, close the new NetCDF file -# - ncf.close() -# -# Return the full name of the NetCDF file so it can be processed by the next routine -# - msg="Gridded weekly average fluxes now written" ; logging.info(msg) - - return saveas - -def SaveWeeklyAvgStateData(DaCycle, StateVector): - """ - Function creates a NetCDF file with output for all parameters. It uses the flux data written by the - :class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and - variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`. 
- - :param DaCycle: a :class:`~da.tools.initexit.CycleControl` object - :param StateVector: a :class:`~da.baseclasses.statevector.StateVector` - :rtype: None - """ - - import da.tools.io4 as io - import logging - from da.analysis.tools_regions import globarea +def WriteMixingRatios(DaCycle): + """ -# - dirname = 'data_state_weekly' -# - dirname = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname)) -# -# Some help variables -# - dectime0 = date2num(datetime(2000,1,1)) - dt = DaCycle['cyclelength'] - startdate = DaCycle['time.start'] - enddate = DaCycle['time.end'] - nlag = StateVector.nlag - - area = globarea() - vectorarea = StateVector.GridToVector(griddata=area,method='sum') - - msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) - msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg) - -# -# Create or open NetCDF output file -# - saveas = os.path.join(dirname,'statefluxes.nc') - ncf = io.CT_CDF(saveas,'write') - -# -# Create dimensions and lat/lon grid -# - dimregs = ncf.AddDim('nparameters',StateVector.nparams) - dimmembers = ncf.AddDim('nmembers',StateVector.nmembers) - dimdate = ncf.AddDateDim() -# -# set title and tell GMT that we are using "pixel registration" -# - setattr(ncf,'Title','CarbonTracker fluxes') - setattr(ncf,'node_offset',1) -# -# skip dataset if already in file -# - ncfdate = date2num(startdate)-dectime0+dt.days/2.0 - skip = ncf.has_date(ncfdate) - if skip: - msg = 'Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'),saveas) ; logging.warning(msg) - else: - - next=ncf.inq_unlimlen()[0] - -# -# if not, process this cycle. Start by getting flux input data from CTDAS -# - filename = os.path.join(DaCycle['dir.output'],'flux1x1_%s_%s.nc'%(startdate.strftime('%Y%m%d%H'),enddate.strftime('%Y%m%d%H'),) ) - - file = io.CT_Read(filename,'read') - bio = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.bio.flux'])) - ocean = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.ocean.flux'])) - fire = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fires.flux'])) - fossil = np.array(file.GetVariable(DaCycle.DaSystem['background.co2.fossil.flux'])) - #mapped_parameters = np.array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1'])) - file.close() - - next=ncf.inq_unlimlen()[0] - - vectorbio = StateVector.GridToVector(griddata=bio*area,method='sum') - vectorocn = StateVector.GridToVector(griddata=ocean*area,method='sum') - vectorfire = StateVector.GridToVector(griddata=fire*area,method='sum') - vectorfossil = StateVector.GridToVector(griddata=fossil*area,method='sum') - - -# Start adding datasets from here on, both prior and posterior datasets for bio and ocn - - for prior in [True, False]: -# -# Now fill the StateVector with the prior values for this time step. Note that the prior value for this time step -# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date.. 
-# - - if prior: - qual_short='prior' - for n in range(nlag,0,-1): - priordate = enddate - timedelta(dt.days*n) - savedir = DaCycle['dir.output'].replace(startdate.strftime('%Y%m%d'),priordate.strftime('%Y%m%d') ) - filename = os.path.join(savedir,'savestate.nc') - if os.path.exists(filename): - dummy = StateVector.ReadFromFile(filename,qual=qual_short) - -# Replace the mean statevector by all ones (assumed priors) - - statemean = np.ones((StateVector.nparams,)) - - choicelag = n - - msg = 'Read prior dataset from file %s, sds %d: '%(filename,n) ; logging.debug(msg) - break - else: - qual_short='opt' - savedir = DaCycle['dir.output'] - filename = os.path.join(savedir,'savestate.nc') - dummy = StateVector.ReadFromFile(filename) - choicelag = nlag - statemean = StateVector.EnsembleMembers[choicelag-1][0].ParameterValues - - msg = 'Read posterior dataset from file %s, sds %d: '%(filename,nlag) ; logging.debug(msg) -# -# if prior, do not multiply fluxes with parameters, otherwise do -# - data = statemean*vectorbio # units of mole region-1 s-1 - - savedict = ncf.StandardVar(varname='bio_flux_%s'%qual_short) - savedict['values'] = data - savedict['dims'] = dimdate+dimregs - savedict['count'] = next - ncf.AddData(savedict) - - members = StateVector.EnsembleMembers[choicelag-1] - deviations = np.array([mem.ParameterValues*data for mem in members])-data - - savedict=ncf.StandardVar(varname='bio_flux_%s_ensemble'%qual_short) - - savedict['values']=deviations.tolist() - savedict['dims']=dimdate+dimmembers+dimregs - savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" - savedict['units']="mol region-1 s-1" - savedict['count']=next - ncf.AddData(savedict) - - savedict=ncf.StandardVar('unknown') - savedict['name'] = 'bio_flux_%s_std'%qual_short - savedict['long_name'] = 'Biosphere flux standard deviation, %s'%qual_short - savedict['values']=deviations.std(axis=0) - savedict['dims']=dimdate+dimregs - savedict['comment']="This is the standard deviation on each parameter" - savedict['units']="mol region-1 s-1" - savedict['count']=next - ncf.AddData(savedict) - - data = statemean*vectorocn # units of mole region-1 s-1 - - savedict=ncf.StandardVar(varname='ocn_flux_%s'%qual_short) - savedict['values']=data - savedict['dims']=dimdate+dimregs - savedict['count']=next - ncf.AddData(savedict) - - deviations = np.array([mem.ParameterValues*data for mem in members])-data - - savedict=ncf.StandardVar(varname='ocn_flux_%s_ensemble'%qual_short) - savedict['values']=deviations.tolist() - savedict['dims']=dimdate+dimmembers+dimregs - savedict['comment']="This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance" - savedict['units']="mol region-1 s-1" - savedict['count']=next - ncf.AddData(savedict) - - savedict=ncf.StandardVar('unknown') - savedict['name'] = 'ocn_flux_%s_std'%qual_short - savedict['long_name'] = 'Ocean flux standard deviation, %s'%qual_short - savedict['values']=deviations.std(axis=0) - savedict['dims']=dimdate+dimregs - savedict['comment']="This is the standard deviation on each parameter" - savedict['units']="mol region-1 s-1" - savedict['count']=next - ncf.AddData(savedict) - - data = vectorfire - - savedict=ncf.StandardVar(varname='fire_flux_imp') - savedict['values']=data - savedict['dims']=dimdate+dimregs - savedict['count']=next - ncf.AddData(savedict) - - data = vectorfossil - - savedict=ncf.StandardVar(varname='fossil_flux_imp') - savedict['values']=data - savedict['dims']=dimdate+dimregs - savedict['count']=next - 
ncf.AddData(savedict)
-
-        savedict=ncf.StandardVar(varname='date')
-        savedict['values'] = ncfdate
-        savedict['dims'] = dimdate
-        savedict['count'] = next
-        ncf.AddData(savedict)
-
-        sys.stdout.write('.')
-        sys.stdout.flush()
-#
-#   Done, close the new NetCDF file
-#
-    ncf.close()
-#
-#   Return the full name of the NetCDF file so it can be processed by the next routine
-#
-    msg="Vector weekly average fluxes now written" ; logging.info(msg)
+    Write sample information to NetCDF files. These files are organized by site and
+    have an unlimited time axis to which data is appended each cycle.
-    return saveas
+
+    The required information is obtained from the sampleinfo.nc files and the original ObsPack input data files.
+    The steps are:
-
-def SaveWeeklyAvgTCData(DaCycle, StateVector):
-    """
-        Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the
-        function `SaveWeeklyAvg1x1Data` to create fluxes of length `nparameters`, which are then projected
-        onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`.
-
-        :param DaCycle: a :class:`~da.tools.initexit.CycleControl` object
-        :param StateVector: a :class:`~da.baseclasses.statevector.StateVector`
-        :rtype: None
-
-        This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve
-        these with the parameters in the StateVector. This creates posterior fluxes, and the posterior covariance for the complete
-        StateVector in units of mol/box/s which we then turn into TC fluxes and covariances.
+    (1) Create a directory to hold the timeseries output files
+    (2) Read the sampleinfo.nc file for this cycle and get the list of original ObsPack files the samples were obtained from
+    (3) For each of those files, copy the original ObsPack data file into the directory (if not yet present)
+    (4) Open the copied file, find the index of each observation, and fill in the simulated data
     """
-    from da.analysis.tools_regions import globarea
-    from da.analysis.tools_transcom import transcommask
     import da.tools.io4 as io
+    from string import join
+    import shutil
     import logging
-#
-    dirname = 'data_tc_weekly'
+
+
+    dirname = 'data_molefractions'
 #
     dirname = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname))
 #
@@ -440,396 +49,181 @@ def SaveWeeklyAvgTCData(DaCycle, StateVector):
     dectime0 = date2num(datetime(2000,1,1))
     dt = DaCycle['cyclelength']
     startdate = DaCycle['time.start']
     enddate = DaCycle['time.end']
-    ncfdate = date2num(startdate)-dectime0+dt.days/2.0
 
     msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg)
     msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg)
 
-    # Write/Create NetCDF output file
-    #
-    saveas = os.path.join(dirname,'tcfluxes.nc')
-    ncf = io.CT_CDF(saveas,'write')
-    dimdate = ncf.AddDateDim()
-    dimidateformat = ncf.AddDateDimFormat()
-    dimregs = ncf.AddRegionDim(type='tc')
-#
-# set title and tell GMT that we are using "pixel registration"
-#
-    setattr(ncf,'Title','CarbonTracker TransCom fluxes')
-    setattr(ncf,'node_offset',1)
-    #
-
-    skip = ncf.has_date(ncfdate)
-    if skip:
-        msg = 'Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'),saveas) ; logging.warning(msg)
-    else:
+    DaCycle['time.sample.stamp'] = "%s_%s"%(startdate.strftime("%Y%m%d%H"),enddate.strftime("%Y%m%d%H"),)
 
-        # Get input data
+    # Step (2): Read the sample output data file (sampleinfo.nc) for this cycle
 
-        area=globarea()
+    infile = os.path.join(DaCycle['dir.output'],'sampleinfo_%s.nc'% DaCycle['time.sample.stamp'])
 
-
infile=os.path.join(DaCycle['dir.analysis'],'data_state_weekly','statefluxes.nc') - if not os.path.exists(infile): - msg="Needed input file (%s) does not exist yet, please create file first, returning..."%infile ; logging.error(msg) - return None + ncf_in = io.CT_Read(infile,'read') - ncf_in = io.CT_Read(infile,'read') + obs_num = ncf_in.GetVariable('obs_num') + obs_val = ncf_in.GetVariable('observed') + mdm = ncf_in.GetVariable('modeldatamismatch') + simulated = ncf_in.GetVariable('modelsamples') + infilename= ncf_in.GetVariable('inputfilename') - # Transform data one by one + infiles = [join(s.compressed(),'') for s in infilename] - # Get the date variable, and find index corresponding to the DaCycle date + ncf_in.close() - try: - dates = ncf_in.variables['date'][:] - except KeyError: - msg = "The variable date cannot be found in the requested input file (%s) " % infile ; logging.error(msg) - msg = "Please make sure you create gridded fluxes before making TC fluxes " ; logging.error(msg) - raise KeyError + for orig_file in set(infiles): - try: - index = dates.tolist().index(ncfdate) - except ValueError: - msg = "The requested cycle date is not yet available in file %s "% infile ; logging.error(msg) - msg = "Please make sure you create state based fluxes before making TC fluxes " ; logging.error(msg) - raise ValueError + if not os.path.exists(orig_file): + msg = "The original input file (%s) could not be found, continuing to next file..."%orig_file ; logging.error(msg) + continue - # First add the date for this cycle to the file, this grows the unlimited dimension - savedict = ncf.StandardVar(varname='date') - savedict['values'] = ncfdate - savedict['dims'] = dimdate - savedict['count'] = index - dummy = ncf.AddData(savedict) + copy_file = os.path.join(dirname,os.path.split(orig_file)[-1]) + if not os.path.exists(copy_file): + shutil.copy(orig_file,copy_file) + msg = "Copied a new original file (%s) to the analysis directory"%orig_file ; logging.debug(msg) - # Now convert other variables that were inside the flux_1x1 file + ncf_out = io.CT_CDF(copy_file,'write') - vardict = ncf_in.variables - for vname, vprop in vardict.iteritems(): + # Modify the attributes of the file to reflect added data from CTDAS properly + - data = ncf_in.GetVariable(vname)[index] + ncf_out.Caution = '===================================================================================' + ncf_out.History += '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),) + ncf_out.CTDAS_info = 'Simulated values added from a CTDAS run by %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)\ + + '\nCTDAS was run on platform %s' % (os.environ['HOST'],)\ + + '\nCTDAS job directory was %s' % (DaCycle['dir.da_run'],)\ + + '\nCTDAS Da System was %s' % (DaCycle['da.system'],)\ + + '\nCTDAS Da ObsOperator was %s' % (DaCycle['da.obsoperator'],) + ncf_out.CTDAS_startdate = DaCycle['time.start'].strftime('%F') + ncf_out.CTDAS_enddate = DaCycle['time.finish'].strftime("%F") + ncf_out.original_file = orig_file - if vname=='latitude': continue - elif vname=='longitude': continue - elif vname=='date': continue - elif vname=='idate': continue - elif 'std' in vname: continue - elif 'ensemble' in vname: - - tcdata=[] - for member in data: - tcdata.append( StateVector.VectorToTC(vectordata=member) ) - tcdata = np.array(tcdata) - cov = tcdata.transpose().dot(tcdata)/(StateVector.nmembers-1) - tcdata = cov + # get nobs dimension + if ncf_out.dimensions.has_key('id'): + 
dimidob = ncf_out.dimensions['id']
+                dimid = ('id',)
+            elif ncf_out.dimensions.has_key('obs'):
+                dimidob = ncf_out.dimensions['obs']
+                dimid = ('obs',)
-            savedict = ncf.StandardVar(varname=vname.replace('ensemble','cov') )
-            savedict['units'] = '[mol/region/s]**2'
-            savedict['dims'] = dimdate+dimregs+dimregs
-
+            if dimidob.isunlimited():
+                nobs = ncf_out.inq_unlimlen()[0]
-            else:
+            else:
+                nobs = len(dimidob)
+
+            # add nmembers dimension
+
+            dimmembersob = ncf_out.createDimension('nmembers',size=simulated.shape[1])
+            dimmembers = ('nmembers',)
+            nmembers = simulated.shape[1]
+
+            # Create the output variables that will hold the simulated data
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "modeldatamismatch"
+            savedict['long_name'] = "modeldatamismatch"
+            savedict['units'] = "[mol mol-1]^2"
+            savedict['dims'] = dimid
+            savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
+            status = ncf_out.AddVariable(savedict)
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "modelsamplesmean"
+            savedict['long_name'] = "mean modelsamples"
+            savedict['units'] = "mol mol-1"
+            savedict['dims'] = dimid
+            savedict['comment'] = 'simulated mixing ratios based on optimized state vector'
+            status = ncf_out.AddVariable(savedict)
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "modelsamplesstandarddeviation"
+            savedict['long_name'] = "standard deviation of modelsamples over all ensemble members"
+            savedict['units'] = "mol mol-1"
+            savedict['dims'] = dimid
+            savedict['comment'] = 'std dev of simulated mixing ratios based on optimized state vector'
+            status = ncf_out.AddVariable(savedict)
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "modelsamplesensemble"
+            savedict['long_name'] = "modelsamples over all ensemble members"
+            savedict['units'] = "mol mol-1"
+            savedict['dims'] = dimid+dimmembers
+            savedict['comment'] = 'ensemble of simulated mixing ratios based on optimized state vector'
+            status = ncf_out.AddVariable(savedict)
-
-            tcdata = StateVector.VectorToTC(vectordata=data) # vector to TC
-
-            savedict = ncf.StandardVar(varname=vname)
-            savedict['dims'] = dimdate+dimregs
-            savedict['units'] = 'mol/region/s'
-
-            savedict['count'] = index
-            savedict['values'] = tcdata
-            ncf.AddData(savedict)
-
-        ncf_in.close()
-        ncf.close()
-
-        msg="TransCom weekly average fluxes now written" ; logging.info(msg)
-
-        return saveas
-
-def SaveWeeklyAvgExtTCData(DaCycle):
-    """ Function SaveTCDataExt saves surface flux data to NetCDF files for extended TransCom regions
-
-        *** Inputs ***
-        rundat : a RunInfo object
-
-        *** Outputs ***
-        NetCDF file containing n-hourly global surface fluxes per TransCom region
-
-        *** Example ***
-        ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """
-
-    from da.analysis.tools_transcom import ExtendedTCRegions
-    import da.tools.io4 as io
-#
-    dirname = 'data_tc_weekly'
-#
-    dirname = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname))
-#
-# Some help variables
-#
-    dectime0 = date2num(datetime(2000,1,1))
-    dt = DaCycle['cyclelength']
-    startdate = DaCycle['time.start']
-    enddate = DaCycle['time.end']
-    ncfdate = date2num(startdate)-dectime0+dt.days/2.0
-
-    msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg)
-    msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg)
-
-    # Write/Create NetCDF output file
-    #
-    saveas = os.path.join(dirname,'tc_extfluxes.nc')
-    ncf = io.CT_CDF(saveas,'write')
-    dimdate = ncf.AddDateDim()
-    dimidateformat = ncf.AddDateDimFormat()
-    dimregs = ncf.AddRegionDim(type='tc_ext')
-#
-# set title and tell GMT that we are using "pixel registration"
-#
-    setattr(ncf,'Title','CarbonTracker TransCom fluxes')
-    setattr(ncf,'node_offset',1)
-    #
-
-    skip = ncf.has_date(ncfdate)
-    if skip:
-        msg = 'Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'),saveas) ; logging.warning(msg)
-    else:
-        infile=os.path.join(DaCycle['dir.analysis'],'data_tc_weekly','tcfluxes.nc')
-        if not os.path.exists(infile):
-            msg="Needed input file (%s) does not exist yet, please create file first, returning..."%infile ; logging.error(msg)
-            return None
-
-        ncf_in = io.CT_Read(infile,'read')
-
-        # Transform data one by one
-
-        # Get the date variable, and find index corresponding to the DaCycle date
-
-        try:
-            dates = ncf_in.variables['date'][:]
-        except KeyError:
-            msg = "The variable date cannot be found in the requested input file (%s) " % infile ; logging.error(msg)
-            msg = "Please make sure you create gridded fluxes before making extended TC fluxes " ; logging.error(msg)
-            raise KeyError
-
-        try:
-            index = dates.tolist().index(ncfdate)
-        except ValueError:
-            msg = "The requested cycle date is not yet available in file %s "% infile ; logging.error(msg)
-            msg = "Please make sure you create state based fluxes before making extended TC fluxes " ; logging.error(msg)
-            raise ValueError
-
-        # First add the date for this cycle to the file, this grows the unlimited dimension
-
-        savedict = ncf.StandardVar(varname='date')
-        savedict['values'] = ncfdate
-        savedict['dims'] = dimdate
-        savedict['count'] = index
-        dummy = ncf.AddData(savedict)
-
-        # Now convert other variables that were inside the tcfluxes.nc file
-
-        vardict = ncf_in.variables
-        for vname, vprop in vardict.iteritems():
-
-            data = ncf_in.GetVariable(vname)[index]
-
-            if vname=='latitude': continue
-            elif vname=='longitude': continue
-            elif vname=='date': continue
-            elif vname=='idate': continue
-            elif 'cov' in vname:
-
-                tcdata = ExtendedTCRegions(data,cov=True)
-
-                savedict = ncf.StandardVar(varname=vname)
-                savedict['units'] = '[mol/region/s]**2'
-                savedict['dims'] = dimdate+dimregs+dimregs
-
-            else:
-
-                tcdata = ExtendedTCRegions(data,cov=False)
-
-                savedict = ncf.StandardVar(varname=vname)
-                savedict['dims'] = dimdate+dimregs
-                savedict['units'] = 'mol/region/s'
-
-            savedict['count'] = index
-            savedict['values'] = tcdata
-            ncf.AddData(savedict)
-
-        ncf_in.close()
-        ncf.close()
-
-        msg="TransCom weekly average extended fluxes now written" ; logging.info(msg)
-
-        return saveas
+        else:
+            msg = "Modifying existing file (%s) in the analysis directory"%copy_file ; logging.debug(msg)
-
-def SaveTimeAvgData(DaCycle,infile,avg='monthly'):
-    """ Function saves time mean surface flux data to NetCDF files
-
-        *** Inputs ***
-        rundat : a RunInfo object
+            ncf_out = io.CT_CDF(copy_file,'write')
-
-        *** Outputs ***
-        daily NetCDF file containing 1-hourly global surface fluxes at 1x1 degree
-
-        *** Example ***
-        ./expand_savestate project=enkf_release sd=20000101 ed=20010101 """
+
+        # Get existing file obs_nums to determine the match to the local obs_nums
+        # (assumes the ObsPack file carries either an 'id' or an 'obspack_num' variable)
-
-    import da.analysis.tools_time as timetools
-    import da.tools.io4 as io
+        if ncf_out.variables.has_key('id'):
+            file_obs_nums = ncf_out.GetVariable('id')
+        elif ncf_out.variables.has_key('obspack_num'):
+            file_obs_nums = ncf_out.GetVariable('obspack_num')
-
-    if 'weekly' in infile: intime = 'weekly'
-    if 'monthly' in infile: intime = 'monthly'
-    if 'yearly' in infile: intime = 'yearly'
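A word of caution on the id lookup above: when a file carries neither an 'id' nor an 'obspack_num' variable, file_obs_nums stays undefined and the problem only surfaces later as a NameError. The sketch below shows one defensive alternative; pick_id_variable is a hypothetical helper (not part of CTDAS) and assumes the io4 variables/GetVariable interface used elsewhere in this patch.

    import logging

    def pick_id_variable(ncf_out, candidates=('id', 'obspack_num')):
        """ Return the first observation-id variable present in the file, or None. """
        for name in candidates:
            if name in ncf_out.variables:
                return ncf_out.GetVariable(name)
        logging.error("No observation id variable (%s) present in file" % ', '.join(candidates))
        return None

    # usage sketch inside the loop over original ObsPack files:
    #
    #     file_obs_nums = pick_id_variable(ncf_out)
    #     if file_obs_nums is None:
    #         continue   # skip files we cannot match against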
+        # Get all obs_nums related to this file, determine their indices in the local arrays
-
-    dirname,filename = os.path.split(infile)
-    outdir = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname.replace(intime,avg)))
+        selected_obs_nums = [num for f,num in zip(infiles,obs_num) if f == orig_file]
-
-    dectime0 = date2num(datetime(2000,1,1))
+
+        # For each index, get the data and add to the file at the proper file index location
-
-# Create NetCDF output file
-#
-    saveas = os.path.join(outdir,filename)
-    ncf = io.CT_CDF(saveas,'create')
-    dimdate = ncf.AddDateDim()
-#
-# Open input file specified from the command line
-#
-    if not os.path.exists(infile):
-        print "Needed input file (%s) not found. Please create this first:"%infile
-        print "returning..."
-        return None
-    else:
-        pass
+        for num in selected_obs_nums:
-
-    file = io.CT_Read(infile,'read')
-    datasets = file.variables.keys()
-    date = file.GetVariable('date')
-    globatts = file.ncattrs()
+            model_index = obs_num.tolist().index(num)
+            file_index = file_obs_nums.tolist().index(num)
-
-    for att in globatts:
-        attval = file.getncattr(att)
-        if not att in ncf.ncattrs():
-            ncf.setncattr(att,attval)
+            var = ncf_out.variables['modeldatamismatch']
+            var[file_index] = mdm[model_index]
+
+            # Column 0 of 'simulated' holds the sample from the mean state,
+            # columns 1: hold the individual ensemble members
+            var = ncf_out.variables['modelsamplesmean']
+            var[file_index] = simulated[model_index,0]
-
-    time = [datetime(2000,1,1)+timedelta(days=d) for d in date]
+            var = ncf_out.variables['modelsamplesstandarddeviation']
+            var[file_index] = simulated[model_index,1:].std()
-
-# loop over datasets in infile, skip idate and date as we will make new time axis for the averaged data
+            var = ncf_out.variables['modelsamplesensemble']
+            var[file_index] = simulated[model_index,:]
-
-    for sds in ['date']+datasets:
+
+        # close the file
-
-# get original data
+        status = ncf_out.close()
-
-        data = file.GetVariable(sds)
-        varatts = file.variables[sds].ncattrs()
-        vardims = file.variables[sds].dimensions
-#
-# Depending on dims of input dataset, create dims for output dataset. Note that we add the new dimdate now.
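The matching loop above calls .tolist().index(num) for every sample, an O(n) scan per lookup that turns quadratic for long site records. For the typical per-site ObsPack file this is harmless, but the same match can be done in one pass with a dictionary. A minimal, self-contained sketch; obs_num, file_obs_nums and simulated mirror the names above, and the values are invented stand-ins for illustration only.

    import numpy as np

    obs_num       = np.array([103, 101, 105])                   # samples simulated this cycle
    file_obs_nums = np.array([100, 101, 102, 103, 104, 105])    # ids present in the site file
    simulated     = np.arange(15, dtype=float).reshape(3, 5)    # nobs x nmembers stand-in

    # build the id -> row lookup tables once, instead of scanning per sample
    file_index_of  = dict((num, i) for i, num in enumerate(file_obs_nums))
    model_index_of = dict((num, i) for i, num in enumerate(obs_num))

    for num in obs_num:
        model_index = model_index_of[num]
        file_index  = file_index_of[num]
        # the writes into the netCDF variables would go here, e.g.
        # var[file_index] = simulated[model_index, 0]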
-# + return None - for d in vardims: - if 'date' in d: - continue - if d in ncf.dimensions.keys(): - pass - else: - dim = ncf.createDimension(d,size=len(file.dimensions[d])) - - savedict = ncf.StandardVar(sds) - savedict['name'] = sds - savedict['dims'] = vardims - savedict['units'] = file.variables[sds].units - savedict['long_name'] = file.variables[sds].long_name - savedict['comment'] = file.variables[sds].comment - savedict['standard_name'] = file.variables[sds].standard_name - savedict['count'] = 0 - - if not 'date' in vardims: - savedict['values'] = data - ncf.AddData(savedict) - else: - - if avg == 'monthly': - time_avg, data_avg = timetools.monthly_avg(time,data) - elif avg == 'seasonal': - time_avg, data_avg = timetools.season_avg(time,data) - elif avg == 'yearly': - time_avg, data_avg = timetools.yearly_avg(time,data) - elif avg == 'longterm': - time_avg, data_avg = timetools.longterm_avg(time,data) - time_avg = [time_avg] - data_avg = [data_avg] - else: - raise ValueError,'Averaging (%s) does not exist' % avg - count=-1 - for dd,data in zip(time_avg,data_avg): - count=count+1 - if sds == 'date': - savedict['values'] = date2num(dd)-dectime0 - else: - savedict['values']=data - savedict['count']=count - ncf.AddData(savedict,silent=True) - - sys.stdout.write('.') - - sys.stdout.write('\n') - sys.stdout.flush() - -# end NetCDF file access - file.close() - ncf.close() - - msg = "------------------- Finished time averaging---------------------------------" ; logging.info(msg) - - - return saveas if __name__ == "__main__": import logging from da.tools.initexit import CycleControl - from da.ct.dasystem import CtDaSystem - from da.ctgridded.statevector import CtGriddedStateVector + from da.ct.dasystem import CtDaSystem sys.path.append('../../') logging.root.setLevel(logging.DEBUG) - DaCycle = CycleControl(args={'rc':'../../dagriddedjet.rc'}) + DaCycle = CycleControl(args={'rc':'../../da.rc'}) DaCycle.Initialize() DaCycle.ParseTimes() - DaSystem = CtDaSystem('../rc/carbontrackergridded.rc') + DaSystem = CtDaSystem('../rc/carbontracker.rc') DaSystem.Initialize() DaCycle.DaSystem = DaSystem - StateVector = CtGriddedStateVector(DaCycle) - dummy = StateVector.Initialize() - while DaCycle['time.start'] < DaCycle['time.finish']: - savedas_1x1=SaveWeeklyAvg1x1Data(DaCycle, StateVector) - savedas_state=SaveWeeklyAvgStateData(DaCycle, StateVector) - savedas_tc=SaveWeeklyAvgTCData(DaCycle, StateVector) - savedas_tcext=SaveWeeklyAvgExtTCData(DaCycle) + outfile = WriteMixingRatios(DaCycle) DaCycle.AdvanceCycleTimes() - StateVector = None # free memory - - for avg in ['monthly','yearly','longterm']: - - savedas_1x1 = SaveTimeAvgData(DaCycle,savedas_1x1,avg) - savedas_state = SaveTimeAvgData(DaCycle,savedas_state,avg) - savedas_tc = SaveTimeAvgData(DaCycle,savedas_tc,avg) - savedas_tcext = SaveTimeAvgData(DaCycle,savedas_tcext,avg) - sys.exit(0)
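WriteMixingRatios returns None; its results live in the per-site files under data_molefractions. A short sketch of how such a file could be inspected afterwards, assuming the python-netCDF4 package; the file name is hypothetical, and the observed mole fractions are read from 'value', the usual ObsPack variable name, which may differ between ObsPack versions.

    import numpy as np
    from netCDF4 import Dataset

    path = 'analysis/data_molefractions/co2_mlo_surface-flask.nc'   # hypothetical file name

    ncf = Dataset(path)
    obs = ncf.variables['value'][:]                # observed mole fractions (ObsPack convention)
    sim = ncf.variables['modelsamplesmean'][:]     # written by WriteMixingRatios
    mdm = ncf.variables['modeldatamismatch'][:]    # variance of the model-data mismatch
    ncf.close()

    # samples never filled by CTDAS come back masked, so use masked statistics
    resid = sim - obs
    chisq = np.ma.mean(resid ** 2 / mdm)           # chi-squared against the mdm term only
    print('mean residual %.3g mol mol-1, chi-squared %.3g' % (np.ma.mean(resid), chisq))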