Skip to content
Snippets Groups Projects
Commit bc0fbd5a authored by ivar's avatar ivar
Browse files

made a new obspack.py that should work with the new prototype obspack releases

parent 444c3ce6
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
sys.path.append(os.getcwd())
sys.path.append('../../')
import logging
import datetime
identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
from da.baseclasses.obs import Observation
################### Begin Class ObsPackObservations ###################
class ObsPackObservations(Observation):
    """ an object that holds data + methods and attributes needed to manipulate mixing ratio values """
    # NOTE(review): this module uses Python 2 syntax (print statements,
    # `raise IOError, msg`, dict.has_key, map() returning lists) and will not
    # run unmodified under Python 3.

    def getid(self):
        # Return the module-level identifier string for this observation type.
        return identifier

    def getversion(self):
        # Return the module-level version string.
        return version

    def getlength(self):
        # Number of MixingRatioSample objects currently held in self.Data.
        return len(self.Data)

    def Initialize(self):
        """
        Read the sampling window and the ObsPack locations/identifiers from the
        DaCycle/DaSystem configuration, and start with an empty MixingRatioList.

        Raises IOError when the CO2 ObsPack directory does not exist.
        NOTE(review): the c13 directory is never checked for existence — confirm
        whether that is intentional.
        """
        self.startdate = self.DaCycle['time.sample.start']
        self.enddate = self.DaCycle['time.sample.end']
        DaSystem = self.DaCycle.DaSystem
        # ObsPack identifiers and directories for the CO2 and 13C products
        op_co2_id = DaSystem['obspack.co2.input.id']
        op_co2_dir = DaSystem['obspack.co2.input.dir']
        op_c13_id = DaSystem['obspack.c13.input.id']
        op_c13_dir = DaSystem['obspack.c13.input.dir']
        # Flags selecting which species (combination) is assimilated
        co2_only = DaSystem['co2.only']
        c13_only = DaSystem['c13.only']
        co2_c13_together = DaSystem['co2.c13.together']
        if not os.path.exists(op_co2_dir):
            msg = 'Could not find the required ObsPack distribution (%s) ' % op_co2_dir ; logging.error(msg)
            raise IOError,msg
        else:
            self.ObsPackCo2Dir = op_co2_dir
            self.ObsPackCo2Id = op_co2_id
            self.ObsPackC13Dir = op_c13_dir
            self.ObsPackC13Id = op_c13_id
            self.Co2_only = co2_only
            self.C13_only = c13_only
            self.Co2_C13_together = co2_c13_together
        # Fresh, empty container for MixingRatioSample objects
        self.Data = MixingRatioList([])
        return None

    def AddObs(self):
        """ Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file

            The ObsPack mixing ratio files are provided as time series per site with all dates in sequence.
            We will loop over all site files in the ObsPackage, and subset each to our needs
        """
        import da.tools.io4 as io
        import datetime as dtm
        from string import strip, join  # Python 2 `string` module; `join` is unused here
        from numpy import array, logical_and
        # Step 1: Read list of available site files in package
        # NOTE(review): these ifs are not mutually exclusive — if several flags
        # are set the last assignment wins, and if none is set `obsspecies` is
        # undefined and the loop below raises NameError.
        if self.Co2_only:
            obsspecies = [self.ObsPackCo2Dir]
        if self.C13_only:
            obsspecies = [self.ObsPackC13Dir]
        if self.Co2_C13_together:
            obsspecies = [self.ObsPackCo2Dir,self.ObsPackC13Dir ]
        for obsspecie in obsspecies:
            # The dataset summary file lists every netcdf site file in the distribution
            if obsspecie == self.ObsPackCo2Dir:
                infile = os.path.join(obsspecie,'summary','%s_dataset_summary.txt'%(self.ObsPackCo2Id,))
            else:
                infile = os.path.join(obsspecie,'summary','%s_dataset_summary.txt'%(self.ObsPackC13Id,))
            f = open(infile,'r')
            lines = f.readlines()
            status = f.close()
            ncfilelist = []
            for line in lines:
                if line.startswith('#'): continue # header
                print line  # NOTE(review): debug print left in (Python 2 statement)
                ncfile, lab , dist, start_date, stop_date, data_comparison = line.split()
                ncfilelist += [ncfile]
            msg = "ObsPack dataset info read, proceeding with %d netcdf files"%(len(ncfilelist),) ; logging.debug(msg)
            for ncfile in ncfilelist:
                infile = os.path.join(obsspecie,'data','nc',ncfile)
                ncf = io.CT_Read(infile,'read')
                idates = ncf.GetVariable('time_components')
                dates = array([dtm.datetime(*d) for d in idates])
                # Keep only observations inside the current sampling window (inclusive bounds)
                subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
                dates = dates.take(subselect,axis=0)
                ids = ncf.GetVariable('obspack_num').take(subselect) # or should we propagate obs_num which is not unique across datasets??
                evn = ncf.GetVariable('obspack_id').take(subselect,axis=0)
                # obspack_id is stored as a char array: decode to lowercase stripped strings
                evn = [s.tostring().lower() for s in evn]
                evn = map(strip,evn)
                site = ncf.GetAttribute('site_code')
                lats = ncf.GetVariable('latitude').take(subselect,axis=0)
                lons = ncf.GetVariable('longitude').take(subselect,axis=0)
                alts = ncf.GetVariable('altitude').take(subselect,axis=0)
                obs = ncf.GetVariable('value').take(subselect,axis=0)
                if obsspecie == self.ObsPackCo2Dir:
                    obs = obs #*1.e6 #IV convert to ppm
                species = ncf.GetAttribute('dataset_parameter')
                #strategy = ncf.GetVariable('sampling_strategy').take(subselect,axis=0)
                # NOTE(review): sampling strategy hard-coded to 1; the netcdf read is disabled above
                strategy = 1
                flags = ncf.GetVariable('qc_flag').take(subselect,axis=0)
                flags = [s.tostring().lower() for s in flags]
                flags = map(strip,flags)
                # A '...' qc flag maps to 1, anything else to 0.
                # NOTE(review): MixingRatioList.unflagged() selects flag == 0 —
                # confirm which value means "accepted" downstream.
                flags = [int(f == '...') for f in flags]
                status = ncf.close()
                msg = site,species,obs,dates,ids ; logging.info(msg)
                for n in range(len(dates)):
                    self.Data.append( MixingRatioSample(ids[n],dates[n],site,obs[n],0.0,0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],evn[n],species,strategy,0.0, infile) )
                msg = "Added %d observations from file (%s) to the Data list" % (len(dates),ncfile,) ; logging.debug(msg)
        msg = "Observation list now holds %d values" % (len(self.Data),) ; logging.info(msg)

    def AddSimulations(self,filename,silent=True):
        """ Adds model simulated values to the mixing ratio objects """
        import da.tools.io4 as io
        import da.tools.MixingratioToPermil as mtp
        from numpy import array,arange
        if not os.path.exists(filename):
            msge = "Sample output filename for observations could not be found : %s" % filename ; logging.error(msge)
            msg = "Did the sampling step succeed?" ; logging.error(msg)
            # NOTE(review): logs `msge` again instead of the "...exiting" message just assigned
            msg = "...exiting" ; logging.error(msge)
            raise IOError,msg
        ncf = io.CT_Read(filename,method='read')
        ids = ncf.GetVariable('obs_num')
        simulated = ncf.GetVariable('flask')
        trspecies = ncf.GetVariable('tracer_names')
        status = ncf.close()
        msg = "Successfully read data from model sample file (%s)"%filename ; logging.info(msg)
        obs_ids = self.Data.getvalues('id')
        obs_ids = obs_ids.tolist()
        ids = map(int,ids)
        obs_species = self.Data.getvalues('species')
        #obs_species = obs_species.tolist()
        missing_samples = []
        # Reassemble tracer names from their netcdf character arrays
        tr_species=[]
        for i in range(len(trspecies[:,0])):
            trjoin = [''.join(trspecies[i,:])]
            tr_species.append(trjoin[0].strip())
        tr_species=array(tr_species)
        # Convert simulated values to the observation units, depending on species setup
        if self.Co2_only:
            simulated = simulated #*1.e6
        if self.C13_only:
            simulated = mtp.MixingratioToPermil(filename,simulated)
        if self.Co2_C13_together:
            simulated = mtp.MixingratioToPermil(filename,simulated)
        # NOTE(review): samples are matched by position (index into xx), not by
        # obs_num id; the original id-based matching is commented out below.
        xx=arange(len(ids))
        for id,obs_specie,val in zip(xx,obs_species,simulated):
            if id in xx:#obs_ids:
                index = id #obs_ids.index(id)
                #print id,index
                counter=0
                trval=[]
                # Collect the ensemble values of the tracer(s) whose name matches this sample's species
                for sp in tr_species:
                    if obs_specie == sp:
                        #print counter,val[counter]
                        trval.append(val[counter])
                    counter=counter+1
                trval=array(trval)
                #print 'trval',index,trval
                status = self.Data[index].simulated = trval # *1e6 # to umol/mol
                #if index ==0:
                #print 'Data.getvalues2',self.Data[index].simulated
                #index = obs_ids.index(id)
                #print id,val,val.shape
                #dummy = self.Data[index].simulated = val # *1e6 # to umol/mol
            else:
                missing_samples.append(id)
        if not silent and missing_samples != []:
            msg = 'Model samples were found that did not match any ID in the observation list. Skipping them...' ; logging.warning(msg)
            #msg = '%s'%missing_samples ; logging.warning(msg)
        msg = "Added %d simulated values to the Data list" % (len(ids)-len(missing_samples) ) ; logging.debug(msg)

    def WriteSampleInfo(self):
        """
        Write the information needed by the observation operator to a file. Return the filename that was written for later use
        """
        import shutil  # NOTE(review): unused import
        #import da.tools.io as io
        import da.tools.io4 as io
        from da.tools.general import ToDectime  # NOTE(review): unused import
        obsinputfile = os.path.join(self.DaCycle['dir.input'],'observations_%s.nc'% self.DaCycle['time.sample.stamp'])
        f = io.CT_CDF(obsinputfile,method='create')
        msg = 'Creating new observations file for ObservationOperator (%s)' % obsinputfile ; logging.debug(msg)
        # Dimensions: one record per observation plus helper dims for strings/dates
        dimid = f.AddDim('obs',len(self.Data))
        dim200char = f.AddDim('string_of200chars',200)
        dim10char = f.AddDim('string_of10chars',10)
        dimcalcomp = f.AddDim('calendar_components',6)
        if len(self.Data) == 0:
            # No samples this cycle: leave an empty (but valid) file behind
            f.close()
            return obsinputfile
        # Unique observation numbers
        data = self.Data.getvalues('id')
        savedict = io.std_savedict.copy()
        savedict['name'] = "obs_num"
        savedict['dtype'] = "int"
        savedict['long_name'] = "Unique_Dataset_observation_index_number"
        savedict['units'] = ""
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
        status = f.AddData(savedict)
        # Observation dates as integer calendar components (UTC)
        data = [[d.year,d.month,d.day,d.hour,d.minute,d.second] for d in self.Data.getvalues('xdate') ]
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "date_components"
        savedict['units'] = "integer components of UTC date/time"
        savedict['dims'] = dimid+dimcalcomp
        savedict['values'] = data
        savedict['missing_value'] = -9
        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
        savedict['order'] = "year, month, day, hour, minute, second"
        status = f.AddData(savedict)
        # Sample locations
        data = self.Data.getvalues('lat')
        savedict = io.std_savedict.copy()
        savedict['name'] = "latitude"
        savedict['units'] = "degrees_north"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        status = f.AddData(savedict)
        data = self.Data.getvalues('lon')
        savedict = io.std_savedict.copy()
        savedict['name'] = "longitude"
        savedict['units'] = "degrees_east"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        status = f.AddData(savedict)
        data = self.Data.getvalues('height')
        savedict = io.std_savedict.copy()
        savedict['name'] = "altitude"
        savedict['units'] = "meters_above_sea_level"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        status = f.AddData(savedict)
        # Sampling strategy flag per observation
        data = self.Data.getvalues('samplingstrategy')
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "sampling_strategy"
        savedict['units'] = "NA"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -9
        status = f.AddData(savedict)
        # Event (obspack_id) strings
        data = self.Data.getvalues('evn')
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "char"
        savedict['name'] = "obs_id"
        savedict['units'] = "NOAA database identifier"
        savedict['dims'] = dimid+dim200char
        savedict['values'] = data
        savedict['missing_value'] = '!'
        status = f.AddData(savedict)
        status = f.close()
        msg = "Successfully wrote data to obs file" ;logging.debug(msg)
        msg = "Sample input file for obs operator now in place [%s]"%obsinputfile ; logging.info(msg)
        return obsinputfile

    def AddModelDataMismatch(self ):
        """
        Get the model-data mismatch values for this cycle.

            (1) Open a sites_weights file
            (2) Parse the data
            (3) Compare site list against data
            (4) Take care of double sites, etc
        """
        import da.tools.rc as rc
        from da.tools.general import NameConvert
        filename = self.DaCycle.DaSystem['obs.sites.rc']
        if not os.path.exists(filename):
            msg = 'Could not find the required sites.rc input file (%s) ' % filename ; logging.error(msg)
            raise IOError,msg
        else:
            self.SitesFile = filename
        SitesWeights = rc.read(self.SitesFile)
        # Global tuning parameters from the sites/weights rc file
        self.rejection_threshold = int(SitesWeights['obs.rejection.threshold'])
        self.global_R_scaling = float(SitesWeights['global.R.scaling'])
        self.n_site_categories = int(SitesWeights['n.site.categories'])
        self.n_sites_active = int(SitesWeights['n.sites.active'])
        self.n_sites_moved = int(SitesWeights['n.sites.moved'])
        msg = 'Model-data mismatch rejection threshold: %d ' % (self.rejection_threshold) ; logging.debug(msg)
        msg = 'Model-data mismatch scaling factor : %f ' % (self.global_R_scaling) ; logging.debug(msg)
        msg = 'Model-data mismatch site categories : %d ' % (self.n_site_categories) ; logging.debug(msg)
        msg = 'Model-data mismatch active sites : %d ' % (self.n_sites_active) ; logging.debug(msg)
        msg = 'Model-data mismatch moved sites : %d ' % (self.n_sites_moved) ; logging.debug(msg)
        # Parse the site categories; with c13 involved each category carries an extra error term
        cats = [k for k in SitesWeights.keys() if 'site.category' in k]
        SiteCategories={}
        for key in cats:
            if self.Co2_C13_together or self.C13_only:
                name,error,error_c13,may_localize,may_reject = SitesWeights[key].split(';')
            else:
                name,error,may_localize,may_reject = SitesWeights[key].split(';')
            name = name.strip().lower()
            error = float(error)
            if self.Co2_C13_together or self.C13_only:
                error_c13 = float(error_c13)
            # NOTE(review): bool() of a non-empty string is always True, so the
            # may_localize/may_reject values from the rc file are effectively ignored
            may_localize = bool(may_localize)
            may_reject = bool(may_reject)
            if self.Co2_C13_together or self.C13_only:
                SiteCategories[name] = {'error':error,'error_c13':error_c13,'may_localize':may_localize,'may_reject':may_reject}
            else:
                SiteCategories[name] = {'error':error,'may_localize':may_localize,'may_reject':may_reject}
        # Map each active site onto its category settings
        active = [k for k in SitesWeights.keys() if 'site.active' in k]
        SiteInfo = {}
        for key in active:
            sitename, sitecategory = SitesWeights[key].split(';')
            sitename = sitename.strip()
            #sitename=NameConvert.NameConvert(name="%s"%sitename,to="OP") #IV to convert to obspack name conventions
            sitecategory = sitecategory.strip().lower()
            SiteInfo[sitename] = SiteCategories[sitecategory]
        for obs in self.Data:
            obs.mdm = 1000.0 # default is very high model-data-mismatch, until explicitly set by script
            obs.mdmc13 = 1000.0 # default is very high model-data-mismatch, until explicitly set by script
            # ObsPack filenames presumably look like <species>_<site>_<method>_<lab>_<nr> — TODO confirm
            species, site, method, lab, nr = os.path.split(obs.fromfile)[-1].split('_')
            # NOTE(review): this identifier is immediately overwritten on the next line
            identifier = "%s_%02d_%s"%(site,int(lab),method,)
            identifier = NameConvert(name="%s_%s_%s"%(site.lower(),method.lower(),lab.lower(),), to='GV')
            if SiteInfo.has_key(identifier):
                msg = "Observation found (%s, %s)" % (obs.code, identifier,) ; logging.debug(msg)
                # obs.mdm = SiteInfo[identifier]['error']
                if species=='co2c13':
                    # obs.mdmc13 = SiteInfo[identifier]['error_c13']
                    # NOTE(review): c13 error is stored on obs.mdm (not obs.mdmc13) and the
                    # global R scaling is not applied here — confirm intent
                    obs.mdm = SiteInfo[identifier]['error_c13']
                else:
                    obs.mdm = SiteInfo[identifier]['error']*self.global_R_scaling
                print obs,obs.mdm,species
                obs.may_localize = SiteInfo[identifier]['may_localize']
                obs.may_reject = SiteInfo[identifier]['may_reject']
            else:
                msg= "Observation NOT found (%s, %s), please check sites.rc file (%s) !!!" % (obs.code, identifier, self.SitesFile,) ; logging.warning(msg)
                obs.flag = 99
        # Add SiteInfo dictionary to the Observation object for future use
        self.SiteInfo = SiteInfo

    def WriteObsToFile(self):
        """
        Write selected information contained in the Observation object to a file.

        Returns the name of the sampleinfo netcdf file written to dir.output.
        """
        import shutil  # NOTE(review): unused import
        import da.tools.io4 as io
        from da.tools.general import ToDectime  # NOTE(review): unused import
        import numpy as np  # NOTE(review): unused import
        outfile = os.path.join(self.DaCycle['dir.output'],'sampleinfo_%s.nc'% self.DaCycle['time.sample.stamp'])
        f = io.CT_CDF(outfile,method='create')
        msg = 'Creating new Sample output file for postprocessing (%s)' % outfile ; logging.debug(msg)
        dimid = f.AddDim('obs',len(self.Data))
        dim200char = f.AddDim('string_of200chars',200)
        dim10char = f.AddDim('string_of10chars',10)
        dimcalcomp = f.AddDim('calendar_components',6)
        if len(self.Data) == 0:
            # No samples this cycle: leave an empty (but valid) file behind
            f.close()
            return outfile
        data = self.Data.getvalues('id')
        savedict = io.std_savedict.copy()
        savedict['name'] = "obs_num"
        savedict['dtype'] = "int"
        savedict['long_name'] = "Unique_Dataset_observation_index_number"
        savedict['units'] = ""
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
        status = f.AddData(savedict)
        data = [[d.year,d.month,d.day,d.hour,d.minute,d.second] for d in self.Data.getvalues('xdate') ]
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "date_components"
        savedict['units'] = "integer components of UTC date/time"
        savedict['dims'] = dimid+dimcalcomp
        savedict['values'] = data
        savedict['missing_value'] = -9
        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
        savedict['order'] = "year, month, day, hour, minute, second"
        status = f.AddData(savedict)
        # Observed values used in the optimization
        data = self.Data.getvalues('obs')
        savedict = io.std_savedict.copy()
        savedict['name'] = "observed"
        savedict['long_name'] = "observedvalues"
        savedict['units'] = "mol mol-1"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = 'Observations used in optimization'
        dummy = f.AddData(savedict)
        # Model-data mismatch variance per observation
        data = self.Data.getvalues('mdm')
        savedict = io.std_savedict.copy()
        savedict['name'] = "modeldatamismatch"
        savedict['long_name'] = "modeldatamismatch"
        savedict['units'] = "[mol mol-1]^2"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
        dummy = f.AddData(savedict)
        # NOTE(review): this condition covers all three configuration flags and so
        # is presumably always true when any species setup is active — confirm
        if self.Co2_C13_together or self.C13_only or self.Co2_only:
            data = self.Data.getvalues('mdmc13')
            savedict = io.std_savedict.copy()
            savedict['name'] = "modeldatamismatchc13"
            savedict['long_name'] = "modeldatamismatchc13"
            savedict['units'] = "[permil]^2"
            savedict['dims'] = dimid
            savedict['values'] = data.tolist()
            savedict['comment'] = 'Variance of permil resulting from model-data mismatch'
            dummy = f.AddData(savedict)
        # Simulated values for all ensemble members (2-D: obs x members)
        data = self.Data.getvalues('simulated')
        dimmembers = f.AddDim('members',data.shape[1])
        savedict = io.std_savedict.copy()
        savedict['name'] = "modelsamples"
        savedict['long_name'] = "modelsamples for all ensemble members"
        savedict['units'] = "mol mol-1"
        savedict['dims'] = dimid+dimmembers
        savedict['values'] = data.tolist()
        savedict['comment'] = 'simulated mixing ratios based on optimized state vector'
        dummy = f.AddData(savedict)
        # Provenance: original ObsPack filename per observation
        data = self.Data.getvalues('fromfile')
        savedict = io.std_savedict.copy()
        savedict['name'] = "inputfilename"
        savedict['long_name'] = "name of file where original obs data was taken from"
        savedict['dtype'] = "char"
        savedict['dims'] = dimid+dim200char
        savedict['values'] = data
        savedict['missing_value'] = '!'
        dummy = f.AddData(savedict)
        status = f.close()
        msg = "Successfully wrote data to Sample output file (%s)"%outfile ;logging.debug(msg)
        return outfile
################### End Class CtObservations ###################
################### Begin Class MixingRatioSample ###################
class MixingRatioSample(object):
    """
    A single mixing-ratio observation inside the data-assimilation framework.

    Carries the observed value together with its metadata (site, location,
    time, quality flags) and slots that are filled in later by the
    assimilation (simulated values, residuals, uncertainties). Projects may
    subclass or extend this with additional attributes.
    """

    def __init__(self,id,xdate,code='XXX',obs=0.0,simulated=0.0,resid=0.0,hphr=0.0,mdm=0.0,mdmc13=0.0,flag=0,height=0.0,lat=-999.,lon=-999.,evn='0000',species='co2',samplingstrategy=1,sdev=0.0,fromfile='none.nc'):
        # --- identification and provenance ---
        self.id = id                          # unique observation number
        self.evn = evn                        # event number
        self.code = code.strip()              # site code, surrounding whitespace removed
        self.fromfile = fromfile              # netcdf filename inside the ObsPack distribution
        # --- time and location ---
        self.xdate = xdate                    # observation date/time
        self.lat = lat                        # sample latitude
        self.lon = lon                        # sample longitude
        self.height = height                  # sample height
        self.masl = True                      # height counted as meters above sea level...
        self.mag = not self.masl              # ...and therefore not meters above ground
        # --- observed and modeled quantities ---
        self.obs = obs                        # observed value
        self.simulated = simulated            # model-simulated value(s)
        self.resid = resid                    # mixing ratio residual
        self.hphr = hphr                      # prior uncertainty from fluxes (HPH) plus mismatch (R)
        self.mdm = mdm                        # model-data mismatch
        self.mdmc13 = mdmc13                  # model-data mismatch for c13
        self.sdev = sdev                      # standard deviation of the ensemble
        # --- assimilation controls ---
        self.may_localize = True              # sample may be localized in the optimizer
        self.may_reject = True                # sample may be rejected when outside threshold
        self.flag = flag                      # quality flag
        self.species = species.strip()        # tracer species name, whitespace removed
        self.samplingstrategy = samplingstrategy  # sampling strategy indicator

    def __str__(self):
        """Compact one-line summary: code, timestamp, value, flag, id."""
        stamp = self.xdate.strftime('%Y-%m-%d %H:%M:%S')
        fields = [self.code, stamp, self.obs, self.flag, self.id]
        return ' '.join([str(item) for item in fields])
################### End Class MixingRatioSample ###################
################### Begin Class MixingRatioList ###################
class MixingRatioList(list):
    """ This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """
    from numpy import array, ndarray

    def getvalues(self, name, constructor=array):
        """
        Extract attribute `name` from every sample in the list.

        The collected values are passed through `constructor` (a numpy array
        by default); ndarray results are squeezed before being returned.
        """
        from numpy import ndarray
        result = constructor([getattr(o, name) for o in self])
        if isinstance(result, ndarray):
            return result.squeeze()
        else:
            return result

    # BUGFIX: the selection methods below previously constructed
    # MixingRatioSample(list_of_samples), which raises TypeError because
    # MixingRatioSample.__init__ requires (id, xdate, ...). They now return a
    # new MixingRatioList, which is what callers of a list subclass expect.

    def unflagged(self):
        """Return a new MixingRatioList with only the samples whose flag == 0."""
        return MixingRatioList([o for o in self if o.flag == 0])

    def flagged(self):
        """Return a new MixingRatioList with only the samples whose flag != 0."""
        return MixingRatioList([o for o in self if o.flag != 0])

    def selectsite(self, site='mlo'):
        """Return a new MixingRatioList with only the samples from site code `site`."""
        return MixingRatioList([o for o in self if o.code == site])

    def selecthours(self, hours=range(1, 24)):
        """Return a new MixingRatioList with only the samples whose hour-of-day is in `hours`."""
        return MixingRatioList([o for o in self if o.xdate.hour in hours])
################### End Class MixingRatioList ###################
if __name__ == "__main__":
    # Stand-alone test driver: walk through all cycles of a CarbonTracker run
    # and exercise the observation reading / model-data-mismatch code.
    import logging
    from da.tools.initexit import CycleControl
    from da.ct.dasystem import CtDaSystem
    from da.ct.statevector import CtStateVector  # NOTE(review): imported but unused below
    sys.path.append('../../')
    logging.root.setLevel(logging.DEBUG)
    # Set up the cycle control and DA system from the rc files two levels up
    DaCycle = CycleControl(args={'rc':'../../da.rc'})
    DaCycle.Initialize()
    DaCycle.ParseTimes()
    DaSystem = CtDaSystem('../../carbontracker.rc')
    DaSystem.Initialize()
    DaCycle.DaSystem = DaSystem
    obs = ObsPackObservations()
    obs.DaCycle = DaCycle
    # Loop over all cycles, sampling one cycle window at a time
    while DaCycle['time.start'] < DaCycle['time.finish']:
        DaCycle['time.sample.start'] = DaCycle['time.start']
        DaCycle['time.sample.end'] = DaCycle['time.end']
        obs.Initialize()
        obs.Validate()  # NOTE(review): presumably inherited from Observation — not defined in this file
        obs.AddObs()
        obs.AddModelDataMismatch()
        #print(obs.Data.getvalues('obs'))
        #print(obs.Data.getvalues('mdm'))
        #print(obs.Data.getvalues('mdmc13'))
        DaCycle.AdvanceCycleTimes()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment