Skip to content
Snippets Groups Projects
Commit 3adbe550 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

reads latest geocarbon obspack

parent 97abc5e3
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
sys.path.append(os.getcwd())
sys.path.append('../../')
import logging
import datetime
identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
from da.baseclasses.obs import Observation
################### Begin Class ObsPackObservations ###################
class ObsPackObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def getid(self):
return identifier
def getversion(self):
return version
def getlength(self):
return len(self.Data)
def Initialize(self):
self.startdate = self.DaCycle['time.sample.start']
self.enddate = self.DaCycle['time.sample.end']
DaSystem = self.DaCycle.DaSystem
op_id = DaSystem['obspack.input.id']
op_dir = DaSystem['obspack.input.dir']
if not os.path.exists(op_dir):
msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir ; logging.error(msg)
raise IOError,msg
else:
self.ObsPackDir = op_dir
self.ObsPackId = op_id
self.Data = MixingRatioList([])
return None
    def AddObs(self):
        """ Returns a MixingRatioList holding individual MixingRatioSample objects for all obs in a file
        The ObsPack mixing ratio files are provided as time series per site with all dates in sequence.
        We will loop over all site files in the ObsPackage, and subset each to our needs
        """
        import da.tools.io4 as io
        import datetime as dtm
        from string import strip, join
        from numpy import array, logical_and
        # Step 1: Read list of available site files in package
        infile = os.path.join(self.ObsPackDir,'summary','%s_dataset_summary.txt'%(self.ObsPackId,))
        f = open(infile,'r')
        lines = f.readlines()
        status = f.close()
        ncfilelist = []
        for line in lines:
            if line.startswith('#'): continue # header
            items = line.split()
            # first five whitespace-separated columns describe each dataset; only the filename is used here
            ncfile, lab , start_date, stop_date, data_comparison = items[0:5]
            ncfilelist += [ncfile]
        msg = "ObsPack dataset info read, proceeding with %d netcdf files"%(len(ncfilelist),) ; logging.debug(msg)
        # Step 2: loop over each site file, subset it to the current sampling window, and collect samples
        for ncfile in ncfilelist:
            infile = os.path.join(self.ObsPackDir,'data','nc',ncfile+'.nc')
            ncf = io.CT_Read(infile,'read')
            idates = ncf.GetVariable('time_components')
            dates = array([dtm.datetime(*d) for d in idates])
            # indices of samples inside [startdate, enddate] (inclusive on both ends)
            subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
            dates = dates.take(subselect,axis=0)
            obspacknum = ncf.GetVariable('obspack_num').take(subselect) # or should we propagate obs_num which is not unique across datasets??
            obspackid = ncf.GetVariable('obspack_id').take(subselect,axis=0)
            obspackid = [s.tostring().lower() for s in obspackid]
            obspackid = map(strip,obspackid)
            datasetname = ncfile # use full name of dataset to propagate for clarity
            lats = ncf.GetVariable('latitude').take(subselect,axis=0)
            lons = ncf.GetVariable('longitude').take(subselect,axis=0)
            alts = ncf.GetVariable('altitude').take(subselect,axis=0)
            obs = ncf.GetVariable('value').take(subselect,axis=0)
            species = ncf.GetAttribute('dataset_parameter')
            flags = ncf.GetVariable('obspack_flag').take(subselect,axis=0)
            status = ncf.close()
            # Step 3: wrap each selected record in a MixingRatioSample and append it to the Data list
            for n in range(len(dates)):
                self.Data.append( MixingRatioSample(obspacknum[n],dates[n],datasetname,obs[n],0.0,0.0,0.0,0.0,flags[n],alts[n],lats[n],lons[n],obspackid[n],species,1,0.0, infile) )
            msg = "Added %d observations from file (%s) to the Data list" % (len(dates),ncfile,) ; logging.debug(msg)
        msg = "Observation list now holds %d values" % (len(self.Data),) ; logging.info(msg)
def AddSimulations(self,filename,silent=False):
""" Adds model simulated values to the mixing ratio objects """
import da.tools.io4 as io
if not os.path.exists(filename):
msge = "Sample output filename for observations could not be found : %s" % filename ; logging.error(msge)
msg = "Did the sampling step succeed?" ; logging.error(msg)
msg = "...exiting" ; logging.error(msge)
raise IOError,msg
ncf = io.CT_Read(filename,method='read')
ids = ncf.GetVariable('obs_num')
simulated = ncf.GetVariable('flask')
status = ncf.close()
msg = "Successfully read data from model sample file (%s)"%filename ; logging.info(msg)
obs_ids = self.Data.getvalues('id')
obs_ids = obs_ids.tolist()
ids = map(int,ids)
missing_samples = []
for id,val in zip(ids,simulated):
if id in obs_ids:
index = obs_ids.index(id)
#print id,val,val.shape
status = self.Data[index].simulated = val # in mol/mol
else:
missing_samples.append(id)
if not silent and missing_samples != []:
msg = 'Model samples were found that did not match any ID in the observation list. Skipping them...' ; logging.warning(msg)
#msg = '%s'%missing_samples ; logging.warning(msg)
msg = "Added %d simulated values to the Data list" % (len(ids)-len(missing_samples) ) ; logging.debug(msg)
    def WriteSampleInfo(self):
        """
        Write the information needed by the observation operator to a file. Return the filename that was written for later use
        """
        import shutil
        #import da.tools.io as io
        import da.tools.io4 as io
        from da.tools.general import ToDectime
        obsinputfile = os.path.join(self.DaCycle['dir.input'],'observations_%s.nc'% self.DaCycle['time.sample.stamp'])
        f = io.CT_CDF(obsinputfile,method='create')
        msg = 'Creating new observations file for ObservationOperator (%s)' % obsinputfile ; logging.debug(msg)
        # dimensions shared by the variables written below
        dimid = f.AddDim('obs',len(self.Data))
        dim200char = f.AddDim('string_of200chars',200)
        dim10char = f.AddDim('string_of10chars',10)
        dimcalcomp = f.AddDim('calendar_components',6)
        # nothing to write: leave a dimensions-only file in place and return its name
        if len(self.Data) == 0:
            f.close()
            return obsinputfile
        # record any site relocations (set by AddModelDataMismatch) as global attributes
        for key,value in self.SiteMove.iteritems():
            msg= "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
            f.AddAttribute(key,msg)
        # obs_num: unique observation index numbers
        data = self.Data.getvalues('id')
        savedict = io.std_savedict.copy()
        savedict['name'] = "obs_num"
        savedict['dtype'] = "int"
        savedict['long_name'] = "Unique_Dataset_observation_index_number"
        savedict['units'] = ""
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
        status = f.AddData(savedict)
        # date_components: each UTC timestamp split into six integer fields
        data = [[d.year,d.month,d.day,d.hour,d.minute,d.second] for d in self.Data.getvalues('xdate') ]
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "date_components"
        savedict['units'] = "integer components of UTC date/time"
        savedict['dims'] = dimid+dimcalcomp
        savedict['values'] = data
        savedict['missing_value'] = -9
        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
        savedict['order'] = "year, month, day, hour, minute, second"
        status = f.AddData(savedict)
        # latitude of each sample
        data = self.Data.getvalues('lat')
        savedict = io.std_savedict.copy()
        savedict['name'] = "latitude"
        savedict['units'] = "degrees_north"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        status = f.AddData(savedict)
        # longitude of each sample
        data = self.Data.getvalues('lon')
        savedict = io.std_savedict.copy()
        savedict['name'] = "longitude"
        savedict['units'] = "degrees_east"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        status = f.AddData(savedict)
        # altitude of each sample
        data = self.Data.getvalues('height')
        savedict = io.std_savedict.copy()
        savedict['name'] = "altitude"
        savedict['units'] = "meters_above_sea_level"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -999.9
        status = f.AddData(savedict)
        # sampling_strategy: integer code consumed by the observation operator
        data = self.Data.getvalues('samplingstrategy')
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "sampling_strategy"
        savedict['units'] = "NA"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['missing_value'] = -9
        status = f.AddData(savedict)
        # obs_id: the ObsPack string identifier of each datapoint
        data = self.Data.getvalues('evn')
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "char"
        savedict['name'] = "obs_id"
        savedict['units'] = "ObsPack datapoint identifier"
        savedict['dims'] = dimid+dim200char
        savedict['values'] = data
        savedict['missing_value'] = '!'
        status = f.AddData(savedict)
        status = f.close()
        msg = "Successfully wrote data to obs file" ;logging.debug(msg)
        msg = "Sample input file for obs operator now in place [%s]"%obsinputfile ; logging.info(msg)
        return obsinputfile
def AddModelDataMismatch(self ):
"""
Get the model-data mismatch values for this cycle.
(1) Open a sites_weights file
(2) Parse the data
(3) Compare site list against data
(4) Take care of double sites, etc
"""
import da.tools.rc as rc
from da.tools.general import NameConvert
filename = self.DaCycle.DaSystem['obs.sites.rc']
if not os.path.exists(filename):
msg = 'Could not find the required sites.rc input file (%s) ' % filename ; logging.error(msg)
raise IOError,msg
else:
self.SitesFile = filename
SitesWeights = rc.read(self.SitesFile)
self.rejection_threshold = int(SitesWeights['obs.rejection.threshold'])
self.global_R_scaling = float(SitesWeights['global.R.scaling'])
self.n_site_categories = int(SitesWeights['n.site.categories'])
msg = 'Model-data mismatch rejection threshold: %d ' % (self.rejection_threshold) ; logging.debug(msg)
msg = 'Model-data mismatch scaling factor : %f ' % (self.global_R_scaling) ; logging.warning(msg)
msg = 'Model-data mismatch site categories : %d ' % (self.n_site_categories) ; logging.debug(msg)
cats = [k for k in SitesWeights.keys() if 'site.category' in k]
SiteCategories={}
for key in cats:
name,error,may_localize,may_reject = SitesWeights[key].split(';')
name = name.strip().lower()
error = float(error)
may_localize = bool(may_localize)
may_reject = bool(may_reject)
SiteCategories[name] = {'category':name,'error':error,'may_localize':may_localize,'may_reject':may_reject}
#print name,SiteCategories[name]
SiteInfo = {}
SiteMove = {}
for key,value in SitesWeights.iteritems():
if 'co2_' in key:
sitename, sitecategory = key,value
sitename = sitename.strip()
sitecategory = sitecategory.split()[0].strip().lower()
SiteInfo[sitename] = SiteCategories[sitecategory]
#print sitename,SiteInfo[sitename]
if 'site.move' in key:
identifier, latmove, lonmove = value.split(';')
SiteMove[identifier.strip()] = (float(latmove),float(lonmove),)
for obs in self.Data: # loop over all available data points
obs.mdm = 1000.0 # default is very high model-data-mismatch, until explicitly set by script
obs.flag = 99 # default is do-not-use , until explicitly set by script
identifier = obs.code
species, site, method, lab, datasetnr = identifier.split('_')
if SiteInfo.has_key(identifier):
if SiteInfo[identifier]['category'] == 'do-not-use':
msg = "Observation found (%s, %d), but not used in assimilation !!!" % (identifier, obs.id) ; logging.warning(msg)
obs.mdm = SiteInfo[identifier]['error']* self.global_R_scaling
obs.may_localize = SiteInfo[identifier]['may_localize']
obs.may_reject = SiteInfo[identifier]['may_reject']
obs.flag = 99
else:
msg = "Observation found (%s, %d)" % (identifier,obs.id,) ; logging.debug(msg)
obs.mdm = SiteInfo[identifier]['error']* self.global_R_scaling
obs.may_localize = SiteInfo[identifier]['may_localize']
obs.may_reject = SiteInfo[identifier]['may_reject']
obs.flag = 0
else:
msg= "Observation NOT found (%s, %d), please check sites.rc file (%s) !!!" % (identifier, obs.id, self.SitesFile,) ; logging.warning(msg)
if SiteMove.has_key(identifier):
movelat,movelon = SiteMove[identifier]
obs.lat = obs.lat + movelat
obs.lon = obs.lon + movelon
msg= "Observation location for (%s, %d), is moved by %3.2f degrees latitude and %3.2f degrees longitude" % (identifier, obs.id, movelat, movelon,) ; logging.warning(msg)
# Add SiteInfo dictionary to the Observation object for future use
self.SiteInfo = SiteInfo
self.SiteMove = SiteMove
msg = "Added Model Data Mismatch to all samples " ; logging.debug(msg)
    def WriteObsToFile(self):
        """
        Write selected information contained in the Observation object to a file.
        """
        import shutil
        import da.tools.io4 as io
        from da.tools.general import ToDectime
        import numpy as np
        outfile = os.path.join(self.DaCycle['dir.output'],'sampleinfo_%s.nc'% self.DaCycle['time.sample.stamp'])
        f = io.CT_CDF(outfile,method='create')
        msg = 'Creating new Sample output file for postprocessing (%s)' % outfile ; logging.debug(msg)
        # dimensions shared by the variables written below
        dimid = f.AddDim('obs',len(self.Data))
        dim200char = f.AddDim('string_of200chars',200)
        dim10char = f.AddDim('string_of10chars',10)
        dimcalcomp = f.AddDim('calendar_components',6)
        # nothing to write: leave a dimensions-only file in place and return its name
        if len(self.Data) == 0:
            f.close()
            return outfile
        # record any site relocations (set by AddModelDataMismatch) as global attributes
        for key,value in self.SiteMove.iteritems():
            msg= "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
            f.AddAttribute(key,msg)
        # obs_num: unique observation index numbers
        data = self.Data.getvalues('id')
        savedict = io.std_savedict.copy()
        savedict['name'] = "obs_num"
        savedict['dtype'] = "int"
        savedict['long_name'] = "Unique_Dataset_observation_index_number"
        savedict['units'] = ""
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
        status = f.AddData(savedict)
        # date_components: each UTC timestamp split into six integer fields
        data = [[d.year,d.month,d.day,d.hour,d.minute,d.second] for d in self.Data.getvalues('xdate') ]
        savedict = io.std_savedict.copy()
        savedict['dtype'] = "int"
        savedict['name'] = "date_components"
        savedict['units'] = "integer components of UTC date/time"
        savedict['dims'] = dimid+dimcalcomp
        savedict['values'] = data
        savedict['missing_value'] = -9
        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
        savedict['order'] = "year, month, day, hour, minute, second"
        status = f.AddData(savedict)
        # observed: the mixing ratio values that were assimilated
        data = self.Data.getvalues('obs')
        savedict = io.std_savedict.copy()
        savedict['name'] = "observed"
        savedict['long_name'] = "observedvalues"
        savedict['units'] = "mol mol-1"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = 'Observations used in optimization'
        dummy = f.AddData(savedict)
        # modeldatamismatch: per-sample mdm standard deviations
        data = self.Data.getvalues('mdm')
        savedict = io.std_savedict.copy()
        savedict['name'] = "modeldatamismatch"
        savedict['long_name'] = "modeldatamismatch"
        savedict['units'] = "[mol mol-1]"
        savedict['dims'] = dimid
        savedict['values'] = data.tolist()
        savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
        dummy = f.AddData(savedict)
        # modelsamples: simulated values for all ensemble members
        # NOTE(review): assumes 'simulated' is 2-D (obs x members) by this point -- set by AddSimulations
        data = self.Data.getvalues('simulated')
        dimmembers = f.AddDim('members',data.shape[1])
        savedict = io.std_savedict.copy()
        savedict['name'] = "modelsamples"
        savedict['long_name'] = "modelsamples for all ensemble members"
        savedict['units'] = "mol mol-1"
        savedict['dims'] = dimid+dimmembers
        savedict['values'] = data.tolist()
        savedict['comment'] = 'simulated mixing ratios based on optimized state vector'
        dummy = f.AddData(savedict)
        # inputfilename: where the original obs data came from, for write-back
        data = self.Data.getvalues('fromfile')
        savedict = io.std_savedict.copy()
        savedict['name'] = "inputfilename"
        savedict['long_name'] = "name of file where original obs data was taken from"
        savedict['dtype'] = "char"
        savedict['dims'] = dimid+dim200char
        savedict['values'] = data
        savedict['missing_value'] = '!'
        dummy = f.AddData(savedict)
        status = f.close()
        msg = "Successfully wrote data to Sample output file (%s)"%outfile ;logging.debug(msg)
        return outfile
################### End Class CtObservations ###################
################### Begin Class MixingRatioSample ###################
class MixingRatioSample(object):
    """
    Container for a single mixing ratio observation used in the data
    assimilation framework. All attributes are assigned in __init__; projects
    can add more data types or subclass this for project-specific needs.
    """
    def __init__(self, id, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat=-999., lon=-999., evn='0000', species='co2', samplingstrategy=1, sdev=0.0, fromfile='none.nc'):
        # --- identity of the sample ---
        self.id = id                    # ObsPack ID within distribution (integer), e.g., 82536
        self.evn = evn                  # ObsPack number within distribution (string)
        self.code = code.strip()        # dataset identifier, e.g., co2_lef_tower_insitu_1_99
        self.species = species.strip()
        self.fromfile = fromfile        # netcdf filename inside ObsPack distribution, to write back later
        # --- time and location ---
        self.xdate = xdate              # date of the observation
        self.lat = lat
        self.lon = lon
        self.height = height            # sample height in masl
        self.masl = True                # sample altitude is meters above sea level...
        self.mag = not self.masl        # ...as opposed to meters above ground
        # --- observed and modeled values ---
        self.obs = obs                  # value observed
        self.simulated = simulated      # value simulated by model
        self.resid = resid              # mixing ratio residual
        self.hphr = hphr                # prior uncertainty from fluxes (HPH) and model-data mismatch (R)
        self.mdm = mdm                  # model-data mismatch
        self.sdev = sdev                # standard deviation of ensemble
        # --- assimilation controls ---
        self.may_localize = True        # whether sample may be localized in optimizer
        self.may_reject = True          # whether sample may be rejected if outside threshold
        self.flag = flag
        self.samplingstrategy = samplingstrategy

    def __str__(self):
        stamp = self.xdate.strftime('%Y-%m-%d %H:%M:%S')
        return '%s %s %s %s %s' % (self.code, stamp, self.obs, self.flag, self.id)
################### End Class MixingRatioSample ###################
################### Begin Class MixingRatioList ###################
class MixingRatioList(list):
    """
    A special type of list that holds MixingRatioSample objects, with
    convenience methods to extract attribute arrays or select subsets.
    """
    # class-level import kept: 'array' is the default constructor of getvalues below
    from numpy import array, ndarray

    def getvalues(self, name, constructor=array):
        """Return attribute *name* of every sample, built with *constructor* (numpy arrays are squeezed)."""
        from numpy import ndarray
        result = constructor([getattr(o, name) for o in self])
        if isinstance(result, ndarray):
            return result.squeeze()
        else:
            return result

    def unflagged(self):
        """Return a MixingRatioList of samples with flag == 0.
        BUGFIX: originally wrapped the result in MixingRatioSample, which would
        construct a bogus sample with the list passed as its id."""
        return MixingRatioList([o for o in self if o.flag == 0])

    def flagged(self):
        """Return a MixingRatioList of samples with flag != 0 (same BUGFIX as unflagged)."""
        return MixingRatioList([o for o in self if o.flag != 0])

    def selectsite(self, site='mlo'):
        """Return a MixingRatioList of samples whose dataset code equals *site* (same BUGFIX)."""
        return MixingRatioList([o for o in self if o.code == site])

    def selecthours(self, hours=range(1, 24)):
        """Return a MixingRatioList of samples whose observation hour is in *hours* (same BUGFIX)."""
        return MixingRatioList([o for o in self if o.xdate.hour in hours])
################### End Class MixingRatioList ###################
if __name__ == "__main__":
    # Stand-alone test driver: walk through every assimilation window of a
    # gridded CarbonTracker run and print the observed values plus the
    # assigned model-data mismatch for each window.
    import logging
    from da.tools.initexit import CycleControl
    from da.ct.dasystem import CtDaSystem
    from da.ctgridded.statevector import CtGriddedStateVector
    sys.path.append('../../')
    logging.root.setLevel(logging.DEBUG)
    # set up the cycle control and DA system from hard-coded rc files
    DaCycle = CycleControl(args={'rc':'../../dagriddedjet.rc'})
    DaCycle.Initialize()
    DaCycle.ParseTimes()
    DaSystem = CtDaSystem('../rc/carbontracker.rc')
    DaSystem.Initialize()
    DaCycle.DaSystem = DaSystem
    obs = ObsPackObservations()
    obs.DaCycle = DaCycle
    while DaCycle['time.start'] < DaCycle['time.finish']:
        # sample over the full cycle interval
        DaCycle['time.sample.start'] = DaCycle['time.start']
        DaCycle['time.sample.end'] = DaCycle['time.end']
        obs.Initialize()
        obs.Validate()
        obs.AddObs()
        obs.AddModelDataMismatch()
        print(obs.Data.getvalues('obs'))
        print(obs.Data.getvalues('mdm'))
        DaCycle.AdvanceCycleTimes()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment