From bc0fbd5a925290fb52b217969a61dc20890983e8 Mon Sep 17 00:00:00 2001
From: ivar <amvdw95@gmail.com>
Date: Tue, 30 Oct 2012 09:53:45 +0000
Subject: [PATCH] made a new obspack.py that should work with the new
 prototype obspack releases

---
 da/ctc13/obspack-new.py | 664 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 664 insertions(+)
 create mode 100755 da/ctc13/obspack-new.py

diff --git a/da/ctc13/obspack-new.py b/da/ctc13/obspack-new.py
new file mode 100755
index 0000000..30e1913
--- /dev/null
+++ b/da/ctc13/obspack-new.py
@@ -0,0 +1,664 @@
+#!/usr/bin/env python
+# obspack-new.py
+
+"""
+Author : peters
+
+Revision History:
+File created on 28 Jul 2010.
+
+"""
+import os
+import sys
+sys.path.append(os.getcwd())
+sys.path.append('../../')
+import logging
+import datetime
+
+identifier = 'CarbonTracker CO2 mixing ratios'
+version = '0.0'
+
+from da.baseclasses.obs import Observation
+
+################### Begin Class ObsPackObservations ###################
+
+class ObsPackObservations(Observation):
+    """ An object that holds data + methods and attributes needed to manipulate mixing ratio values """
+
+    def getid(self):
+        return identifier
+
+    def getversion(self):
+        return version
+
+    def getlength(self):
+        return len(self.Data)
+
+    def Initialize(self):
+
+        self.startdate = self.DaCycle['time.sample.start']
+        self.enddate = self.DaCycle['time.sample.end']
+        DaSystem = self.DaCycle.DaSystem
+
+        op_co2_id = DaSystem['obspack.co2.input.id']
+        op_co2_dir = DaSystem['obspack.co2.input.dir']
+
+        op_c13_id = DaSystem['obspack.c13.input.id']
+        op_c13_dir = DaSystem['obspack.c13.input.dir']
+
+        co2_only = DaSystem['co2.only']
+        c13_only = DaSystem['c13.only']
+        co2_c13_together = DaSystem['co2.c13.together']
+
+        if not os.path.exists(op_co2_dir):
+            msg = 'Could not find the required ObsPack distribution (%s) ' % op_co2_dir ; logging.error(msg)
+            raise IOError,msg
+        else:
+            self.ObsPackCo2Dir = op_co2_dir
+            self.ObsPackCo2Id = op_co2_id
+
+        self.ObsPackC13Dir = op_c13_dir
+        self.ObsPackC13Id = op_c13_id
+
+        self.Co2_only = co2_only
+        self.C13_only = c13_only
+        self.Co2_C13_together = co2_c13_together
+
+
+        self.Data = MixingRatioList([])
+
+        return None
+
+    def AddObs(self):
+        """ Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file.
+
+            The ObsPack mixing ratio files are provided as time series per site with all dates in sequence.
+            We will loop over all site files in the ObsPackage, and subset each to our needs.
+
+        """
+        import da.tools.io4 as io
+        import datetime as dtm
+        from string import strip, join
+        from numpy import array, logical_and
+
+        # Step 1: Read list of available site files in package
+        if self.Co2_only:
+            obsspecies = [self.ObsPackCo2Dir]
+        if self.C13_only:
+            obsspecies = [self.ObsPackC13Dir]
+        if self.Co2_C13_together:
+            obsspecies = [self.ObsPackCo2Dir,self.ObsPackC13Dir]
+        for obsspecie in obsspecies:
+            if obsspecie == self.ObsPackCo2Dir:
+                infile = os.path.join(obsspecie,'summary','%s_dataset_summary.txt'%(self.ObsPackCo2Id,))
+            else:
+                infile = os.path.join(obsspecie,'summary','%s_dataset_summary.txt'%(self.ObsPackC13Id,))
+            f = open(infile,'r')
+            lines = f.readlines()
+            status = f.close()
+
+            ncfilelist = []
+            for line in lines:
+                if line.startswith('#'): continue # header
+                print line
+                ncfile, lab, dist, start_date, stop_date, data_comparison = line.split()
+
+                ncfilelist += [ncfile]
+
+            msg = "ObsPack dataset info read, proceeding with %d netcdf files"%(len(ncfilelist),) ; logging.debug(msg)
+
+            for ncfile in ncfilelist:
+
+                infile = os.path.join(obsspecie,'data','nc',ncfile)
+                ncf = io.CT_Read(infile,'read')
+                idates = ncf.GetVariable('time_components')
+                dates = array([dtm.datetime(*d) for d in idates])
+
+                subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
+
+                dates = dates.take(subselect,axis=0)
+
+                ids = ncf.GetVariable('obspack_num').take(subselect) # or should we propagate obs_num which is not unique across datasets??
+                evn = ncf.GetVariable('obspack_id').take(subselect,axis=0)
+                evn = [s.tostring().lower() for s in evn]
+                evn = map(strip,evn)
+                site = ncf.GetAttribute('site_code')
+                lats = ncf.GetVariable('latitude').take(subselect,axis=0)
+                lons = ncf.GetVariable('longitude').take(subselect,axis=0)
+                alts = ncf.GetVariable('altitude').take(subselect,axis=0)
+                obs = ncf.GetVariable('value').take(subselect,axis=0)
+                if obsspecie == self.ObsPackCo2Dir:
+                    obs = obs #*1.e6 #IV convert to ppm
+                species = ncf.GetAttribute('dataset_parameter')
+                #strategy = ncf.GetVariable('sampling_strategy').take(subselect,axis=0)
+                strategy = 1
+                flags = ncf.GetVariable('qc_flag').take(subselect,axis=0)
+                flags = [s.tostring().lower() for s in flags]
+                flags = map(strip,flags)
+                flags = [int(f == '...') for f in flags]
+                status = ncf.close()
+
+                msg = site,species,obs,dates,ids ; logging.info(msg)
+
+                for n in range(len(dates)):
+
+                    self.Data.append( MixingRatioSample(ids[n],dates[n],site,obs[n],0.0,0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],evn[n],species,strategy,0.0,infile) )
+
+                msg = "Added %d observations from file (%s) to the Data list" % (len(dates),ncfile,) ; logging.debug(msg)
+
+        msg = "Observation list now holds %d values" % (len(self.Data),) ; logging.info(msg)
+
+    def AddSimulations(self,filename,silent=True):
+        """ Adds model simulated values to the mixing ratio objects """
+
+        import da.tools.io4 as io
+        import da.tools.MixingratioToPermil as mtp
+        from numpy import array,arange
+
+
+        if not os.path.exists(filename):
+            msge = "Sample output filename for observations could not be found : %s" % filename ; logging.error(msge)
+            msg = "Did the sampling step succeed?" ; logging.error(msg)
+            msg = "...exiting" ; logging.error(msg)
+            raise IOError,msg
+
+        ncf = io.CT_Read(filename,method='read')
+        ids = ncf.GetVariable('obs_num')
+        simulated = ncf.GetVariable('flask')
+        trspecies = ncf.GetVariable('tracer_names')
+        status = ncf.close()
+        msg = "Successfully read data from model sample file (%s)"%filename ; logging.info(msg)
+
+        obs_ids = self.Data.getvalues('id')
+
+        obs_ids = obs_ids.tolist()
+        ids = map(int,ids)
+
+        obs_species = self.Data.getvalues('species')
+        #obs_species = obs_species.tolist()
+
+        missing_samples = []
+
+        tr_species = []
+        for i in range(len(trspecies[:,0])):
+            trjoin = [''.join(trspecies[i,:])]
+            tr_species.append(trjoin[0].strip())
+        tr_species = array(tr_species)
+
+        if self.Co2_only:
+            simulated = simulated #*1.e6
+        if self.C13_only:
+            simulated = mtp.MixingratioToPermil(filename,simulated)
+        if self.Co2_C13_together:
+            simulated = mtp.MixingratioToPermil(filename,simulated)
+
+        xx = arange(len(ids))
+
+        for id,obs_specie,val in zip(xx,obs_species,simulated):
+            if id in xx: #obs_ids:
+                index = id #obs_ids.index(id)
+                #print id,index
+                counter = 0
+                trval = []
+                for sp in tr_species:
+                    if obs_specie == sp:
+                        #print counter,val[counter]
+                        trval.append(val[counter])
+                    counter = counter+1
+                trval = array(trval)
+                #print 'trval',index,trval
+
+                status = self.Data[index].simulated = trval # *1e6 # to umol/mol
+                #if index == 0:
+                #    print 'Data.getvalues2',self.Data[index].simulated
+
+                #index = obs_ids.index(id)
+                #print id,val,val.shape
+                #dummy = self.Data[index].simulated = val # *1e6 # to umol/mol
+
+            else:
+
+                missing_samples.append(id)
+
+        if not silent and missing_samples != []:
+            msg = 'Model samples were found that did not match any ID in the observation list. Skipping them...' ; logging.warning(msg)
+            #msg = '%s'%missing_samples ; logging.warning(msg)
+
+        msg = "Added %d simulated values to the Data list" % (len(ids)-len(missing_samples)) ; logging.debug(msg)
+
+    def WriteSampleInfo(self):
+        """
+        Write the information needed by the observation operator to a file. Return the filename that was written for later use.
+
+        """
+        import shutil
+        #import da.tools.io as io
+        import da.tools.io4 as io
+        from da.tools.general import ToDectime
+
+        obsinputfile = os.path.join(self.DaCycle['dir.input'],'observations_%s.nc'% self.DaCycle['time.sample.stamp'])
+
+        f = io.CT_CDF(obsinputfile,method='create')
+        msg = 'Creating new observations file for ObservationOperator (%s)' % obsinputfile ; logging.debug(msg)
+
+        dimid = f.AddDim('obs',len(self.Data))
+        dim200char = f.AddDim('string_of200chars',200)
+        dim10char = f.AddDim('string_of10chars',10)
+        dimcalcomp = f.AddDim('calendar_components',6)
+
+        if len(self.Data) == 0:
+            f.close()
+            return obsinputfile
+
+        data = self.Data.getvalues('id')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "obs_num"
+        savedict['dtype'] = "int"
+        savedict['long_name'] = "Unique_Dataset_observation_index_number"
+        savedict['units'] = ""
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
+        status = f.AddData(savedict)
+
+        data = [[d.year,d.month,d.day,d.hour,d.minute,d.second] for d in self.Data.getvalues('xdate')]
+
+        savedict = io.std_savedict.copy()
+        savedict['dtype'] = "int"
+        savedict['name'] = "date_components"
+        savedict['units'] = "integer components of UTC date/time"
+        savedict['dims'] = dimid+dimcalcomp
+        savedict['values'] = data
+        savedict['missing_value'] = -9
+        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
+        savedict['order'] = "year, month, day, hour, minute, second"
+        status = f.AddData(savedict)
+
+        data = self.Data.getvalues('lat')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "latitude"
+        savedict['units'] = "degrees_north"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['missing_value'] = -999.9
+        status = f.AddData(savedict)
+
+        data = self.Data.getvalues('lon')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "longitude"
+        savedict['units'] = "degrees_east"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['missing_value'] = -999.9
+        status = f.AddData(savedict)
+
+        data = self.Data.getvalues('height')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "altitude"
+        savedict['units'] = "meters_above_sea_level"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['missing_value'] = -999.9
+        status = f.AddData(savedict)
+
+        data = self.Data.getvalues('samplingstrategy')
+
+        savedict = io.std_savedict.copy()
+        savedict['dtype'] = "int"
+        savedict['name'] = "sampling_strategy"
+        savedict['units'] = "NA"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['missing_value'] = -9
+        status = f.AddData(savedict)
+
+        data = self.Data.getvalues('evn')
+
+        savedict = io.std_savedict.copy()
+        savedict['dtype'] = "char"
+        savedict['name'] = "obs_id"
+        savedict['units'] = "NOAA database identifier"
+        savedict['dims'] = dimid+dim200char
+        savedict['values'] = data
+        savedict['missing_value'] = '!'
+        status = f.AddData(savedict)
+
+        status = f.close()
+
+        msg = "Successfully wrote data to obs file" ; logging.debug(msg)
+        msg = "Sample input file for obs operator now in place [%s]"%obsinputfile ; logging.info(msg)
+
+        return obsinputfile
+
+
+    def AddModelDataMismatch(self):
+        """
+        Get the model-data mismatch values for this cycle.
+
+        (1) Open a sites_weights file
+        (2) Parse the data
+        (3) Compare site list against data
+        (4) Take care of double sites, etc.
+
+        """
+        import da.tools.rc as rc
+        from da.tools.general import NameConvert
+
+        filename = self.DaCycle.DaSystem['obs.sites.rc']
+
+        if not os.path.exists(filename):
+            msg = 'Could not find the required sites.rc input file (%s) ' % filename ; logging.error(msg)
+            raise IOError,msg
+        else:
+            self.SitesFile = filename
+
+        SitesWeights = rc.read(self.SitesFile)
+
+        self.rejection_threshold = int(SitesWeights['obs.rejection.threshold'])
+        self.global_R_scaling = float(SitesWeights['global.R.scaling'])
+        self.n_site_categories = int(SitesWeights['n.site.categories'])
+        self.n_sites_active = int(SitesWeights['n.sites.active'])
+        self.n_sites_moved = int(SitesWeights['n.sites.moved'])
+
+        msg = 'Model-data mismatch rejection threshold: %d ' % (self.rejection_threshold) ; logging.debug(msg)
+        msg = 'Model-data mismatch scaling factor : %f ' % (self.global_R_scaling) ; logging.debug(msg)
+        msg = 'Model-data mismatch site categories : %d ' % (self.n_site_categories) ; logging.debug(msg)
+        msg = 'Model-data mismatch active sites : %d ' % (self.n_sites_active) ; logging.debug(msg)
+        msg = 'Model-data mismatch moved sites : %d ' % (self.n_sites_moved) ; logging.debug(msg)
+
+        cats = [k for k in SitesWeights.keys() if 'site.category' in k]
+
+        SiteCategories = {}
+        for key in cats:
+            if self.Co2_C13_together or self.C13_only:
+                name,error,error_c13,may_localize,may_reject = SitesWeights[key].split(';')
+            else:
+                name,error,may_localize,may_reject = SitesWeights[key].split(';')
+
+            name = name.strip().lower()
+            error = float(error)
+            if self.Co2_C13_together or self.C13_only:
+                error_c13 = float(error_c13)
+            may_localize = bool(may_localize)
+            may_reject = bool(may_reject)
+            if self.Co2_C13_together or self.C13_only:
+                SiteCategories[name] = {'error':error,'error_c13':error_c13,'may_localize':may_localize,'may_reject':may_reject}
+            else:
+                SiteCategories[name] = {'error':error,'may_localize':may_localize,'may_reject':may_reject}
+
+
+        active = [k for k in SitesWeights.keys() if 'site.active' in k]
+
+        SiteInfo = {}
+        for key in active:
+            sitename, sitecategory = SitesWeights[key].split(';')
+            sitename = sitename.strip()
+            #sitename = NameConvert.NameConvert(name="%s"%sitename,to="OP") #IV to convert to obspack name conventions
+            sitecategory = sitecategory.strip().lower()
+            SiteInfo[sitename] = SiteCategories[sitecategory]
+
+        for obs in self.Data:
+
+            obs.mdm = 1000.0 # default is very high model-data-mismatch, until explicitly set by script
+            obs.mdmc13 = 1000.0 # default is very high model-data-mismatch, until explicitly set by script
+
+            species, site, method, lab, nr = os.path.split(obs.fromfile)[-1].split('_')
+
+            identifier = "%s_%02d_%s"%(site,int(lab),method,)
+
+            identifier = NameConvert(name="%s_%s_%s"%(site.lower(),method.lower(),lab.lower(),), to='GV')
+
+            if SiteInfo.has_key(identifier):
+                msg = "Observation found (%s, %s)" % (obs.code, identifier,) ; logging.debug(msg)
+                #obs.mdm = SiteInfo[identifier]['error']
+                if species == 'co2c13':
+                    #obs.mdmc13 = SiteInfo[identifier]['error_c13']
+                    obs.mdm = SiteInfo[identifier]['error_c13']
+                else:
+                    obs.mdm = SiteInfo[identifier]['error']*self.global_R_scaling
+                print obs,obs.mdm,species
+                obs.may_localize = SiteInfo[identifier]['may_localize']
+                obs.may_reject = SiteInfo[identifier]['may_reject']
+            else:
+                msg = "Observation NOT found (%s, %s), please check sites.rc file (%s) !!!" % (obs.code, identifier, self.SitesFile,) ; logging.warning(msg)
+                obs.flag = 99
+
+        # Add SiteInfo dictionary to the Observation object for future use
+
+        self.SiteInfo = SiteInfo
+
+    def WriteObsToFile(self):
+        """
+        Write selected information contained in the Observation object to a file.
+
+        """
+        import shutil
+        import da.tools.io4 as io
+        from da.tools.general import ToDectime
+        import numpy as np
+
+        outfile = os.path.join(self.DaCycle['dir.output'],'sampleinfo_%s.nc'% self.DaCycle['time.sample.stamp'])
+
+        f = io.CT_CDF(outfile,method='create')
+        msg = 'Creating new Sample output file for postprocessing (%s)' % outfile ; logging.debug(msg)
+
+        dimid = f.AddDim('obs',len(self.Data))
+        dim200char = f.AddDim('string_of200chars',200)
+        dim10char = f.AddDim('string_of10chars',10)
+        dimcalcomp = f.AddDim('calendar_components',6)
+
+        if len(self.Data) == 0:
+            f.close()
+            return outfile
+
+        data = self.Data.getvalues('id')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "obs_num"
+        savedict['dtype'] = "int"
+        savedict['long_name'] = "Unique_Dataset_observation_index_number"
+        savedict['units'] = ""
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
+        status = f.AddData(savedict)
+
+        data = [[d.year,d.month,d.day,d.hour,d.minute,d.second] for d in self.Data.getvalues('xdate')]
+
+        savedict = io.std_savedict.copy()
+        savedict['dtype'] = "int"
+        savedict['name'] = "date_components"
+        savedict['units'] = "integer components of UTC date/time"
+        savedict['dims'] = dimid+dimcalcomp
+        savedict['values'] = data
+        savedict['missing_value'] = -9
+        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
+        savedict['order'] = "year, month, day, hour, minute, second"
+        status = f.AddData(savedict)
+
+        data = self.Data.getvalues('obs')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "observed"
+        savedict['long_name'] = "observedvalues"
+        savedict['units'] = "mol mol-1"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'Observations used in optimization'
+        dummy = f.AddData(savedict)
+
+        data = self.Data.getvalues('mdm')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "modeldatamismatch"
+        savedict['long_name'] = "modeldatamismatch"
+        savedict['units'] = "[mol mol-1]^2"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
+        dummy = f.AddData(savedict)
+
+        if self.Co2_C13_together or self.C13_only or self.Co2_only:
+
+            data = self.Data.getvalues('mdmc13')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "modeldatamismatchc13"
+            savedict['long_name'] = "modeldatamismatchc13"
+            savedict['units'] = "[permil]^2"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['comment'] = 'Variance of permil resulting from model-data mismatch'
+            dummy = f.AddData(savedict)
+
+
+        data = self.Data.getvalues('simulated')
+
+        dimmembers = f.AddDim('members',data.shape[1])
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "modelsamples"
+        savedict['long_name'] = "modelsamples for all ensemble members"
+        savedict['units'] = "mol mol-1"
+        savedict['dims'] = dimid+dimmembers
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'simulated mixing ratios based on optimized state vector'
+        dummy = f.AddData(savedict)
+
+        data = self.Data.getvalues('fromfile')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "inputfilename"
+        savedict['long_name'] = "name of file where original obs data was taken from"
+        savedict['dtype'] = "char"
+        savedict['dims'] = dimid+dim200char
+        savedict['values'] = data
+        savedict['missing_value'] = '!'
+        dummy = f.AddData(savedict)
+
+        status = f.close()
+
+        msg = "Successfully wrote data to Sample output file (%s)"%outfile ; logging.debug(msg)
+
+        return outfile
+
+
+
+################### End Class ObsPackObservations ###################
+
+
+
+################### Begin Class MixingRatioSample ###################
+
+class MixingRatioSample(object):
+    """
+    Holds the data that defines a Mixing Ratio Sample in the data assimilation framework. So far, this includes all
+    attributes listed below in the __init__ method. One can additionally make more types of data, or make new
+    objects for specific projects.
+
+    """
+
+    def __init__(self,id,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,mdmc13=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',species='co2',samplingstrategy=1,sdev=0.0,fromfile='none.nc'):
+
+        self.code = code.strip()             # Site code
+        self.xdate = xdate                   # Date of obs
+        self.obs = obs                       # Value observed
+        self.simulated = simulated           # Value simulated by model
+        self.resid = resid                   # Mixing ratio residuals
+        self.hphr = hphr                     # Mixing ratio prior uncertainty from fluxes (HPH) and model-data mismatch (R)
+        self.mdm = mdm                       # Model-data mismatch
+        self.mdmc13 = mdmc13                 # Model-data mismatch c13
+        self.may_localize = True             # Whether sample may be localized in optimizer
+        self.may_reject = True               # Whether sample may be rejected if outside threshold
+        self.flag = flag                     # Flag
+        self.height = height                 # Sample height
+        self.lat = lat                       # Sample lat
+        self.lon = lon                       # Sample lon
+        self.id = id                         # ID number
+        self.evn = evn                       # Event number
+        self.sdev = sdev                     # Standard deviation of ensemble
+        self.masl = True                     # Sample is in Meters Above Sea Level
+        self.mag = not self.masl             # Sample is in Meters Above Ground
+        self.species = species.strip()
+        self.samplingstrategy = samplingstrategy
+        self.fromfile = fromfile             # netcdf filename inside ObsPack distribution, to write back later
+
+    def __str__(self):
+        day = self.xdate.strftime('%Y-%m-%d %H:%M:%S')
+        return ' '.join(map(str,[self.code,day,self.obs,self.flag,self.id]))
+
+################### End Class MixingRatioSample ###################
+
+################### Begin Class MixingRatioList ###################
+
+class MixingRatioList(list):
+    """ This is a special type of list that holds MixingRatioSample objects.
+        It has methods to extract data from such a list easily. """
+    from numpy import array, ndarray
+
+    def getvalues(self,name,constructor=array):
+        from numpy import ndarray
+        result = constructor([getattr(o,name) for o in self])
+        if isinstance(result,ndarray):
+            return result.squeeze()
+        else:
+            return result
+
+    def unflagged(self):
+        return MixingRatioList([o for o in self if o.flag == 0])
+
+    def flagged(self):
+        return MixingRatioList([o for o in self if o.flag != 0])
+
+    def selectsite(self,site='mlo'):
+        l = [o for o in self if o.code == site]
+        return MixingRatioList(l)
+
+    def selecthours(self,hours=range(1,24)):
+        l = [o for o in self if o.xdate.hour in hours]
+        return MixingRatioList(l)
+
+################### End Class MixingRatioList ###################
+
+if __name__ == "__main__":
+
+    import logging
+    from da.tools.initexit import CycleControl
+    from da.ct.dasystem import CtDaSystem
+    from da.ct.statevector import CtStateVector
+
+    sys.path.append('../../')
+
+    logging.root.setLevel(logging.DEBUG)
+
+    DaCycle = CycleControl(args={'rc':'../../da.rc'})
+    DaCycle.Initialize()
+    DaCycle.ParseTimes()
+
+    DaSystem = CtDaSystem('../../carbontracker.rc')
+    DaSystem.Initialize()
+
+    DaCycle.DaSystem = DaSystem
+
+    obs = ObsPackObservations()
+    obs.DaCycle = DaCycle
+
+    while DaCycle['time.start'] < DaCycle['time.finish']:
+
+        DaCycle['time.sample.start'] = DaCycle['time.start']
+        DaCycle['time.sample.end'] = DaCycle['time.end']
+
+        obs.Initialize()
+        obs.Validate()
+        obs.AddObs()
+        obs.AddModelDataMismatch()
+
+        #print(obs.Data.getvalues('obs'))
+        #print(obs.Data.getvalues('mdm'))
+        #print(obs.Data.getvalues('mdmc13'))
+        DaCycle.AdvanceCycleTimes()
+
+
--
GitLab