Commit b4e4a62a authored by Auke van der Woude's avatar Auke van der Woude
Browse files

initialising commit

parent fb8f88ad
#!/usr/bin/env python
# control.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
Adapted by super004 on 26 Jan 2017.
"""
import logging
################### Begin Class CO2DaSystem ###################
from da.baseclasses.dasystem import DaSystem
class CO2DaSystem(DaSystem):
    """Information on the data assimilation system used.

    This is normally an rc-file with settings. The object behaves as a
    dictionary of rc-keys (inherited from DaSystem).
    """

    def validate(self):
        """Validate the contents of the rc-file given a list of required keys.

        Textual booleans ('True'/'False') are converted to real booleans
        in place. Missing required keys are only logged as warnings, not
        raised, so a run can proceed with partial settings.
        """
        needed_rc_items = ['obs.input.id',
                           'obs.input.nr',
                           'obs.spec.nr',
                           'obs.cat.nr',
                           'nparameters',
                           'random.seed',
                           'emis.pparam',
                           'ff.covariance',
                           'obs.bgswitch',
                           'obs.background',
                           'emis.input.spatial',
                           'emis.input.tempobs',
                           'emis.input.tempprior',
                           'emis.paramfile',
                           'emis.paramfile2',
                           'run.emisflag',
                           'run.emisflagens',
                           'run.obsflag']

        # Normalize string booleans. items() replaces the Python-2-only
        # iteritems(); only values of existing keys are overwritten, so
        # mutating while iterating is safe.
        for k, v in self.items():
            if v == 'True':
                self[k] = True
            if v == 'False':
                self[k] = False

        # 'key in self' replaces the Python-2-only has_key().
        for key in needed_rc_items:
            if key not in self:
                logging.warning('Missing a required value in rc-file : %s' % key)
        logging.debug('DA System Info settings have been validated successfully')
################### End Class CO2DaSystem ###################
if __name__ == "__main__":
    # No standalone behaviour: this module only provides the CO2DaSystem class.
    pass
#!/usr/bin/env python
# stilt_tools.py
"""
Author : I. Super
Revision History:
Newly developed code, September 2017
This module holds an emission model that prepares emission files used by the observation operator and
to create pseudo-data
"""
import shutil
import os
import logging
import datetime as dtm
import numpy as np
from numpy import array, logical_and
import da.tools.io4 as io
import math
import da.tools.rc as rc
from da.tools.general import create_dirs, to_datetime
identifier = 'EmissionModel ensemble '
version = '1.0'
################### Begin Class Emission model ###################
class EmisModel(object):
    """Emission model preparing emission files for the observation operator.

    Yearly totals per sector are computed from a parameter file
    (get_totals), then distributed over space using proxy maps
    (get_spatial) and over time using time profiles (get_temporal).
    Used both for ensemble runs (psdo=0) and pseudo-data creation (psdo=1).
    """

    def __init__(self, dacycle=None):
        # Keep a reference to the cycle object; fall back to an empty dict
        # so the object can be constructed without a cycle.
        if dacycle is not None:
            self.dacycle = dacycle
        else:
            self.dacycle = {}

    def setup(self, dacycle):
        """Cache frequently used rc-file settings as attributes."""
        self.dacycle = dacycle
        self.startdate = self.dacycle['time.fxstart']
        self.enddate = self.dacycle['time.finish']
        self.emisdir = dacycle.dasystem['datadir']
        self.proxyfile = dacycle.dasystem['emis.input.spatial']
        self.tempfileo = dacycle.dasystem['emis.input.tempobs']
        self.tempfilep = dacycle.dasystem['emis.input.tempprior']
        self.btime = int(dacycle.dasystem['run.backtime'])
        self.obsfile = dacycle.dasystem['obs.input.id']
        self.nrspc = int(dacycle.dasystem['obs.spec.nr'])
        self.nrcat = int(dacycle.dasystem['obs.cat.nr'])
        self.nparams = int(dacycle.dasystem['nparameters'])
        self.nmembers = int(dacycle['da.optimizer.nmembers'])
        self.pparam = dacycle.dasystem['emis.pparam']
        self.paramfile = dacycle.dasystem['emis.paramfile']
        #self.paramfile2 = dacycle.dasystem['emis.paramfile2']

    def get_emis(self, dacycle, psdo):
        """Set up emission information for pseudo-obs (psdo=1) and ensemble runs (psdo=0)."""
        if psdo == 1:
            # Pseudo-data: a single 'true' parameter vector, member id 999
            priorparam = os.path.join(self.emisdir, self.pparam)
            f = io.ct_read(priorparam, 'read')
            self.prm = f.get_variable('true_values')[:self.nparams]
            f.close()
            self.get_spatial(dacycle, n=999, infile=os.path.join(self.emisdir, self.paramfile))
            self.get_temporal(dacycle, n=999, psdo=1)
        elif psdo == 0:
            self.timestartkey = self.dacycle['time.sample.start']
            self.timefinishkey = self.dacycle['time.sample.end']
            for j in range(self.nmembers):
                #first remove old files, then create new emission files per ensemble member
                if self.startdate == self.timestartkey:
                    # renamed from 'file' to avoid shadowing the builtin
                    oldfile = os.path.join(dacycle.dasystem['datadir'], 'temporal_data_%03d.nc' % j)
                    try:
                        os.remove(oldfile)
                    except OSError:
                        # best effort: the file may simply not exist yet
                        pass
                prmfile = os.path.join(dacycle['dir.input'], 'parameters.%03d.nc' % j)
                f = io.ct_read(prmfile, 'read')
                self.prm = f.get_variable('parametervalues')
                f.close()
                self.get_spatial(dacycle, n=j, infile=os.path.join(self.emisdir, self.paramfile))
                self.get_temporal(dacycle, n=j, psdo=0)

    def get_totals(self, infile=None):
        """Gather all required data and calculate total emissions per sector in kg/yr.

        Returns an (nrcat x nrspc) array of yearly emissions. As a side
        effect, stores the spatial/temporal proxy names and their positions
        in the state vector for use by get_spatial and get_temporal.
        """
        yremis = np.array(self.nrcat * [self.nrspc * [0.]])
        self.spatial_var = []
        self.spatial_inc = []
        self.temporal_var = []
        self.temporal_inc = []
        self.ops_sector = []

        ### Read in parameter values for the emission functions and ratios to calculate total emissions
        f = open(infile, 'r')
        lines = f.readlines()
        f.close()
        ct = 0
        for line in lines:
            dum = line.split(",")
            if dum[0] == '#':
                continue
            else:
                # renamed from 'id' to avoid shadowing the builtin
                sector_id = int(dum[0])
                ### If sector_id == 0 this (sub)sector is treated only by WRF-STILT; if it takes another value the local sources are treated by OPS
                if sector_id != 0:
                    self.ops_sector.append(ct)
                ### If inc == 999 this parameter is not in the state vector and will not be optimized; otherwise inc refers to the
                ### position of this parameter in the state vector
                inc = int(dum[2])
                if inc != 999:
                    EC = float(dum[1]) * self.prm[inc]
                else:
                    EC = float(dum[1])
                inc = int(dum[4])
                if inc != 999:
                    EF = float(dum[3]) * self.prm[inc]
                else:
                    EF = float(dum[3])
                inc = int(dum[6])
                if inc != 999:
                    AD = float(dum[5]) * self.prm[inc]
                else:
                    AD = float(dum[5])
                inc = int(dum[8])
                if inc != 999:
                    fr = float(dum[7]) * self.prm[inc]
                else:
                    fr = float(dum[7])
                ### emission = energy consumption per activity x emission factor x activity
                ### fr can be used to divide sectoral emissions over several subsectors (e.g. to split road traffic into cars and HDV)
                ems = EC * EF * AD * fr
                ### Now we calculated emissions of CO2. To get emissions of other trace gases we multiply with an emission ratio
                for s in range(self.nrspc):
                    inc = int(dum[15 + s * 3])
                    if inc != 999:
                        rat = float(dum[13 + s * 3]) * self.prm[inc]
                    else:
                        rat = float(dum[13 + s * 3])
                    ### Some emission ratios have lognormal uncertainty distributions (label 'l'), others normal ('n')
                    if dum[14 + s * 3] == 'n':
                        yremis[ct, s] = ems * rat
                    elif dum[14 + s * 3] == 'l':
                        yremis[ct, s] = ems * np.exp(rat)
                ct = ct + 1
                ### Here we list the spatial and temporal variables that are part of the state vector for use in the get_spatial and get_temporal functions
                self.spatial_var.append(dum[9])
                self.spatial_inc.append(int(dum[10]))
                self.temporal_var.append(dum[11])
                self.temporal_inc.append(int(dum[12]))
        logging.debug("Successfully calculated total emissions")
        return yremis

    def get_spatial(self, dacycle, n, infile=None):
        """Read proxy data used for spatial distribution of the gridded emissions,
        disaggregate yearly totals for the area, and write prior_spatial_<n>.nc."""
        yremis = self.get_totals(infile)

        # read in species information (molar mass and name per species)
        infile = os.path.join(self.emisdir, self.obsfile)
        f = open(infile, 'r')
        lines = f.readlines()
        f.close()
        M_mass = []
        spname = []
        for line in lines:
            dum = line.split(",")
            if dum[0] == '#':
                continue
            else:
                M_mass.append(float(dum[6]) * 1e-9) #kg/micromole
                spname.append(dum[5]) #name of the species
        sec_year = 8760. * 3600. #seconds in a year
        arcor = 1e6 #km2 -> m2
        conv = np.array(M_mass) * sec_year * arcor # to convert emissions in kg/km2/yr to micromole/m2/s

        #read in proxy data for spatial disaggregation
        infile = os.path.join(self.emisdir, self.proxyfile)
        prx = io.ct_read(infile, method='read')
        sp_distr = []
        for c in range(self.nrcat):
            sp_distr.append(prx.get_variable(self.spatial_var[c]))
        sp_distr = np.array(sp_distr)
        prx.close()

        ### create output file
        prior_file = os.path.join(self.emisdir, 'prior_spatial_%03d.nc' % n)
        f = io.CT_CDF(prior_file, method='create')
        dimid = f.add_dim('ncat', self.nrcat)
        dimid2 = f.add_dim('ops', 3)
        dimlon = f.add_dim('lon', sp_distr.shape[1])
        dimlat = f.add_dim('lat', sp_distr.shape[2])

        #loop over all tracers
        for s in range(self.nrspc):
            # determine which fraction of the emissions are local and are treated by OPS
            datalistOPS = []
            # assumed per-OPS-sector local fractions — TODO confirm source of these constants
            opsfr = [0.317, 0.317, 0.267]
            for j in range(len(self.ops_sector)):
                # kg/yr -> g/s for the OPS totals
                OPSemis = yremis[self.ops_sector[j], s] * opsfr[j] * 1E3 / sec_year
                datalistOPS.append(OPSemis)

            savedict = io.std_savedict.copy()
            savedict['name'] = "OPStotals_%s" % spname[s]
            savedict['long_name'] = "total OPS emissions"
            savedict['units'] = "g/s"
            savedict['dims'] = dimid2
            savedict['values'] = datalistOPS
            f.add_data(savedict)

            datalist = []
            ct = 0
            for c in range(self.nrcat):
                inc = self.spatial_inc[c]
                if inc != 999:
                    ### note that the spatial distribution has to gridded state vector, such that a scaling factor would affect the total
                    ### emissions in this area
                    distr = sp_distr[c] * self.prm[inc]
                else:
                    distr = sp_distr[c]
                if c in self.ops_sector:
                    # subtract the locally treated (OPS) fraction before gridding
                    emis_spatial = (yremis[c, s] * (1 - opsfr[ct])) / conv[s] * distr
                    ct = ct + 1
                else:
                    emis_spatial = yremis[c, s] / conv[s] * distr
                datalist.append(emis_spatial)

            savedict = io.std_savedict.copy()
            savedict['name'] = spname[s]
            savedict['long_name'] = "Spatially distributed emissions"
            savedict['units'] = "micromole/m2/s"
            savedict['dims'] = dimid + dimlon + dimlat
            savedict['values'] = datalist
            f.add_data(savedict)
        f.close()
        logging.debug("Successfully wrote data to prior spatial distribution file (%s)" % prior_file)

    def get_temporal(self, dacycle, n, psdo):
        """Read (and, for ensemble runs, update) the time profiles used for
        temporal distribution of the emissions; writes prior_temporal_<n>.nc."""
        ### For pseudo-observations (psdo==1) or when no time profiles need to be optimized the profiles are simply read from the
        ### input file and copied to another file.
        ### Otherwise create a new file per ensemble member at t=0 and update the profiles for each time step
        if psdo == 0 and min(self.temporal_inc) < 999:
            ensfile = os.path.join(self.emisdir, 'temporal_data_%03d.nc' % n)
            if not os.path.exists(ensfile):
                dumfile = os.path.join(self.emisdir, self.tempfilep)
                shutil.copy2(dumfile, ensfile)
            tpr = io.ct_read(ensfile, method='read')
            itimes = tpr.get_variable('Times')
            # 'Times' is a character array; join the character slices to build datetimes
            times = array([dtm.datetime(int(''.join(d[:4])), int(''.join(d[5:7])), int(''.join(d[8:10])), int(''.join(d[11:13])), int(''.join(d[14:16]))) for d in itimes])
            subselect = logical_and(times >= self.timestartkey, times <= self.timefinishkey).nonzero()[0]
            datlist = times.take(subselect, axis=0)

            ### The time profiles should always cover at least one full year
            start_date = dtm.datetime(self.timestartkey.year, 1, 1, 0, 0) #first time included
            end_date = dtm.datetime(self.timestartkey.year, 12, 31, 23, 0) #last time included
            dt = dtm.timedelta(0, 3600)
            dum = start_date
            times = []
            while dum <= end_date:
                times.append(dum)
                dum = dum + dt
            times = np.array(times)
            stidx = np.where(times == self.timestartkey)[0][0]
            stidx2 = np.where(times == self.startdate)[0][0]
            edidx = np.where(times == self.timefinishkey)[0][0]
            dumsel = np.where((times < self.startdate) | (times > self.timefinishkey))[0]

            ### Time profiles should, for a full year, always have an average value of 1.0. Therefore, a new method has been developed
            ### to optimize time profiles such that we comply with this and the time profiles do not affect the total emissions.
            ### For this purpose we apply the scaling factor (statevector) to the period covered in this cycle. The time profile for all dates
            ### outside this period are scaled equally such that the time profile remains its average value of 1.0. Except previously updated
            ### dates (from previous cycles) are maintained (they are already optimized!).
            profiles = []
            for c in range(self.nrcat):
                if self.temporal_inc[c] != 999:
                    f_orig = tpr.get_variable(self.temporal_var[c])
                    f_sel = tpr.get_variable(self.temporal_var[c]).take(subselect, axis=0)
                    f_new = f_sel * self.prm[self.temporal_inc[c]]
                    dumsum = np.array(f_orig[dumsel]).sum()
                    # NOTE(review): this loop applies the same compensation
                    # len(f_new) times and never uses i, so the rescaling
                    # compounds — confirm the iterative application is
                    # intended (kept as-is to preserve behaviour).
                    for i in range(len(f_new)):
                        f_orig[:stidx2] = f_orig[:stidx2] - (f_orig[:stidx2] / dumsum) * (f_new.sum() - f_sel.sum())
                        f_orig[edidx + 1:] = f_orig[edidx + 1:] - (f_orig[edidx + 1:] / dumsum) * (f_new.sum() - f_sel.sum())
                    f_orig[stidx:edidx + 1] = f_new
                    profiles.append(f_orig)
            tpr.close()

            # write the updated profiles back into the per-member file
            f = io.CT_CDF(ensfile, method='write')
            ct = 0
            for c in range(self.nrcat):
                if self.temporal_inc[c] != 999:
                    f.variables[self.temporal_var[c]][:] = profiles[ct]
                    ct = ct + 1
            f.close()

        ### Now read in the correct profiles, select the correct time period and write the profiles into one file per ensemble member
        if psdo == 1:
            infile = os.path.join(self.emisdir, self.tempfileo)
        elif psdo == 0 and min(self.temporal_inc) == 999:
            infile = os.path.join(self.emisdir, self.tempfilep)
        elif psdo == 0 and min(self.temporal_inc) < 999:
            infile = os.path.join(self.emisdir, 'temporal_data_%03d.nc' % n)
        tpr = io.ct_read(infile, method='read')
        itimes = tpr.get_variable('Times')
        times = array([dtm.datetime(int(''.join(d[:4])), int(''.join(d[5:7])), int(''.join(d[8:10])), int(''.join(d[11:13])), int(''.join(d[14:16]))) for d in itimes])
        if psdo == 1:
            # Go back one day via timedelta: the former 'day - 1' arithmetic
            # raised ValueError on the first day of a month.
            startdum = dtm.datetime(self.startdate.year, self.startdate.month, self.startdate.day, 1, 0) - dtm.timedelta(days=1)
            subselect = logical_and(times >= startdum, times <= self.enddate).nonzero()[0]
        else:
            dum = self.timestartkey - dtm.timedelta(0, self.btime * 3600)
            if dum.hour != 0:
                startdum = dtm.datetime(dum.year, dum.month, dum.day, 1, 0)
            else:
                # same day-arithmetic fix as above (was: dum.day - 1)
                startdum = dtm.datetime(dum.year, dum.month, dum.day, 1, 0) - dtm.timedelta(days=1)
            subselect = logical_and(times >= startdum, times <= self.timefinishkey).nonzero()[0]
        datlist = times.take(subselect, axis=0)

        profiles = []
        for c in range(self.nrcat):
            f_orig = tpr.get_variable(self.temporal_var[c]).take(subselect, axis=0)
            profiles.append(f_orig)
        tpr.close()
        profiles = np.array(profiles)

        prior_file = os.path.join(self.emisdir, 'prior_temporal_%03d.nc' % n)
        f = io.CT_CDF(prior_file, method='create')
        dimtime = f.add_dim('Times', len(datlist))
        # write each distinct profile variable only once
        dum = []
        for c in range(self.nrcat):
            if self.temporal_var[c] in dum:
                continue
            else:
                savedict = io.std_savedict.copy()
                savedict['name'] = self.temporal_var[c]
                savedict['long_name'] = "Temporal distribution"
                savedict['units'] = ""
                savedict['dims'] = dimtime
                savedict['values'] = profiles[c, :]
                f.add_data(savedict)
                dum.append(self.temporal_var[c])
        f.close()
        logging.debug("Successfully wrote data to prior temporal distribution file (%s)" % prior_file)
################### End Class Emission model ###################
if __name__ == "__main__":
    # No standalone behaviour: this module only provides the emission model class.
    pass
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
Adapted by super004 on 18 May 2017.
"""
import os
import sys
import logging
import datetime as dtm
from string import strip
from numpy import array, logical_and
import numpy as np
sys.path.append(os.getcwd())
sys.path.append('../../')
from pylab import *
identifier = 'Rotterdam CO2 mole fractions'
version = '0.0'
from da.baseclasses.obs import Observations
import da.tools.io4 as io
import da.tools.rc as rc
################### Begin Class RdamObservations ###################
class RdamObservations(Observations):
""" an object that holds data + methods and attributes needed to manipulate mole fraction values """
def setup(self, dacycle):
    """Initialize the observations object from the cycle settings.

    Reads the sampling window and the location/id of the ObsPack-style
    input data from dacycle. Raises IOError when the data directory
    does not exist.
    """
    self.startdate = dacycle['time.sample.start']
    self.enddate = dacycle['time.sample.end']
    op_id = dacycle.dasystem['obs.input.id']
    op_dir = dacycle.dasystem['datadir']
    self.nrloc = dacycle.dasystem['obs.input.nr']
    if not os.path.exists(op_dir):
        msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir
        logging.error(msg)
        # Python-3 compatible raise (was: raise IOError, msg)
        raise IOError(msg)
    else:
        self.obspack_dir = op_dir
        self.obspack_id = op_id
    self.datalist = []
    self.tracer_list = []
