Commit 86e48669 authored by Peters, Wouter

start of obspack interface for CTDAS

parent f69e3801
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
sys.path.append(os.getcwd())
identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
from da.baseclasses.obs import Observation
################### Begin Class CtObservations ###################
class CtObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def getid(self):
return identifier
def getversion(self):
return version
def getlength(self):
return len(self.Data)
def Initialize(self):
self.startdate = self.DaCycle['time.sample.start']
self.enddate = self.DaCycle['time.sample.end']
DaSystem = self.DaCycle.DaSystem
sfname = DaSystem['obs.input.fname']
if sfname.endswith('.nc'):
filename = os.path.join(DaSystem['obs.input.dir'],sfname)
else:
filename = os.path.join(DaSystem['obs.input.dir'],sfname+'.'+self.startdate.strftime('%Y%m%d')+'.nc')
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
self.ObsFilename = filename
self.Data = MixingRatioList([])
return None
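    # Hypothetical example of the file name construction above: with obs.input.fname = 'obspack_co2'
    # and time.sample.start = 2000-01-01, the observation file resolves to
    # <obs.input.dir>/obspack_co2.20000101.nc (the example values are for illustration only).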
    def AddObs(self):
        """ Fill self.Data with MixingRatioSample objects, one for each observation in the input file.

            The CarbonTracker mixing ratio files are provided as one long list of observations for all possible dates, so we can
            either:

            (1) read everything, and then subselect the data we will use in the rest of this cycle
            (2) use nco to make a subset of the data

            For now, we stick with option (1).
        """
        import da.tools.io4 as io
        import datetime as dtm
        from string import strip, join
        from numpy import array, logical_and

        ncf = io.CT_Read(self.ObsFilename, 'read')

        idates = ncf.GetVariable('date_components')
        dates = array([dtm.datetime(*d) for d in idates])

        subselect = logical_and(dates >= self.startdate, dates <= self.enddate).nonzero()[0]

        dates = dates.take(subselect, axis=0)

        ids = ncf.GetVariable('id').take(subselect, axis=0)

        evn = ncf.GetVariable('eventnumber').take(subselect, axis=0)
        evn = [s.tostring().lower() for s in evn]
        evn = map(strip, evn)

        sites = ncf.GetVariable('site').take(subselect, axis=0)
        sites = [s.tostring().lower() for s in sites]
        sites = map(strip, sites)

        lats = ncf.GetVariable('lat').take(subselect, axis=0)
        lons = ncf.GetVariable('lon').take(subselect, axis=0)
        alts = ncf.GetVariable('alt').take(subselect, axis=0)
        obs = ncf.GetVariable('obs').take(subselect, axis=0)

        species = ncf.GetVariable('species').take(subselect, axis=0)
        species = [s.tostring().lower() for s in species]
        species = map(strip, species)

        date = ncf.GetVariable('date').take(subselect, axis=0)
        strategy = ncf.GetVariable('sampling_strategy').take(subselect, axis=0)
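        # The NOAA_QC_flags variable holds a three-character QC string per record; '...' is the
        # value for a record without any QC flags set, and is converted to an integer flag below.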
        flags = ncf.GetVariable('NOAA_QC_flags').take(subselect, axis=0)
        flags = [s.tostring().lower() for s in flags]
        flags = map(strip, flags)
        flags = [int(f == '...') for f in flags]

        dummy = ncf.close()

        msg = "Successfully read data from obs file (%s)" % self.ObsFilename ; logging.debug(msg)

        for n in range(len(dates)):
            self.Data.append(MixingRatioSample(ids[n], dates[n], sites[n], obs[n], 0.0, 0.0, 0.0, 0.0, flags[n], alts[n], lats[n], lons[n], evn[n], species[n], strategy[n], 0.0))

        msg = "Added %d observations to the Data list" % len(dates) ; logging.debug(msg)
    def AddSimulations(self, filename, silent=True):
        """ Adds model-simulated values to the MixingRatioSample objects in self.Data """
        import da.tools.io4 as io

        if not os.path.exists(filename):
            msge = "Sample output filename for observations could not be found : %s" % filename ; logging.error(msge)
            msg = "Did the sampling step succeed?" ; logging.error(msg)
            msg = "...exiting" ; logging.error(msg)
            raise IOError, msge

        ncf = io.CT_Read(filename, method='read')
        ids = ncf.GetVariable('id')
        simulated = ncf.GetVariable('flask')
        dummy = ncf.close()
        msg = "Successfully read data from model sample file (%s)" % filename ; logging.info(msg)

        obs_ids = self.Data.getvalues('id')
        obs_ids = obs_ids.tolist()
        ids = map(int, ids)

        missing_samples = []
        for id, val in zip(ids, simulated):
            if id in obs_ids:
                index = obs_ids.index(id)
                #print id,val,val.shape
                dummy = self.Data[index].simulated = val * 1e6  # to umol/mol
            else:
                missing_samples.append(id)

        if not silent and missing_samples != []:
            msg = 'Model samples were found that did not match any ID in the observation list. Skipping them...' ; logging.warning(msg)
            #msg = '%s'%missing_samples ; logging.warning(msg)

        msg = "Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)) ; logging.debug(msg)
    def WriteSampleInfo(self):
        """
        Write the information needed by the observation operator to a file. Return the filename that was written, for later use.
        """
        import shutil
        #import da.tools.io as io
        import da.tools.io4 as io
        from da.tools.general import ToDectime

        obsinputfile = os.path.join(self.DaCycle['dir.input'], 'observations_%s.nc' % self.DaCycle['time.sample.stamp'])

        f = io.CT_CDF(obsinputfile, method='create')
        msg = 'Creating new observations file for ObservationOperator (%s)' % obsinputfile ; logging.debug(msg)

        dimid = f.AddDim('id', len(self.Data))
        dim30char = f.AddDim('string_of30chars', 30)
        dim24char = f.AddDim('string_of24chars', 24)
        dim10char = f.AddDim('string_of10chars', 10)
        dimcalcomp = f.AddDim('calendar_components', 6)
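        # Each variable below follows the same pattern: copy the io.std_savedict template, fill in
        # the name, units, dimensions and values (plus optional missing_value/_FillValue and
        # comment entries), and pass the dictionary to f.AddData().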
        data = self.Data.getvalues('id')

        savedict = io.std_savedict.copy()
        savedict['name'] = "id"
        savedict['dtype'] = "int"
        savedict['long_name'] = "identification number"
        savedict['units'] = "NA"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = "This is a unique identifier for each preprocessed observation used in CarbonTracker"
        dummy = f.AddData(savedict)

        data = [ToDectime(d) for d in self.Data.getvalues('xdate')]

        savedict = io.std_savedict.copy()
        savedict['name'] = "decimal_date"
        savedict['units'] = "years"
        savedict['dims'] = dimid
        savedict['values'] = data
        savedict['missing_value'] = -1.e34
        savedict['_FillValue'] = -1.e34
        dummy = f.AddData(savedict)

        data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.Data.getvalues('xdate')]

        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "date_components"
        savedict['units'] = "integer components of UTC date"
        savedict['dims'] = dimid + dimcalcomp
        savedict['values'] = data
        savedict['missing_value'] = -9
        savedict['_FillValue'] = -9
        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
        savedict['order'] = "year, month, day, hour, minute, second"
        dummy = f.AddData(savedict)

        data = self.Data.getvalues('lat')

        savedict = io.std_savedict.copy()
        savedict['name'] = "lat"
        savedict['units'] = "degrees_north"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        savedict['_FillValue'] = -999.9
        dummy = f.AddData(savedict)

        data = self.Data.getvalues('lon')

        savedict = io.std_savedict.copy()
        savedict['name'] = "lon"
        savedict['units'] = "degrees_east"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        savedict['_FillValue'] = -999.9
        dummy = f.AddData(savedict)

        data = self.Data.getvalues('height')

        savedict = io.std_savedict.copy()
        savedict['name'] = "alt"
        savedict['units'] = "meters_above_sea_level"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        savedict['_FillValue'] = -999.9
        dummy = f.AddData(savedict)

        data = self.Data.getvalues('samplingstrategy')

        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "sampling_strategy"
        savedict['units'] = "NA"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -9
        savedict['_FillValue'] = -9
        dummy = f.AddData(savedict)

        try:
            data = self.Data.getvalues('evn')

            savedict = io.std_savedict.copy()
            savedict['dtype'] = "char"
            savedict['name'] = "eventnumber"
            savedict['units'] = "NOAA database identifier"
            savedict['dims'] = dimid + dim30char
            savedict['values'] = data
            savedict['missing_value'] = '-'
            savedict['_FillValue'] = '-'
            dummy = f.AddData(savedict)

            data = self.Data.getvalues('species')

            savedict = io.std_savedict.copy()
            savedict['dtype'] = "char"
            savedict['name'] = "species"
            savedict['units'] = "chemical_species_name"
            savedict['dims'] = dimid + dim10char
            savedict['values'] = data
            savedict['missing_value'] = '-'
            savedict['_FillValue'] = '-'
            dummy = f.AddData(savedict)

            data = self.Data.getvalues('code')

            savedict = io.std_savedict.copy()
            savedict['dtype'] = "char"
            savedict['name'] = "site"
            savedict['units'] = "site_identifier"
            savedict['dims'] = dimid + dim24char
            savedict['values'] = data
            savedict['missing_value'] = '-'
            savedict['_FillValue'] = '-'
            dummy = f.AddData(savedict)
        except:
            msg = "Character arrays 'eventnumber', 'species', and 'site' were not written by the io module" ; logging.warning(msg)

        dummy = f.close()

        msg = "Successfully wrote data to obs file" ; logging.debug(msg)
        msg = "Sample input file for obs operator now in place [%s]" % obsinputfile ; logging.info(msg)

        return obsinputfile
    def AddModelDataMismatch(self):
        """
        Get the model-data mismatch values for this cycle.

            (1) Open a sites_weights file
            (2) Parse the data
            (3) Compare the site list against the data
            (4) Take care of double sites, etc.
        """
        import da.tools.rc as rc

        filename = self.DaCycle.DaSystem['obs.sites.rc']

        if not os.path.exists(filename):
            msg = 'Could not find the required sites.rc input file (%s) ' % filename ; logging.error(msg)
            raise IOError, msg
        else:
            self.SitesFile = filename

        SitesWeights = rc.read(self.SitesFile)

        self.rejection_threshold = int(SitesWeights['obs.rejection.threshold'])
        self.global_R_scaling = float(SitesWeights['global.R.scaling'])
        self.n_site_categories = int(SitesWeights['n.site.categories'])
        self.n_sites_active = int(SitesWeights['n.sites.active'])
        self.n_sites_moved = int(SitesWeights['n.sites.moved'])

        msg = 'Model-data mismatch rejection threshold: %d ' % (self.rejection_threshold) ; logging.debug(msg)
        msg = 'Model-data mismatch scaling factor     : %f ' % (self.global_R_scaling) ; logging.debug(msg)
        msg = 'Model-data mismatch site categories    : %d ' % (self.n_site_categories) ; logging.debug(msg)
        msg = 'Model-data mismatch active sites       : %d ' % (self.n_sites_active) ; logging.debug(msg)
        msg = 'Model-data mismatch moved sites        : %d ' % (self.n_sites_moved) ; logging.debug(msg)
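        # Every rc key containing 'site.category' defines a category as "name ; error ; may_localize ; may_reject";
        # every key containing 'site.active' assigns a site code to one of these categories.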
        cats = [k for k in SitesWeights.keys() if 'site.category' in k]

        SiteCategories = {}
        for key in cats:
            name, error, may_localize, may_reject = SitesWeights[key].split(';')
            name = name.strip().lower()
            error = float(error)
            # Note: bool() on a non-empty string is always True, so parse the text explicitly
            may_localize = may_localize.strip().lower() in ('true', '1', 'yes')
            may_reject = may_reject.strip().lower() in ('true', '1', 'yes')
            SiteCategories[name] = {'error': error, 'may_localize': may_localize, 'may_reject': may_reject}
            #print name,SiteCategories[name]

        active = [k for k in SitesWeights.keys() if 'site.active' in k]

        SiteInfo = {}
        for key in active:
            sitename, sitecategory = SitesWeights[key].split(';')
            sitename = sitename.strip().lower()
            sitecategory = sitecategory.strip().lower()
            SiteInfo[sitename] = SiteCategories[sitecategory]
            #print sitename,SiteInfo[sitename]

        for obs in self.Data:
            obs.mdm = 1000.0  # default is very high model-data-mismatch, until explicitly set by script
            if SiteInfo.has_key(obs.code):
                msg = "Observation found (%s)" % obs.code ; logging.debug(msg)
                obs.mdm = SiteInfo[obs.code]['error']
                obs.may_localize = SiteInfo[obs.code]['may_localize']
                obs.may_reject = SiteInfo[obs.code]['may_reject']
            else:
                msg = "Observation NOT found (%s), please check sites.rc file (%s) !!!" % (obs.code, self.SitesFile) ; logging.warning(msg)
                obs.flag = 99

        # Add SiteInfo dictionary to the Observation object for future use
        self.SiteInfo = SiteInfo
################### End Class CtObservations ###################

################### Begin Class MixingRatioSample ###################

class MixingRatioSample(object):
    """
    Holds the data that defines a mixing ratio sample in the data assimilation framework. So far, this includes all
    attributes listed below in the __init__ method. One can additionally add more data attributes, or make new
    objects for specific projects.
    """
    def __init__(self, id, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat=-999., lon=-999., evn='0000', species='co2', samplingstrategy=1, sdev=0.0):
        self.code = code.strip()        # Site code
        self.xdate = xdate              # Date of obs
        self.obs = obs                  # Value observed
        self.simulated = simulated      # Value simulated by model
        self.resid = resid              # Mixing ratio residuals
        self.hphr = hphr                # Mixing ratio prior uncertainty from fluxes (HPH) and model-data mismatch (R)
        self.mdm = mdm                  # Model-data mismatch
        self.may_localize = True        # Whether sample may be localized in optimizer
        self.may_reject = True          # Whether sample may be rejected if outside threshold
        self.flag = flag                # Flag
        self.height = height            # Sample height
        self.lat = lat                  # Sample lat
        self.lon = lon                  # Sample lon
        self.id = id                    # ID number
        self.evn = evn                  # Event number
        self.sdev = sdev                # standard deviation of ensemble
        self.masl = True                # Sample is in Meters Above Sea Level
        self.mag = not self.masl        # Sample is in Meters Above Ground
        self.species = species.strip()
        self.samplingstrategy = samplingstrategy

    def __str__(self):
        day = self.xdate.strftime('%Y-%m-%d %H:%M:%S')
        return ' '.join(map(str, [self.code, day, self.obs, self.flag, self.id]))
################### End Class MixingRatioSample ###################

################### Begin Class MixingRatioList ###################

class MixingRatioList(list):
    """ This is a special type of list that holds MixingRatioSample objects. It has methods to extract data from such a list easily """
    from numpy import array, ndarray

    def getvalues(self, name, constructor=array):
        from numpy import ndarray
        result = constructor([getattr(o, name) for o in self])
        if isinstance(result, ndarray):
            return result.squeeze()
        else:
            return result
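    # e.g. somelist.getvalues('obs') gathers the 'obs' attribute of every sample in the list into a
    # single numpy array (the default constructor), squeezed to remove singleton dimensions.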
    def unflagged(self):
        return MixingRatioList([o for o in self if o.flag == 0])

    def flagged(self):
        return MixingRatioList([o for o in self if o.flag != 0])

    def selectsite(self, site='mlo'):
        l = [o for o in self if o.code == site]
        return MixingRatioList(l)

    def selecthours(self, hours=range(1, 24)):
        l = [o for o in self if o.xdate.hour in hours]
        return MixingRatioList(l)
################### End Class MixingRatioList ###################
if __name__ == "__main__":

    from da.tools.initexit import StartLogger
    from da.tools.pipeline import JobStart
    from datetime import datetime
    import logging
    import sys, os

    sys.path.append(os.getcwd())

    StartLogger()

    obs = CtObservations()

    DaCycle = JobStart(['-v'], {'rc': 'da.rc'})
    DaCycle['time.sample.start'] = datetime(2000, 1, 1)
    DaCycle['time.sample.end'] = datetime(2000, 1, 2)

    obs.DaCycle = DaCycle  # Initialize() reads the sampling window and DaSystem from self.DaCycle

    obs.Initialize()
    obs.Validate()
    obs.AddObs()

    print(obs.Data.getvalues('obs'))
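    # A rough sketch (not part of this commit) of how the remaining steps could be chained once an
    # ObservationOperator has produced a 'flask' sample file; the filename below is hypothetical:
    #
    #   obs.AddModelDataMismatch()
    #   samplefile = obs.WriteSampleInfo()
    #   obs.AddSimulations('flask_output.nc', silent=False)
    #   print(obs.Data.getvalues('simulated'))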