#        self.cnt = []
def add_observations(self, dacycle):
    """ Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file
    The ObsPack mole fraction files are provided as time series per site with all dates in sequence.
    We will loop over all site files in the ObsPackage, and subset each to our needs
    """
    # The id file maps observations to NetCDF data files; column dum[1]
    # holds the file basename (without the '.nc' extension).
    infile = os.path.join(self.obspack_dir, self.obspack_id)
    f = open(infile, 'r')
    lines = f.readlines()
    f.close()
    ncfilelist = []
    for line in lines:
        if line.startswith('#'): continue # header
        dum = line.split(",")
        ncfile = [dum[1]]
        # collect each data file only once, preserving first-seen order
        if not ncfile[0] in ncfilelist:
            ncfilelist.extend(ncfile)
    for ncfile in ncfilelist:
        infile = os.path.join(self.obspack_dir, ncfile + '.nc')
        ncf = io.ct_read(infile, 'read')
        # 'Times' is a character array; join the character slices to build
        # datetimes (minutes are zeroed here)
        idates = ncf.get_variable('Times')
        dates = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),0) for d in idates])
        # keep only samples that fall inside the current sampling window
        subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
        dates = dates.take(subselect, axis=0)
        datasetname = ncfile # use full name of dataset to propagate for clarity
        logging.info(datasetname)
        lats = ncf.get_variable('lat').take(subselect, axis=0)
        lons = ncf.get_variable('lon').take(subselect, axis=0)
        alts = ncf.get_variable('alt').take(subselect, axis=0)
        obs = ncf.get_variable('obs').take(subselect, axis=0)
        ids = ncf.get_variable('ids').take(subselect, axis=0)
        spcs = ncf.get_variable('species').take(subselect, axis=0)
        ncf.close()
        for n in range(len(dates)):
            # species name is stored as three separate characters;
            # concatenate them into one string
            spc = spcs[n,0]+spcs[n,1]+spcs[n,2]
            # NOTE(review): MoleFractionSample is defined elsewhere in this
            # module (outside this view); argument order assumed correct.
            self.datalist.append(MoleFractionSample(ids[n], dates[n], datasetname, obs[n], 0.0, 0.0, 0.0, 0.0, 0, alts[n], lats[n], lons[n], '001', spc, 1, 0.0, infile))
        logging.debug("Added %d observations from file (%s) to the Data list" % (len(dates), datasetname))
    logging.info("Observations list now holds %d values" % len(self.datalist))
def add_simulations(self, filename, silent=False):
    """Add model simulated values to the matching mole fraction objects.

    filename: NetCDF sample file holding 'obs_num' ids and 'model' values.
    silent:   suppress the warning about unmatched sample ids.
    Raises IOError when the file is missing (the sampling step failed).
    """
    if not os.path.exists(filename):
        msg = "Sample output filename for observations could not be found : %s" % filename
        logging.error(msg)
        logging.error("Did the sampling step succeed?")
        logging.error("...exiting")
        # Python-3 compatible raise (was: raise IOError, msg)
        raise IOError(msg)

    ncf = io.ct_read(filename, method='read')
    ids = ncf.get_variable('obs_num')
    simulated = ncf.get_variable('model')
    ncf.close()
    logging.info("Successfully read data from model sample file (%s)" % filename)

    obs_ids = self.getvalues('id').tolist()
    # Materialize as a list of ints: on Python 3 'map' returns a one-shot
    # iterator and the len(ids) below would fail.
    ids = [int(i) for i in ids]
    missing_samples = []
    for idx, val in zip(ids, simulated):
        if idx in obs_ids:
            index = obs_ids.index(idx)
            self.datalist[index].simulated = val # in mol/mol
        else:
            missing_samples.append(idx)
    if not silent and missing_samples != []:
        logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...')
    logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def write_sample_coords(self, obsinputfile):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
if len(self.datalist) == 0:
logging.debug("No observations found for this time period, nothing written to obs file")
else:
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
dimid = f.add_dim('obs', len(self.datalist))
dim200char = f.add_dim('string_of200chars', 200)
dim10char = f.add_dim('string_of10chars', 10)
dimcalcomp = f.add_dim('calendar_components', 6)
data = self.getvalues('id')
savedict = io.std_savedict.copy()
savedict['name'] = "obs_num"
savedict['dtype'] = "int"
savedict['long_name'] = "Unique_Dataset_observation_index_number"
savedict['units'] = ""
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
f.add_data(savedict)
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ]