From e2203562157cb6aa30232730ffedf837224fbb66 Mon Sep 17 00:00:00 2001
From: Auke van der Woude <50618871+amvdw@users.noreply.github.com>
Date: Tue, 14 Jan 2020 10:21:44 +0100
Subject: [PATCH] dynamic ff model branch initialisation

---
 da/stiltops/dasystem.py            |  62 +++
 da/stiltops/emissionmodel.py       | 362 ++++++++++++++
 da/stiltops/obs.py                 | 537 +++++++++++++++++++++
 da/stiltops/observationoperator.py | 730 +++++++++++++++++++++++++++++
 da/stiltops/optimizer.py           | 308 ++++++++++++
 da/stiltops/pipeline.py            | 444 ++++++++++++++++++
 da/stiltops/statevector.py         | 262 +++++++++++
 da/stiltops/stilt-ops.rc           |  47 ++
 8 files changed, 2752 insertions(+)
 create mode 100755 da/stiltops/dasystem.py
 create mode 100755 da/stiltops/emissionmodel.py
 create mode 100755 da/stiltops/obs.py
 create mode 100755 da/stiltops/observationoperator.py
 create mode 100755 da/stiltops/optimizer.py
 create mode 100755 da/stiltops/pipeline.py
 create mode 100755 da/stiltops/statevector.py
 create mode 100755 da/stiltops/stilt-ops.rc

diff --git a/da/stiltops/dasystem.py b/da/stiltops/dasystem.py
new file mode 100755
index 00000000..51acbe07
--- /dev/null
+++ b/da/stiltops/dasystem.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+# control.py

+"""
+Author : peters
+
+Revision History:
+File created on 26 Aug 2010.
+Adapted by super004 on 26 Jan 2017.
+
+"""
+
+import logging
+
+################### Begin Class CO2DaSystem ###################
+
+from da.baseclasses.dasystem import DaSystem
+
+class CO2DaSystem(DaSystem):
+    """ Information on the data assimilation system used. This is normally an rc-file with settings.
+    """
+    def validate(self):
+        """
+        validate the contents of the rc-file given a dictionary of required keys
+        """
+
+        needed_rc_items = ['obs.input.id',
+                           'obs.input.nr',
+                           'obs.spec.nr',
+                           'obs.cat.nr',
+                           'nparameters',
+                           'random.seed',
+                           'emis.pparam',
+                           'ff.covariance',
+                           'obs.bgswitch',
+                           'obs.background',
+                           'emis.input.spatial',
+                           'emis.input.tempobs',
+                           'emis.input.tempprior',
+                           'emis.paramfile',
+                           'emis.paramfile2',
+                           'run.emisflag',
+                           'run.emisflagens',
+                           'run.obsflag']
+
+
+        for k, v in self.iteritems():
+            if v == 'True' :
+                self[k] = True
+            if v == 'False':
+                self[k] = False
+
+        for key in needed_rc_items:
+            if not self.has_key(key):
+                logging.warning('Missing a required value in rc-file : %s' % key)
+        logging.debug('DA System Info settings have been validated successfully')
+
+################### End Class CO2DaSystem ###################
+
+
+if __name__ == "__main__":
+    pass
diff --git a/da/stiltops/emissionmodel.py b/da/stiltops/emissionmodel.py
new file mode 100755
index 00000000..dea1dbad
--- /dev/null
+++ b/da/stiltops/emissionmodel.py
@@ -0,0 +1,362 @@
+#!/usr/bin/env python
+# stilt_tools.py
+
+"""
+Author : I. Super
+
+Revision History:
+Newly developed code, September 2017
+
+This module holds an emission model that prepares the emission files used by the observation operator and
+creates pseudo-data
+
+"""
+
+import shutil
+import os
+import logging
+import datetime as dtm
+import numpy as np
+from numpy import array, logical_and
+import da.tools.io4 as io
+import math
+
+import da.tools.rc as rc
+from da.tools.general import create_dirs, to_datetime
+
+identifier = 'EmissionModel ensemble '
+version = '1.0'
+
+################### Begin Class Emission model ###################
+
+class EmisModel(object):
+
+    def __init__(self, dacycle=None):
+        if dacycle != None:
+            self.dacycle = dacycle
+        else:
+            self.dacycle = {}
+
+    def setup(self, dacycle):
+        self.dacycle = dacycle
+        self.startdate = self.dacycle['time.fxstart']
+        self.enddate = self.dacycle['time.finish']
+        self.emisdir = dacycle.dasystem['datadir']
+        self.proxyfile = dacycle.dasystem['emis.input.spatial']
+        self.tempfileo = dacycle.dasystem['emis.input.tempobs']
+        self.tempfilep = dacycle.dasystem['emis.input.tempprior']
+        self.btime = int(dacycle.dasystem['run.backtime'])
+        self.obsfile = dacycle.dasystem['obs.input.id']
+        self.nrspc = int(dacycle.dasystem['obs.spec.nr'])
+        self.nrcat = int(dacycle.dasystem['obs.cat.nr'])
+        self.nparams = int(dacycle.dasystem['nparameters'])
+        self.nmembers = int(dacycle['da.optimizer.nmembers'])
+        self.pparam = dacycle.dasystem['emis.pparam']
+        self.paramfile = dacycle.dasystem['emis.paramfile']
+        #self.paramfile2 = dacycle.dasystem['emis.paramfile2']
+
+    def get_emis(self, dacycle, psdo):
+        """set up emission information for pseudo-obs (psdo=1) and ensemble runs (psdo=0)"""
+
+        if psdo==1:
+            priorparam=os.path.join(self.emisdir,self.pparam)
+            f = io.ct_read(priorparam, 'read')
+            self.prm = f.get_variable('true_values')[:self.nparams]
+            f.close()
+            self.get_spatial(dacycle, n=999, infile=os.path.join(self.emisdir, self.paramfile))
+            self.get_temporal(dacycle, n=999, psdo=1)
+        elif psdo==0:
+            self.timestartkey = self.dacycle['time.sample.start']
+            self.timefinishkey = self.dacycle['time.sample.end']
+            for j in range(self.nmembers):
+                #first remove old files, then create new emission files per ensemble member
+                if self.startdate == self.timestartkey:
+                    file = os.path.join(dacycle.dasystem['datadir'],'temporal_data_%03d.nc'%j)
+                    try:
+                        os.remove(file)
+                    except OSError:
+                        pass
+                prmfile=os.path.join(dacycle['dir.input'],'parameters.%03d.nc'%j)
+                f = io.ct_read(prmfile, 'read')
+                self.prm = f.get_variable('parametervalues')
+                f.close()
+                self.get_spatial(dacycle, n=j, infile=os.path.join(self.emisdir, self.paramfile))
+                self.get_temporal(dacycle, n=j, psdo=0)
+
+    def get_totals(self, infile=None):
+        """gather all required data and calculate total emissions per sector in kg/yr"""
+
+        yremis = np.array(self.nrcat*[self.nrspc*[0.]])
+        self.spatial_var = []
+        self.spatial_inc = []
+        self.temporal_var = []
+        self.temporal_inc = []
+        self.ops_sector = []
+
+        ### Read in parameter values for the emission functions and ratios to calculate total emissions
+        f = open(infile, 'r')
+        lines = f.readlines()
+        f.close()
+        ct = 0
+        for line in lines:
+            dum=line.split(",")
+            if dum[0]=='#':
+                continue
+            else:
+                id = int(dum[0])
+                ### If id == 0 this (sub)sector is treated only by WRF-STILT; if id takes another value the local sources are treated by OPS
+                if id != 0:
+                    self.ops_sector.append(ct)
+                inc = int(dum[2])
+                ### If inc == 999 this parameter is not in the state vector and will not be optimized; otherwise inc refers to the
+                ### position of this parameter in the state vector
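+                ### A (hypothetical) parameter-file row read here would look like:
+                ###   id, EC, idx, EF, idx, AD, idx, fr, idx, spatial_var, idx, temporal_var, idx, ratio, n/l, idx, ...
+                ### where each idx is either 999 (not optimized) or the position of that factor in the state vector,
+                ### e.g. "0, 1.0, 999, 56.1, 0, 1.2e6, 999, 1.0, 999, ..." scales EF by prm[0] in ems = EC*EF*AD*fr below.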
+                if inc!=999:
+                    EC = float(dum[1])*self.prm[inc]
+                else:
+                    EC = float(dum[1])
+                inc = int(dum[4])
+                if inc!=999:
+                    EF = float(dum[3])*self.prm[inc]
+                else:
+                    EF = float(dum[3])
+                inc = int(dum[6])
+                if inc!=999:
+                    AD = float(dum[5])*self.prm[inc]
+                else:
+                    AD = float(dum[5])
+                inc = int(dum[8])
+                if inc!=999:
+                    fr = float(dum[7])*self.prm[inc]
+                else:
+                    fr = float(dum[7])
+                ### emission = energy consumption per activity x emission factor x activity
+                ### fr can be used to divide sectoral emissions over several subsectors (e.g. to split road traffic into cars and HDV)
+                ems = EC*EF*AD*fr
+                ### Now we have calculated emissions of CO2. To get emissions of other trace gases we multiply with an emission ratio
+                for s in range(self.nrspc):
+                    inc = int(dum[15+s*3])
+                    if inc!=999:
+                        rat = float(dum[13+s*3])*self.prm[inc]
+                    else:
+                        rat = float(dum[13+s*3])
+                    ### Some emission ratios have lognormal uncertainty distributions (label 'l')
+                    if dum[14+s*3]=='n':
+                        yremis[ct,s] = ems*rat
+                    elif dum[14+s*3]=='l':
+                        yremis[ct,s] = ems*np.exp(rat)
+                ct = ct + 1
+                ### Here we list the spatial and temporal variables that are part of the state vector for use in the get_spatial and get_temporal functions
+                self.spatial_var.append(dum[9])
+                self.spatial_inc.append(int(dum[10]))
+                self.temporal_var.append(dum[11])
+                self.temporal_inc.append(int(dum[12]))
+
+        logging.debug("Successfully calculated total emissions")
+
+        return yremis
+
+    def get_spatial(self, dacycle, n, infile=None):
+        """read in proxy data used for spatial distribution of the gridded emissions, disaggregate yearly totals for the area"""
+
+        yremis=self.get_totals(infile)
+
+        # read in species information
+        infile = os.path.join(self.emisdir, self.obsfile)
+        f = open(infile, 'r')
+        lines = f.readlines()
+        f.close()
+        M_mass = []
+        spname = []
+        for line in lines:
+            dum=line.split(",")
+            if dum[0]=='#':
+                continue
+            else:
+                M_mass.append(float(dum[6])*1e-9) #kg/micromole
+                spname.append(dum[5]) #name of the species
+        sec_year = 8760.*3600. #seconds in a year
+        arcor = 1e6 #km2 -> m2
+        conv = np.array(M_mass)*sec_year*arcor # to convert emissions in kg/km2/yr to micromole/m2/s
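+        ### Unit check: [kg/km2/yr] / ([kg/umol] * [s/yr] * [m2/km2]) = [umol/m2/s],
+        ### so dividing the yearly sectoral totals by conv below yields micromole/m2/s.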
+
+        #read in proxy data for spatial disaggregation
+        infile = os.path.join(self.emisdir, self.proxyfile)
+        prx = io.ct_read(infile, method='read')
+        sp_distr = []
+        for c in range(self.nrcat):
+            sp_distr.append(prx.get_variable(self.spatial_var[c]))
+        sp_distr=np.array(sp_distr)
+        prx.close()
+
+        ### create output file
+        prior_file = os.path.join(self.emisdir, 'prior_spatial_%03d.nc'%n)
+        f = io.CT_CDF(prior_file, method='create')
+        dimid = f.add_dim('ncat', self.nrcat)
+        dimid2 = f.add_dim('ops', 3)
+        dimlon = f.add_dim('lon', sp_distr.shape[1])
+        dimlat = f.add_dim('lat', sp_distr.shape[2])
+
+        #loop over all tracers
+        for s in range(self.nrspc):
+            # determine which fraction of the emissions are local and are treated by OPS
+            datalistOPS = []
+            opsfr = [0.317, 0.317, 0.267]
+
+            for j in range(len(self.ops_sector)):
+                OPSemis = yremis[self.ops_sector[j],s] * opsfr[j] * 1E3 / sec_year
+                datalistOPS.append(OPSemis)
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "OPStotals_%s"%spname[s]
+            savedict['long_name'] = "total OPS emissions"
+            savedict['units'] = "g/s"
+            savedict['dims'] = dimid2
+            savedict['values'] = datalistOPS
+            f.add_data(savedict)
+
+            datalist = []
+
+            ct = 0
+            for c in range(self.nrcat):
+                inc = self.spatial_inc[c]
+                if inc!=999:
+                    ### note that the spatial distribution is applied to the gridded state vector, such that a scaling factor affects
+                    ### the total emissions in this area
+                    distr = sp_distr[c]*self.prm[inc]
+                else:
+                    distr = sp_distr[c]
+
+                if c in self.ops_sector:
+                    emis_spatial = (yremis[c,s]*(1-opsfr[ct])) / conv[s] * distr
+                    ct = ct + 1
+                else:
+                    emis_spatial = yremis[c,s] / conv[s] * distr
+                datalist.append(emis_spatial)
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = spname[s]
+            savedict['long_name'] = "Spatially distributed emissions"
+            savedict['units'] = "micromole/m2/s"
+            savedict['dims'] = dimid + dimlon + dimlat
+            savedict['values'] = datalist
+            f.add_data(savedict)
+
+        f.close()
+
+        logging.debug("Successfully wrote data to prior spatial distribution file (%s)" % prior_file)
+
+    def get_temporal(self, dacycle, n, psdo):
+        """read in time profiles used for temporal distribution of the emissions"""
+
+        ### For pseudo-observations (psdo==1), or when no time profiles need to be optimized, the profiles are simply read from the
+        ### input file and copied to another file.
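+        ### (psdo==1 creates pseudo-data with the 'true' parameter values; psdo==0 treats one ensemble member n at a time.)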
+        ### Otherwise create a new file per ensemble member at t=0 and update the profiles for each time step
+        if psdo==0 and min(self.temporal_inc)<999:
+            ensfile = os.path.join(self.emisdir, 'temporal_data_%03d.nc'%n)
+            if os.path.exists(ensfile) == False:
+                dumfile = os.path.join(self.emisdir, self.tempfilep)
+                shutil.copy2(dumfile,ensfile)
+            tpr = io.ct_read(ensfile, method='read')
+            itimes = tpr.get_variable('Times')
+            times = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),int(''.join(d[14:16]))) for d in itimes])
+            subselect = logical_and(times >= self.timestartkey , times <= self.timefinishkey).nonzero()[0]
+            datlist = times.take(subselect, axis=0)
+
+            ### The time profiles should always cover at least one full year
+            start_date = dtm.datetime(self.timestartkey.year,1,1,0,0) #first time included
+            end_date = dtm.datetime(self.timestartkey.year,12,31,23,0) #last time included
+            dt = dtm.timedelta(0,3600)
+            dum = start_date
+            times=[]
+            while dum<=end_date:
+                times.append(dum)
+                dum=dum+dt
+            times=np.array(times)
+            stidx = np.where(times==self.timestartkey)[0][0]
+            stidx2 = np.where(times==self.startdate)[0][0]
+            edidx = np.where(times==self.timefinishkey)[0][0]
+
+            dumsel = np.where((times<self.startdate)|(times>self.timefinishkey))[0]
+
+            """ Time profiles should, for a full year, always have an average value of 1.0. Therefore, a new method has been developed
+            to optimize time profiles such that we comply with this and the time profiles do not affect the total emissions.
+            For this purpose we apply the scaling factor (statevector) to the period covered in this cycle. The time profiles for all
+            dates outside this period are scaled equally, such that the time profile keeps its average value of 1.0. An exception is
+            made for dates already updated in previous cycles, which are maintained (they are already optimized!)."""
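+            ### The compensation below keeps the annual sum constant: the excess (f_new.sum() - f_sel.sum()) introduced inside
+            ### the cycle window is removed from the hours outside it, weighted by each hour's share f_orig/dumsum of the outside sum.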
+
+            profiles = []
+            for c in range(self.nrcat):
+                if self.temporal_inc[c]!=999:
+                    f_orig = tpr.get_variable(self.temporal_var[c])
+                    f_sel = tpr.get_variable(self.temporal_var[c]).take(subselect, axis=0)
+                    f_new = f_sel*self.prm[self.temporal_inc[c]]
+                    dumsum = np.array(f_orig[dumsel]).sum()
+                    for i in range(len(f_new)):
+                        f_orig[:stidx2]=f_orig[:stidx2]-(f_orig[:stidx2]/dumsum)*(f_new.sum()-f_sel.sum())
+                        f_orig[edidx+1:]=f_orig[edidx+1:]-(f_orig[edidx+1:]/dumsum)*(f_new.sum()-f_sel.sum())
+                        f_orig[stidx:edidx+1]=f_new
+                    profiles.append(f_orig)
+            tpr.close()
+
+            f = io.CT_CDF(ensfile, method='write')
+            ct=0
+            for c in range(self.nrcat):
+                if self.temporal_inc[c]!=999:
+                    f.variables[self.temporal_var[c]][:] = profiles[ct]
+                    ct=ct+1
+            f.close()
+
+        ### Now read in the correct profiles, select the correct time period and write the profiles into one file per ensemble member
+        if psdo==1:
+            infile = os.path.join(self.emisdir, self.tempfileo)
+        elif psdo==0 and min(self.temporal_inc)==999:
+            infile = os.path.join(self.emisdir, self.tempfilep)
+        elif psdo==0 and min(self.temporal_inc)<999:
+            infile = os.path.join(self.emisdir, 'temporal_data_%03d.nc'%n)
+        tpr = io.ct_read(infile, method='read')
+        itimes = tpr.get_variable('Times')
+        times = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),int(''.join(d[14:16]))) for d in itimes])
+        if psdo == 1:
+            startdum=dtm.datetime(self.startdate.year,self.startdate.month,self.startdate.day-1,1,0)
+            subselect = logical_and(times >= startdum, times <= self.enddate).nonzero()[0]
+        else:
+            dum = self.timestartkey - dtm.timedelta(0,self.btime*3600)
+            if dum.hour != 0:
+                startdum = dtm.datetime(dum.year,dum.month,dum.day,1,0)
+            else:
+                startdum = dtm.datetime(dum.year,dum.month,dum.day-1,1,0)
+            subselect = logical_and(times >= startdum , times <= self.timefinishkey).nonzero()[0]
+        datlist = times.take(subselect, axis=0)
+        profiles=[]
+        for c in range(self.nrcat):
+            f_orig = tpr.get_variable(self.temporal_var[c]).take(subselect, axis=0)
+            profiles.append(f_orig)
+        tpr.close()
+        profiles=np.array(profiles)
+
+        prior_file = os.path.join(self.emisdir, 'prior_temporal_%03d.nc'%n)
+        f = io.CT_CDF(prior_file, method='create')
+        dimtime = f.add_dim('Times', len(datlist))
+
+        dum=[]
+        for c in range(self.nrcat):
+            if self.temporal_var[c] in dum:
+                continue
+            else:
+                savedict = io.std_savedict.copy()
+                savedict['name'] = self.temporal_var[c]
+                savedict['long_name'] = "Temporal distribution"
+                savedict['units'] = ""
+                savedict['dims'] = dimtime
+                savedict['values'] = profiles[c,:]
+                f.add_data(savedict)
+                dum.append(self.temporal_var[c])
+        f.close()
+
+        logging.debug("Successfully wrote data to prior temporal distribution file (%s)" % prior_file)
+
+################### End Class Emission model ###################
+
+
+if __name__ == "__main__":
+    pass
+
diff --git a/da/stiltops/obs.py b/da/stiltops/obs.py
new file mode 100755
index 00000000..1762579b
--- /dev/null
+++ b/da/stiltops/obs.py
@@ -0,0 +1,537 @@
+#!/usr/bin/env python
+# obs.py
+
+"""
+Author : peters
+
+Revision History:
+File created on 28 Jul 2010.
+Adapted by super004 on 18 May 2017.
+
+"""
+ +""" +import os +import sys +import logging + +import datetime as dtm +from string import strip +from numpy import array, logical_and +import numpy as np +sys.path.append(os.getcwd()) +sys.path.append('../../') +from pylab import * + +identifier = 'Rotterdam CO2 mole fractions' +version = '0.0' + +from da.baseclasses.obs import Observations +import da.tools.io4 as io +import da.tools.rc as rc +################### Begin Class RdamObservations ################### + +class RdamObservations(Observations): + """ an object that holds data + methods and attributes needed to manipulate mole fraction values """ + + def setup(self, dacycle): + + self.startdate = dacycle['time.sample.start'] + self.enddate = dacycle['time.sample.end'] + + op_id = dacycle.dasystem['obs.input.id'] + op_dir = dacycle.dasystem['datadir'] + self.nrloc = dacycle.dasystem['obs.input.nr'] + + if not os.path.exists(op_dir): + msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir + logging.error(msg) + raise IOError, msg + else: + self.obspack_dir = op_dir + self.obspack_id = op_id + + self.datalist = [] + self.tracer_list = [] + # self.cnt = [] + + def add_observations(self, dacycle): + """ Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file + + The ObsPack mole fraction files are provided as time series per site with all dates in sequence. + We will loop over all site files in the ObsPackage, and subset each to our needs + + """ + + infile = os.path.join(self.obspack_dir, self.obspack_id) + f = open(infile, 'r') + lines = f.readlines() + f.close() + + ncfilelist = [] + for line in lines: + if line.startswith('#'): continue # header + + dum = line.split(",") + ncfile = [dum[1]] + + if not ncfile[0] in ncfilelist: + ncfilelist.extend(ncfile) + + for ncfile in ncfilelist: + infile = os.path.join(self.obspack_dir, ncfile + '.nc') + ncf = io.ct_read(infile, 'read') + idates = ncf.get_variable('Times') + dates = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),0) for d in idates]) + + subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0] + + dates = dates.take(subselect, axis=0) + + datasetname = ncfile # use full name of dataset to propagate for clarity + logging.info(datasetname) + lats = ncf.get_variable('lat').take(subselect, axis=0) + lons = ncf.get_variable('lon').take(subselect, axis=0) + alts = ncf.get_variable('alt').take(subselect, axis=0) + obs = ncf.get_variable('obs').take(subselect, axis=0) + ids = ncf.get_variable('ids').take(subselect, axis=0) + spcs = ncf.get_variable('species').take(subselect, axis=0) + ncf.close() + for n in range(len(dates)): + spc = spcs[n,0]+spcs[n,1]+spcs[n,2] + self.datalist.append(MoleFractionSample(ids[n], dates[n], datasetname, obs[n], 0.0, 0.0, 0.0, 0.0, 0, alts[n], lats[n], lons[n], '001', spc, 1, 0.0, infile)) + + logging.debug("Added %d observations from file (%s) to the Data list" % (len(dates), datasetname)) + + logging.info("Observations list now holds %d values" % len(self.datalist)) + + def add_simulations(self, filename, silent=False): + """ Adds model simulated values to the mole fraction objects """ + + if not os.path.exists(filename): + msg = "Sample output filename for observations could not be found : %s" % filename + logging.error(msg) + logging.error("Did the sampling step succeed?") + logging.error("...exiting") + raise IOError, msg + + ncf = io.ct_read(filename, method='read') + ids = ncf.get_variable('obs_num') + 
+        simulated = ncf.get_variable('model')
+        ncf.close()
+        logging.info("Successfully read data from model sample file (%s)" % filename)
+
+        obs_ids = self.getvalues('id').tolist()
+        ids = map(int, ids)
+
+        missing_samples = []
+
+        for idx, val in zip(ids, simulated):
+            if idx in obs_ids:
+                index = obs_ids.index(idx)
+                self.datalist[index].simulated = val  # in mol/mol
+            else:
+                missing_samples.append(idx)
+
+        if not silent and missing_samples != []:
+            logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...')
+
+        logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
+
+
+    def write_sample_coords(self, obsinputfile):
+        """
+        Write the information needed by the observation operator to a file. Return the filename that was written for later use
+
+        """
+
+        if len(self.datalist) == 0:
+            logging.debug("No observations found for this time period, nothing written to obs file")
+        else:
+            f = io.CT_CDF(obsinputfile, method='create')
+            logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
+
+            dimid = f.add_dim('obs', len(self.datalist))
+            dim200char = f.add_dim('string_of200chars', 200)
+            dim10char = f.add_dim('string_of10chars', 10)
+            dimcalcomp = f.add_dim('calendar_components', 6)
+
+            data = self.getvalues('id')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "obs_num"
+            savedict['dtype'] = "int"
+            savedict['long_name'] = "Unique_Dataset_observation_index_number"
+            savedict['units'] = ""
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
+            f.add_data(savedict)
+
+            data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate')]
+
+            savedict = io.std_savedict.copy()
+            savedict['dtype'] = "int"
+            savedict['name'] = "date_components"
+            savedict['units'] = "integer components of UTC date/time"
+            savedict['dims'] = dimid + dimcalcomp
+            savedict['values'] = data
+            savedict['missing_value'] = -9
+            savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
+            savedict['order'] = "year, month, day, hour, minute, second"
+            f.add_data(savedict)
+
+            data = self.getvalues('lat')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "latitude"
+            savedict['units'] = "degrees_north"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['missing_value'] = -999.9
+            f.add_data(savedict)
+
+            data = self.getvalues('lon')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "longitude"
+            savedict['units'] = "degrees_east"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['missing_value'] = -999.9
+            f.add_data(savedict)
+
+            data = self.getvalues('height')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "altitude"
+            savedict['units'] = "meters_above_sea_level"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['missing_value'] = -999.9
+            f.add_data(savedict)
+
+            data = np.array(self.tracer_list)
+
+            savedict = io.std_savedict.copy()
+            savedict['dtype'] = "int"
+            savedict['name'] = "source_type"
+            savedict['units'] = "NA"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['missing_value'] = -9
+            f.add_data(savedict)
+
+            data = self.getvalues('evn')
+
+            savedict = io.std_savedict.copy()
+            savedict['dtype'] = "char"
+            savedict['name'] = "obs_id"
+            savedict['units'] = "ObsPack datapoint identifier"
+            savedict['dims'] = dimid + dim200char
+            savedict['values'] = data
+            savedict['missing_value'] = '!'
+            f.add_data(savedict)
+
+            data = self.getvalues('obs')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "observed"
+            savedict['long_name'] = "observedvalues"
+            savedict['units'] = "mol mol-1"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['comment'] = 'Observations used in optimization'
+            f.add_data(savedict)
+
+            data = self.getvalues('mdm')
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "modeldatamismatch"
+            savedict['long_name'] = "modeldatamismatch"
+            savedict['units'] = "[mol mol-1]"
+            savedict['dims'] = dimid
+            savedict['values'] = data.tolist()
+            savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
+            f.add_data(savedict)
+            f.close()
+
+            logging.debug("Successfully wrote data to obs file")
+            logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
+
+    def add_model_data_mismatch(self, filename):
+        """
+        Get the model-data mismatch values for this cycle.
+
+            (1) Open a sites_weights file
+            (2) Parse the data
+            (3) Compare site list against data
+            (4) Take care of double sites, etc
+
+        """
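+        # The sites_weights rc-file is expected to contain entries along these lines (values here are hypothetical):
+        #   obs.rejection.threshold : 3
+        #   global.R.scaling        : 1.0
+        #   n.site.categories       : 2
+        #   site.category.01        : rural ; 1.5 ; TRUE ; TRUE     (name; error; may_localize; may_reject)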
+
+        if not os.path.exists(filename):
+            msg = 'Could not find the required sites.rc input file (%s) ' % filename
+            logging.error(msg)
+            raise IOError, msg
+        else:
+            self.sites_file = filename
+
+        sites_weights = rc.read(self.sites_file)
+
+        self.rejection_threshold = int(sites_weights['obs.rejection.threshold'])
+        self.global_R_scaling = float(sites_weights['global.R.scaling'])
+        self.n_site_categories = int(sites_weights['n.site.categories'])
+
+        logging.debug('Model-data mismatch rejection threshold: %d ' % self.rejection_threshold)
+        logging.warning('Model-data mismatch scaling factor     : %f ' % self.global_R_scaling)
+        logging.debug('Model-data mismatch site categories    : %d ' % self.n_site_categories)
+
+        cats = [k for k in sites_weights.keys() if 'site.category' in k]
+
+        site_categories = {}
+        for key in cats:
+            name, error, may_localize, may_reject = sites_weights[key].split(';')
+            name = name.strip().lower()
+            error = float(error)
+            may_reject = ("TRUE" in may_reject.upper())
+            may_localize = ("TRUE" in may_localize.upper())
+            site_categories[name] = {'category': name, 'error': error, 'may_localize': may_localize, 'may_reject': may_reject}
+
+        site_info = {}
+        site_move = {}
+        site_hourly = {}   # option added to include only certain hours of the day (for e.g. PAL) IvdL
+        site_incalt = {}   # option to increase sampling altitude for sites specified in sites and weights file
+        for key, value in sites_weights.iteritems():
+            if 'obsfile' in key:  # to be fixed later, do not yet know how to parse valid keys from rc-files yet....  WP
+                sitename, sitecategory = key, value
+                sitename = sitename.strip()
+                sitecategory = sitecategory.split()[0].strip().lower()
+                site_info[sitename] = site_categories[sitecategory]
+            if 'site.move' in key:
+                identifier, latmove, lonmove = value.split(';')
+                site_move[identifier.strip()] = (float(latmove), float(lonmove))
+            if 'site.hourly' in key:
+                identifier, hourfrom, hourto = value.split(';')
+                site_hourly[identifier.strip()] = (int(hourfrom), int(hourto))
+            if 'site.incalt' in key:
+                identifier, incalt = value.split(';')
+                site_incalt[identifier.strip()] = (int(incalt))
+
+        for obs in self.datalist:  # loop over all available data points
+
+            obs.mdm = 1000.0  # default is very high model-data-mismatch, until explicitly set by script
+            obs.flag = 99  # default is do-not-use, until explicitly set by script
+            exclude_hourly = False  # default is that hourly values are not included
+
+            identifier = obs.code
+            # species, site, method, lab, datasetnr = identifier.split('_')
+
+            if site_info.has_key(identifier):
+                if site_hourly.has_key(identifier):
+                    obs.samplingstrategy = 2
+                    hourf, hourt = site_hourly[identifier]
+                    if int(obs.xdate.hour) >= hourf and int(obs.xdate.hour) <= hourt:
+                        logging.warning("Observation in hourly dataset INCLUDED, since sampling time %s was between %s:00-%s:00"%(obs.xdate.time(),hourf,hourt))
+                        obs.flag = 0
+                    else:
+                        logging.warning("Observation in hourly dataset EXCLUDED, since sampling time %s was outside %s:00-%s:00"%(obs.xdate.time(),hourf,hourt))
+                        exclude_hourly = True
+                if site_info[identifier]['category'] == 'do-not-use' or exclude_hourly:
+                    logging.warning("Observation found (%s, %d), but not used in assimilation !!!" % (identifier, obs.id))
+                    obs.mdm = site_info[identifier]['error'] * self.global_R_scaling
+                    obs.may_localize = site_info[identifier]['may_localize']
+                    obs.may_reject = site_info[identifier]['may_reject']
+                    obs.flag = 99
+                else:
+                    logging.debug("Observation found (%s, %d)" % (identifier, obs.id))
+                    obs.mdm = site_info[identifier]['error'] * self.global_R_scaling
+                    obs.may_localize = site_info[identifier]['may_localize']
+                    obs.may_reject = site_info[identifier]['may_reject']
+                    obs.flag = 0
+
+            else:
+                logging.warning("Observation NOT found (%s, %d), please check sites.rc file (%s)  !!!" % (identifier, obs.id, self.sites_file))
+
+            if site_move.has_key(identifier):
+
+                movelat, movelon = site_move[identifier]
+                obs.lat = obs.lat + movelat
+                obs.lon = obs.lon + movelon
+
+                logging.warning("Observation location for (%s, %d), is moved by %3.2f degrees latitude and %3.2f degrees longitude" % (identifier, obs.id, movelat, movelon))
+
+            if site_incalt.has_key(identifier):
+
+                incalt = site_incalt[identifier]
+                obs.height = obs.height + incalt
+
+                logging.warning("Observation location for (%s, %d), is moved by %3.2f meters in altitude" % (identifier, obs.id, incalt))
+
+
+        # Add site_info dictionary to the Observations object for future use
+
+        self.site_info = site_info
+        self.site_move = site_move
+        self.site_hourly = site_hourly
+        self.site_incalt = site_incalt
+
+        logging.debug("Added Model Data Mismatch to all samples ")
+
+    def write_sample_auxiliary(self, auxoutputfile, filename):
+        """
+        Write selected information contained in the Observations object to a file.
+
+        """
+
+        if not os.path.exists(filename):
+            msg = "Sample output filename for observations could not be found : %s" % filename
+            logging.error(msg)
+            logging.error("Did the sampling step succeed?")
+            logging.error("...exiting")
+            raise IOError, msg
+
+        ncf = io.ct_read(filename, method='read')
+        ids = ncf.get_variable('obs_num')
+        simulatedensemble = ncf.get_variable('model')
+        ncf.close()
+        logging.info("Successfully read data from model sample file (%s)" % filename)
+
+        f = io.CT_CDF(auxoutputfile, method='create')
+        logging.debug('Creating new auxiliary sample output file for postprocessing (%s)' % auxoutputfile)
+
+        dimid = f.add_dim('obs', len(self.datalist))
+        dim200char = f.add_dim('string_of200chars', 200)
+        dim10char = f.add_dim('string_of10chars', 10)
+        dimcalcomp = f.add_dim('calendar_components', 6)
+
+        if len(self.datalist) == 0:
+            f.close()
+            #return outfile
+
+        for key, value in self.site_move.iteritems():
+            msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
+            f.add_attribute(key, msg)
+
+        data = self.getvalues('id')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "obs_num"
+        savedict['dtype'] = "int"
+        savedict['long_name'] = "Unique_Dataset_observation_index_number"
+        savedict['units'] = ""
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
+        f.add_data(savedict)
+
+        data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate')]
+
+        savedict = io.std_savedict.copy()
+        savedict['dtype'] = "int"
+        savedict['name'] = "date_components"
+        savedict['units'] = "integer components of UTC date/time"
+        savedict['dims'] = dimid + dimcalcomp
+        savedict['values'] = data
+        savedict['missing_value'] = -9
+        savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
+        savedict['order'] = "year, month, day, hour, minute, second"
+        f.add_data(savedict)
+
+        data = self.getvalues('obs')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "observed"
+        savedict['long_name'] = "observedvalues"
+        savedict['units'] = "mol mol-1"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'Observations used in optimization'
+        f.add_data(savedict)
+
+        data = self.getvalues('mdm')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "modeldatamismatch"
+        savedict['long_name'] = "modeldatamismatch"
+        savedict['units'] = "[mol mol-1]"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
+        f.add_data(savedict)
+
+        data = simulatedensemble
+
+        dimmembers = f.add_dim('members', data.shape[1])
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "modelsamples"
+        savedict['long_name'] = "modelsamples for all ensemble members"
+        savedict['units'] = "mol mol-1"
+        savedict['dims'] = dimid + dimmembers
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'simulated mole fractions based on optimized state vector'
+        f.add_data(savedict)
+
+        data = self.getvalues('fromfile')
+
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "inputfilename"
+        savedict['long_name'] = "name of file where original obs data was taken from"
+        savedict['dtype'] = "char"
+        savedict['dims'] = dimid + dim200char
+        savedict['values'] = data
+        savedict['missing_value'] = '!'
+        f.add_data(savedict)
+
+        f.close()
+
+        logging.debug("Successfully wrote data to auxiliary sample output file (%s)" % auxoutputfile)
+
+        #return outfile
+
+
+
+################### End Class CtObservations ###################
+
+
+
+################### Begin Class MoleFractionSample ###################
+
+class MoleFractionSample(object):
+    """
+    Holds the data that defines a mole fraction Sample in the data assimilation framework. So far, this includes all
+    attributes listed below in the __init__ method. One can additionally make more types of data, or make new
+    objects for specific projects.
+
+    """
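+    # Example (sketch, with made-up values): a CO2 sample of 410 ppm (4.10e-4 mol/mol) taken on 1 Jan 2017 at 12:00 UTC:
+    #   MoleFractionSample(82536, dtm.datetime(2017, 1, 1, 12), 'co2_cbw_tower', 4.10e-4, species='co2')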
+ + """ + + def __init__(self, idx, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat= -999., lon= -999., evn='0000', species='co2', samplingstrategy=1, sdev=0.0, fromfile='none.nc'): + self.code = code.strip() # dataset identifier, i.e., co2_lef_tower_insitu_1_99 + self.xdate = xdate # Date of obs + self.obs = obs # Value observed + self.simulated = simulated # Value simulated by model + self.resid = resid # Mole fraction residuals + self.hphr = hphr # Mole fraction prior uncertainty from fluxes and (HPH) and model data mismatch (R) + self.mdm = mdm # Model data mismatch + self.may_localize = True # Whether sample may be localized in optimizer + self.may_reject = True # Whether sample may be rejected if outside threshold + self.flag = flag # Flag + self.height = height # Sample height in masl + self.lat = lat # Sample lat + self.lon = lon # Sample lon + self.id = idx # Obspack ID within distrution (integer), e.g., 82536 + self.evn = evn # Obspack Number within distrution (string), e.g., obspack_co2_1_PROTOTYPE_v0.9.2_2012-07-26_99_82536 + self.sdev = sdev # standard deviation of ensemble + self.masl = True # Sample is in Meters Above Sea Level + self.mag = not self.masl # Sample is in Meters Above Ground + self.species = species.strip() + self.samplingstrategy = samplingstrategy + self.fromfile = fromfile # netcdf filename inside ObsPack distribution, to write back later + +################### End Class MoleFractionSample ################### + + +if __name__ == "__main__": + pass + + + diff --git a/da/stiltops/observationoperator.py b/da/stiltops/observationoperator.py new file mode 100755 index 00000000..e48525ae --- /dev/null +++ b/da/stiltops/observationoperator.py @@ -0,0 +1,730 @@ +#!/usr/bin/env python +# stilt_tools.py + +""" +Author : W. He +Adapted by Ingrid Super, last revisions 14-8-2018 + +Revision History: +Basing on Wouter's codes, replace the TM5 model with the STILT model, April 2015. + +This module holds specific functions needed to use the STILT model within the data assimilation shell. It uses the information +from the DA system in combination with the generic stilt.rc files. + +The STILT model is now controlled by a python subprocess. This subprocess consists of an MPI wrapper (written in C) that spawns +a large number ( N= nmembers) of STILT model instances under mpirun, and waits for them all to finish. + +#The design of the system assumes that the stilt function are executed using R codes, and is residing in a +#directory specified by the ${RUNDIR} of a stilt rc-file. This stilt rc-file name is taken from the data assimilation rc-file. 
+import shutil
+import os
+import sys
+import logging
+import datetime as dtm
+import numpy as np
+from numpy import array, logical_and
+from string import join
+import glob
+import da.tools.io4 as io
+import subprocess
+
+import da.tools.rc as rc
+from da.tools.general import create_dirs, to_datetime
+from da.baseclasses.observationoperator import ObservationOperator
+
+# global constants, which will be used in the following classes
+identifier = 'WRF-STILT'
+version = '1.0'
+
+################### Begin Class STILT ###################
+
+def dumfunc(arg):
+    return STILTObservationOperator.OPS_run(*arg)
+
+class STILTObservationOperator(object):
+
+    def __init__(self, filename, dacycle=None):  # only the filename is used, to specify the location of the statevector file for wrf-stilt runs
+        """ The instance of an STILTObservationOperator is application dependent """
+        self.ID = identifier  # the identifier gives the model name
+        self.version = version  # the model version used
+        self.restart_filelist = []
+        self.output_filelist = []
+        self.outputdir = None  # Needed for opening the samples.nc files created
+
+        logging.info('Observation Operator initialized: %s (%s)' % (self.ID, self.version))
+
+        self.load_rc(filename)  # load the specified rc-file
+
+        if dacycle != None:
+            self.dacycle = dacycle
+        else:
+            self.dacycle = {}
+
+        self.startdate=None
+
+    def get_initial_data(self):
+        """ This method places all initial data needed by an ObservationOperator in the proper folder for the model """
+
+    def setup(self, dacycle):
+        """ Execute all steps needed to prepare the ObsOperator for use inside CTDAS, only done at the very first cycle normally """
+        self.dacycle = dacycle
+        self.outputdir = dacycle['dir.output']
+        self.rc_filename = dacycle['da.obsoperator.rc']  # stilt_footprint.rc; kept under its own name so the load_rc method is not overwritten
+        self.obsfile = dacycle.dasystem['obs.input.id']
+        self.obsdir = dacycle.dasystem['datadir']
+        self.nrloc = int(dacycle.dasystem['obs.input.nr'])
+        self.nrspc = int(dacycle.dasystem['obs.spec.nr'])
+        self.nrcat = int(dacycle.dasystem['obs.cat.nr'])
+        self.bgswitch = int(dacycle.dasystem['obs.bgswitch'])
+        self.bgfile = dacycle.dasystem['obs.background']
+        self.btime = int(dacycle.dasystem['run.backtime'])
+        self.paramfile = dacycle.dasystem['emis.paramfile']
+
+    def load_rc(self, name):
+        """
+        This method loads an OPS rc-file with settings for this simulation
+        """
+
+        self.rcfile = rc.RcFile(name)
+        self.model_settings = self.rcfile.values
+        logging.debug('rc-file loaded successfully')
+
+    def prepare_run(self,psdo,adv):
+        """ Prepare the running of the actual forecast model, for example compile code """
+
+        if psdo==0:
+            # Define the name of the file that will contain the modeled output of each observation
+            if adv==0:
+                self.simulated_file = os.path.join(self.outputdir, 'samples_simulated.%s.nc' % self.dacycle['time.sample.stamp'])
+            else:
+                self.simulated_file = os.path.join(self.outputdir, 'samples_simulated.%s.adv.nc' % self.dacycle['time.sample.stamp'])
+            self.forecast_nmembers = int(self.dacycle['da.optimizer.nmembers'])
+            self.param_inputdir = self.dacycle['dir.input']
+
+        #list all observation locations and species
+        self.lat = []
+        self.lon = []
+        self.hgt = []
+        self.obsnc = []
+        self.mmass = []
+        self.cr = []
+        infile = os.path.join(self.obsdir, self.obsfile)
+        f = open(infile, 'r')
+        lines = f.readlines()
+        f.close()
+        self.spname = []
+        ct = 0
+        for line in lines:
+            dum=line.split(",")
+            if dum[0]=='#':
+                continue
+            else:
+                fln, lt, ln, hg, sp, mss, crr = (dum[1], float(dum[2]), float(dum[3]), float(dum[4]), dum[5], float(dum[6]), float(dum[7]))
+                self.obsnc.append(fln)
+                if sp in self.spname:
+                    if sp == self.spname[0]:
+                        self.lat.append(lt)
+                        self.lon.append(ln)
+                        self.hgt.append(hg)
+                else:
+                    self.spname.append(sp)
+                    self.mmass.append(mss)
+                    self.cr.append(crr)
+                    if ct == 0:
+                        self.lat.append(lt)
+                        self.lon.append(ln)
+                        self.hgt.append(hg)
+                ct = ct + 1
+
+        #find out which model is to be used for which sector
+        infile = os.path.join(self.obsdir, self.paramfile)
+        f = open(infile, 'r')
+        lines = f.readlines()
+        f.close()
+        self.ops_sector = []
+        self.ops_id = []
+        self.temporal_var = []
+        ct = 0
+        for line in lines:
+            dum=line.split(",")
+            if dum[0]=='#':
+                continue
+            else:
+                id = int(dum[0])
+                if id != 0:
+                    self.ops_sector.append(ct)
+                    self.ops_id.append(id)
+                ct = ct + 1
+                self.temporal_var.append(dum[11])
+
+    def OPS_run(self,n,d,spc,psdo):
+        """ This function prepares input files for OPS runs and reads the output"""
+
+        #update the .inv file (model settings for OPS)
+        edhr = self.modelend.hour
+        if edhr == 0:
+            edhr = 24
+            enddate=self.modelend-dtm.timedelta(0,3600)
+        else:
+            enddate=self.modelend
+
+        invfile = self.model_settings['run.setupfile']
+        file = open(invfile,'r')
+        dum = file.readlines()
+        file.close()
+        invfile_new = os.path.join(self.dacycle['da.obsoperator.home'], 'run%03d%03d.inv'%(spc+1,n))
+        file = open(invfile_new,'w')
+        for line in dum:
+            if 'component' in line:
+                if self.spname[spc] == 'CO2' or self.spname[spc] == 'C14':
+                    file.write('7')
+                    file.write('\n')
+                elif self.spname[spc] == 'CO':
+                    file.write('8')
+                    file.write('\n')
+                elif self.spname[spc] == 'NOx':
+                    file.write('2')
+                    file.write('\n')
+                elif self.spname[spc] == 'SO2':
+                    file.write('1')
+                    file.write('\n')
+                else:  # assume no chemistry or deposition for other species
+                    file.write('8')
+                    file.write('\n')
+            elif 'date & hour begin' in line:
+                file.write('%02d %02d %02d %02d\n'%(self.modelstart.year - 2000,self.modelstart.month,self.modelstart.day,1))
+            elif 'date & hour end' in line:
+                file.write('%02d %02d %02d %02d\n'%(enddate.year - 2000,enddate.month,enddate.day,edhr))
+            elif 'meteofile' in line:
+                file.write('./../../OPS_input/Meteo_ruwe_data/rg2mbk%02d\n'%(self.modelstart.year - 2000))
+            elif 'averaging' in line:
+                file.write(str(self.model_settings['run.avgopt']))
+                file.write('\n')
+            elif 'hourly (1) or annual (0)' in line:
+                file.write(str(self.model_settings['run.emdt']))
+                file.write('\n')
+            ### currently CTDAS only supports run.emdt == 1, otherwise no time profiles can be included
+            elif 'emission data file' in line:
+                if self.model_settings['run.emdt']=='0':
+                    file.write('./run%03d.brn'%(spc+1))
+                    file.write('\n')
+                else:
+                    file.write('./dum.brn')
+                    file.write('\n')
+            elif 'hourly emission file' in line:
+                if self.model_settings['run.emdt']=='0':
+                    file.write('./dum.ems')
+                    file.write('\n')
+                else:
+                    file.write('./run%03d%03d.ems'%(spc+1,n))
+                    file.write('\n')
+            else:
+                file.write(line)
+        file.close()
+
+        #call function to create emission file with correct temporal variations for each ensemble member
+        self.OPSsource(d=d,n=n,spc=spc,psdo=psdo)
+
+        #set correct file paths and run OPS
+        cwd = os.getcwd()
+        os.chdir(self.dacycle['da.obsoperator.home'])
+        outdir = os.path.join(self.model_settings['run.opsoutdir'],'%03d'%(spc+1))
+        rptfile = self.model_settings['run.rptfile']
+        newfile = 'run%03d.rpt'%(spc+1)
+        shutil.copy(rptfile,os.path.join(self.dacycle['da.obsoperator.home'],newfile))
+        FNULL = open(os.devnull, 'w')
+
+        #extra check: compare the final date in the output file with the enddate in the .inv file, since sometimes OPS files are prepared
+        #but the run is not completed. Then try again!
+        testdate = [0,0]
+        while testdate != [enddate.day, edhr]:
+            code = subprocess.call(["./run", "%03d%03d"%(spc+1,n)],stdout=FNULL)
+            opsfile = os.path.join(outdir,'run%03d_%03d.plt'%(spc+1,n))
+            shutil.copy('run%03d%03d.plt'%(spc+1,n),opsfile)
+            out = np.loadtxt(opsfile,skiprows=2,delimiter=';',usecols=(0,1,2,3)).T
+            testdate = [int(out[2,-1]),int(out[3,-1])]
+        os.chdir(cwd)
+
+        #read output, which contains output for one species but for all locations
+        opsfile = os.path.join(outdir,'run%03d_%03d.plt'%(spc+1,n))
+        out = np.loadtxt(opsfile,skiprows=2,delimiter=';',usecols=(0,4,11)).T
+        loc = out[1]
+        cnc = out[2]
+        cncc = []
+        for j in range(self.nrloc):
+            dum = []
+            for i in range(len(loc)):
+                if loc[i]==j+1:
+                    dum.append(cnc[i])
+            cncc.append(dum)
+        cncc = np.array(cncc)
+        Vm = 24.45E-3
+        if n==999:  # pseudo-observations: read the entire time series, excluding the first day of spin-up
+            for j in range(self.nrloc):
+                conc = cncc[j,24:]*self.cr[spc]*Vm/self.mmass[spc]
+                self.conc_OPS[len(self.datlist)*spc+len(self.datlist)*self.nrspc*j:len(self.datlist)*spc+len(self.datlist)*self.nrspc*j+len(self.datlist)]=conc
+        else:  # inversion: read only the final concentration for each location
+            conc = cncc[:,-1]*self.cr[spc]*Vm/self.mmass[spc]
+
+        return conc
+
+    def OPSsource(self,d,n,spc,psdo):
+        """This function creates hourly emission files for each OPS run"""
+
+        dt=(self.modelend-self.modelstart).days*24+(self.modelend-self.modelstart).seconds/3600+1
+        dtkey=(self.timefinishkey-self.timestartkey).days*24+(self.timefinishkey-self.timestartkey).seconds/3600+1
+        #read in temporal profiles; always start at 1:00 a.m. because this is needed for OPS to run
+        if psdo==0:
+            idx=d+dtkey
+            tempfile = os.path.join(self.model_settings['run.datadir'],'prior_temporal_%03d.nc'%n)
+            tf = io.ct_read(tempfile, 'read')
+            temp_profiles = []
+            for s in self.ops_sector:
+                f = tf.get_variable(self.temporal_var[s])[idx-dt:idx]
+                temp_profiles.append(f)
+            tf.close()
+        elif psdo==1:
+            tempfile = os.path.join(self.model_settings['run.datadir'],'prior_temporal_999.nc')
+            tf = io.ct_read(tempfile, 'read')
+            temp_profiles = []
+            for s in self.ops_sector:
+                f = tf.get_variable(self.temporal_var[s])
+                temp_profiles.append(f)
+            tf.close()
+        temp_profiles = np.array(temp_profiles)
+
+        #read total OPS emissions calculated with the emission model
+        emisfile = os.path.join(self.model_settings['run.datadir'],'prior_spatial_%03d.nc'%n)
+        f = io.ct_read(emisfile,method='read')
+        OPS_tot=f.get_variable('OPStotals_%s'%(self.spname[spc]))
+        f.close()
+
+        #read prior emission file with fractions
+        brnfile = os.path.join(self.dacycle['da.obsoperator.home'], 'run%s_prior.brn'%(self.spname[spc]))
+        file = open(brnfile,'r')
+        dum = file.readlines()
+        file.close()
+        header=[]
+        data=[]
+        ct=1
+        for line in dum:
+            if 'ssn' in line:
+                header.append(line)
+            else:
+                xloc, yloc, hc, hgt, dim, sv, dv, cat, ar, sd = \
+                    (int(line[4:12]), int(line[12:20]), float(line[30:37]), \
+                    float(line[37:43]), int(line[43:50]), float(line[50:56]), int(line[56:60]), \
+                    int(line[60:64]), int(line[64:68]), int(line[68:70]))
+                emid = ct
+                emis = float(line[20:30])
+                data.append([emid,xloc,yloc,emis,hc,hgt,dim,sv,dv,cat,ar,sd])
+                ct = ct+1
+        data=np.array(data)
+
+        # Open new emission file and fill with data. OPS always needs to start a run at 1:00 a.m. Since we want a specific footprint (self.btime)
+        # we need to set all emissions prior to the start of the footprint to zero (flg = 1).
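+        # Each hourly .ems record written below contains: year, month, day, hour, source id, x, y, emission,
+        # followed by the hc, hgt, dim, sv, dv, cat, ar and sd columns parsed from the prior .brn file above.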
+        brnfile_new = os.path.join(self.dacycle['da.obsoperator.home'], 'run%03d%03d.ems'%(spc+1,n))
+        file = open(brnfile_new,'w')
+        for t in range(dt):
+            crdate = self.modelstart+dtm.timedelta(0,3600*t)
+            if crdate < self.timestart and psdo==0:
+                flg = 1
+            else:
+                flg = 0
+            crhr = crdate.hour
+            if crhr == 0:
+                crhr = 24
+                crdate = crdate - dtm.timedelta(0,3600)
+            cryr = crdate.year-2000
+            crmnt = crdate.month
+            crdy = crdate.day
+            for k in range(len(data)):
+                if flg == 0:
+                    for s in range(len(self.ops_sector)):
+                        if data[k,8] == self.ops_id[s]:
+                            emis = data[k,3]*OPS_tot[s]*temp_profiles[s,t]
+                elif flg==1:
+                    emis = 0.
+                file.write('%4d%4d%4d%4d%6d%8.0f%8.0f%10.3E%7.3f%6.1f%7.0f%6.1f%4d%4d%4d%2d\n' \
+                    %(cryr,crmnt,crdy,crhr,data[k,0],data[k,1],data[k,2],emis,data[k,4],data[k,5],data[k,6],data[k,7],data[k,8],data[k,9],data[k,10],data[k,11]))
+        file.close()
+
+    def run_STILT(self,j,n,k,spc,psdo):
+        """This function reads in STILT footprints and returns hourly concentration data"""
+
+        #read temporal profiles for the times within the footprint
+        if psdo==0:
+            dtkey=(self.timefinishkey-self.timestartkey).days*24+(self.timefinishkey-self.timestartkey).seconds/3600+1
+        elif psdo==1:
+            dtkey=int(self.dacycle['time.cycle'])*24
+        idx=j+dtkey
+        tempfile = os.path.join(self.model_settings['run.datadir'],'prior_temporal_%03d.nc'%n)
+        tf = io.ct_read(tempfile, 'read')
+        temp_profiles = []
+        for s in range(self.nrcat):
+            f = tf.get_variable(self.temporal_var[s])[idx-self.btime:idx]
+            temp_profiles.append(f)
+        tf.close()
+        temp_profiles = np.array(temp_profiles)
+
+        #read spatially distributed emissions calculated with the emission model
+        emisfile = os.path.join(self.model_settings['run.datadir'],'prior_spatial_%03d.nc'%n)  # format: species[SNAP,lon,lat]
+        f = io.ct_read(emisfile,method='read')
+        emis=f.get_variable(self.spname[spc])
+        f.close()
+
+        #find correct background concentration
+        if self.bgswitch == 1:
+            bgr = self.bg[spc,j]
+        else:
+            bgr=0.
+
+        #find correct footprint file
+        tms=[]
+        tmser=[]
+        if self.lat[k]>0:
+            lt='N'
+        else:
+            lt='S'
+        if self.lon[k]>0:
+            ln='E'
+        else:
+            ln='W'
+        dum='%.2f'%abs(self.lat[k])
+        dum2=dum.zfill(5)
+        lt2=dum2+lt
+        dum='%.2f'%abs(self.lon[k])
+        dum2=dum.zfill(6)
+        ln2=dum2+ln
+        ht2='%05d'%self.hgt[k]
+        fprint=os.path.join(self.model_settings['run.footdir'], 'foot%04dx%02dx%02dx%02dx%sx%sx%s.nc'%(self.datlist[j].year,self.datlist[j].month,self.datlist[j].day,self.datlist[j].hour,lt2,ln2,ht2))
+        mf = io.ct_read(fprint,method='read')
+        footprint=mf.get_variable('foot1')
+        mf.close()
+
+        # multiply footprint with emissions for each hour in the back trajectory
+        for l in range(footprint.shape[0]):
+            ft=footprint[l]
+            dumvar=np.array(emis.shape[1]*[emis.shape[2]*[0]])
+            for i in range(emis.shape[0]):
+                emis_new=emis[i]*temp_profiles[i,l]
+                dumvar=dumvar+emis_new
+            dum=np.sum(ft*dumvar*self.cr[spc],axis=0)
+            tms.append(dum)
+        tms=np.array(tms)
+        tmser=tms.sum()+bgr
+
+        return tmser
+
+    def validate_input(self):
+        """ Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
+            are present.
+        """
+
+        datadir = self.dacycle['dir.input']
+        if not os.path.exists(datadir):
+            msg = "The specified input directory for the OPS model to read from does not exist (%s), exiting..." % datadir
+            logging.error(msg)
+            raise IOError, msg
+
+        datafiles = os.listdir(datadir)
+
+        obsfile = self.dacycle['ObsOperator.inputfile']
+
+        if not os.path.exists(obsfile):
+            msg = "The specified obs input file for the OPS model to read from does not exist (%s), exiting..." % obsfile
+            logging.error(msg)
+            if not self.dacycle.has_key('forward.savestate.dir'):
+                raise IOError, msg
+
+        for n in range(int(self.dacycle['da.optimizer.nmembers'])):
+            paramfile = 'parameters.%03d.nc' % n
+            if paramfile not in datafiles:
+                msg = "The specified parameter input file for the OPS model to read from does not exist (%s), exiting..." % paramfile
+                logging.error(msg)
+                raise IOError, msg
+
+        self.ops_exec = self.model_settings['run.opsexec']
+        if not os.path.exists(self.ops_exec):
+            logging.error("Required OPS executable was not found %s" % self.ops_exec)
+            logging.error("Please compile the model first")
+            raise IOError
+
+
+    def run_forecast_model(self,dacycle,psdo,adv):
+        if psdo==0:
+            self.prepare_run(psdo,adv)
+            self.validate_input()
+            self.run(dacycle,psdo)
+            self.save_data()
+        elif psdo==1:
+            self.prepare_run(psdo,adv)
+            self.run(dacycle,psdo)
+            self.save_obs()
+
+    def run(self,dacycle,psdo):
+        """ This function calls the OPS and STILT functions for each location, species, ensemble member and time step"""
+
+        #set time control variables for this cycle
+        if psdo==0:
+            self.timestartkey = dtm.datetime(self.dacycle['time.sample.start'].year,self.dacycle['time.sample.start'].month,self.dacycle['time.sample.start'].day,self.dacycle['time.sample.start'].hour,0)
+            self.timefinishkey = dtm.datetime(self.dacycle['time.sample.end'].year,self.dacycle['time.sample.end'].month,self.dacycle['time.sample.end'].day,self.dacycle['time.sample.end'].hour,0)
+        elif psdo==1:
+            self.timestartkey = dtm.datetime(self.dacycle['time.fxstart'].year,self.dacycle['time.fxstart'].month,self.dacycle['time.fxstart'].day,self.dacycle['time.fxstart'].hour,0)
+            self.timefinishkey = dtm.datetime(self.dacycle['time.finish'].year,self.dacycle['time.finish'].month,self.dacycle['time.finish'].day,self.dacycle['time.finish'].hour,0)
+        dt=dtm.timedelta(0,3600)
+        dumdate=self.timestartkey
+        self.datlist=[]
+        while dumdate<(self.timefinishkey+dt):
+            self.datlist.append(dumdate)
+            dumdate=dumdate+dt
+
+        #first remove old OPS output files and make sure output directories exist
+        for s in range(self.nrspc):
+            dirname = os.path.join(self.model_settings['run.opsoutdir'],'%03d'%(s+1))
+            if os.path.isdir(dirname):
+                filename = os.path.join(self.model_settings['run.opsoutdir'],'%03d/run%03d_999.plt'%(s+1,s+1))
+                try:
+                    os.remove(filename)
+                except OSError:
+                    pass
+                for n in range(int(self.dacycle['da.optimizer.nmembers'])):
+                    filename = os.path.join(self.model_settings['run.opsoutdir'],'%03d/run%03d_%03d.plt'%(s+1,s+1,n))
+                    try:
+                        os.remove(filename)
+                    except OSError:
+                        pass
+            else:
+                os.makedirs(dirname)
+
+        #read in background concentrations
+        if self.bgswitch == 1:
+            bf = io.ct_read(self.bgfile, 'read')
+            itimes = bf.get_variable('Times')
+            bgtimes = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),int(''.join(d[14:16]))) for d in itimes])
+            idx1=np.where(bgtimes==self.timestartkey)[0]
+            idx2=np.where(bgtimes==self.timefinishkey)[0]
+            self.bg = []
+            for s in range(self.nrspc):
+                if psdo == 1:
+                    varname = '%sbg_true'%(self.spname[s])
+                elif psdo == 0:
+                    #varname = '%sbg_true'%(self.spname[s])
+                    varname = '%sbg_prior'%(self.spname[s])
+                if varname in bf.variables.keys():
+                    bgc = bf.get_variable(varname)[idx1[0]:idx2[0]+1]
+                else:
+                    bgc = np.zeros(idx2-idx1+1)
+                self.bg.append(bgc)
+            self.bg=np.array(self.bg)
+            bf.close()
+
+        if psdo==1:
+            #create pseudo-obs (full time series at once); for this we set n (ensemble member) to 999 as a flag
+            self.conc_OPS=(self.nrloc*self.nrspc*len(self.datlist))*[0.]
+            dums2_all=[]
+            self.ids=[]
+            self.sps=[]
+            #OPS always has to start at 1:00 a.m.
+            self.modelstart = dtm.datetime(self.timestartkey.year,self.timestartkey.month,self.timestartkey.day-1,1,0)
+            self.timestart = self.modelstart
+            self.modelend = self.timefinishkey
+            for k in range(self.nrloc):
+                for s in range(self.nrspc):
+                    if k == 0 and len(self.ops_id)>0:  # run OPS only for the first location, as all locations are in the output (no need to run multiple times)
+                        dum1=self.OPS_run(n=999,d=999,spc=s,psdo=psdo)
+                    for d in range(len(self.datlist)):
+                        ######do STILT area sources
+                        dum2=self.run_STILT(j=d,n=999,k=k,spc=s,psdo=psdo)
+                        dums2_all.append(dum2)
+                        #other variables that need to be in observation files: species and index
+                        self.ids.append(d+len(self.datlist)*s+len(self.datlist)*self.nrspc*k)
+                        self.sps.append('%3s'%self.spname[s])
+            self.mod_prior=np.array(self.conc_OPS)+np.array(dums2_all)
+            logging.debug('Finished simulating pseudo time series')
+        else:
+            ####create model simulations per time step using X hour footprints
+            #### start with OPS
+            totser=int(self.dacycle['da.optimizer.nmembers'])*[(self.nrloc*self.nrspc*len(self.datlist))*[0.]]
+            self.conc_OPS=np.array(int(self.dacycle['da.optimizer.nmembers'])*[(self.nrloc*self.nrspc*len(self.datlist))*[0.]])
+            if len(self.ops_id)>0:
+                if self.dacycle['da.resources.parallel']=='parallel':
+                    import multiprocessing
+                    for s in range(self.nrspc):
+                        for d in range(len(self.datlist)):
+                            self.timestart = self.datlist[d] - dtm.timedelta(0,self.btime*3600)
+                            if self.timestart.hour != 0:
+                                self.modelstart = dtm.datetime(self.timestart.year,self.timestart.month,self.timestart.day,1,0)
+                            else:
+                                self.modelstart = dtm.datetime(self.timestart.year,self.timestart.month,self.timestart.day-1,1,0)
+                            self.modelend = self.datlist[d]
+                            p = multiprocessing.Pool(int(self.dacycle['da.resources.ntasks']))
+                            args=[]
+                            ##parallelize OPS runs for ensemble members
+                            for n in range(int(self.dacycle['da.optimizer.nmembers'])):
+                                dum=[self,n,d,s,psdo]
+                                args.append(dum)
+                            data = p.map(dumfunc,args)
+                            for m in range(len(data)):
+                                for t in range(self.nrloc):
+                                    self.conc_OPS[m,d+len(self.datlist)*s+len(self.datlist)*self.nrspc*t]=data[m][t]
+                            p.close()
+                            p.join()
+                else:
+                    for n in range(int(self.dacycle['da.optimizer.nmembers'])):
+                        for s in range(self.nrspc):
+                            for d in range(len(self.datlist)):
+                                self.timestart = self.datlist[d] - dtm.timedelta(0,self.btime*3600)
+                                if self.timestart.hour != 0:
+                                    self.modelstart = dtm.datetime(self.timestart.year,self.timestart.month,self.timestart.day,1,0)
+                                else:
+                                    self.modelstart = dtm.datetime(self.timestart.year,self.timestart.month,self.timestart.day-1,1,0)
+                                self.modelend = self.datlist[d]
+                                dum1=self.OPS_run(n=n,d=d,spc=s,psdo=psdo)
+                                for t in range(self.nrloc):
+                                    self.conc_OPS[n,d+len(self.datlist)*s+len(self.datlist)*self.nrspc*t]=dum1[t]
+            #### then do STILT
+            for n in range(int(self.dacycle['da.optimizer.nmembers'])):
+                conc_STILT=[]
+                for k in range(self.nrloc):
+                    for s in range(self.nrspc):
+                        for d in range(len(self.datlist)):
+                            dum2=self.run_STILT(j=d,n=n,k=k,spc=s,psdo=psdo)
+                            conc_STILT.append(dum2)
+                totser[n]=np.array(self.conc_OPS[n])+np.array(conc_STILT)
+                # totser[n]=np.array(conc_STILT)
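+            # totser has one row per ensemble member; the transpose below gives self.mod the shape (nobs, nmembers)
+            # expected by save_data when it writes the 'model' variable.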
+            self.mod=np.array(totser).T
+            logging.debug('Finished doing simulations per back trajectory time')
+
+    def save_data(self):
+        """ Write the data that is needed for a restart or recovery of the Observation Operator to the save directory """
+
+        import da.tools.io4 as io
+
+        f = io.CT_CDF(self.simulated_file, method='create')
+        logging.debug('Creating new simulated observation file in ObservationOperator (%s)' % self.simulated_file)
+
+        dimid = f.createDimension('obs_num', size=None)
+        dimid = ('obs_num',)
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "obs_num"
+        savedict['dtype'] = "int"
+        savedict['long_name'] = "Unique_Dataset_observation_index_number"
+        savedict['units'] = ""
+        savedict['dims'] = dimid
+        savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
+        f.add_data(savedict,nsets=0)
+
+        dimmember = f.createDimension('nmembers', size=self.forecast_nmembers)
+        dimmember = ('nmembers',)
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "model"
+        savedict['dtype'] = "float"
+        savedict['long_name'] = "mole_fraction_of_trace_gas_in_air"
+        savedict['units'] = "mol tracer (mol air)^-1"
+        savedict['dims'] = dimid + dimmember
+        savedict['comment'] = "Simulated model value"
+        f.add_data(savedict,nsets=0)
+
+        f_in = io.ct_read(self.dacycle['ObsOperator.inputfile'],method='read')
+        ids = f_in.get_variable('obs_num')
+
+        for i,data in enumerate(zip(ids,self.mod)):
+            f.variables['obs_num'][i] = data[0]
+            f.variables['model'][i,:] = data[1]
+        dum=f.variables['model'][:]
+
+        f.close()
+        f_in.close()
+
+    def save_obs(self):
+        """Write pseudo-observations to file"""
+
+        ct1=0
+        for k in range(self.nrloc*self.nrspc):
+            newfile = os.path.join(self.obsdir,self.obsnc[k]+'.nc')
+            f = io.CT_CDF(newfile, method='create')
+            logging.debug('Creating new pseudo observation file in ObservationOperator (%s)' % newfile)
+
+            dimid = f.add_dim('Time',len(self.datlist))
+            ln = len(self.datlist)
+            str19 = f.add_dim('strlen',19)
+            str3 = f.add_dim('strlen2',3)
+
+            data=[]
+            for it,t in enumerate(self.datlist):
+                data.append(list(t.isoformat().replace('T','_')))
+
+            savedict = io.std_savedict.copy()
+            savedict['dtype'] = "char"
+            savedict['name'] = "Times"
+            savedict['units'] = ""
+            savedict['dims'] = dimid + str19
+            savedict['values'] = data
+            f.add_data(savedict)
+
+            data = ln*[self.lat[int(k/self.nrspc)]]
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "lat"
+            savedict['units'] = "degrees_north"
+            savedict['dims'] = dimid
+            savedict['values'] = data
+            f.add_data(savedict)
+
+            data = ln*[self.lon[int(k/self.nrspc)]]
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "lon"
+            savedict['units'] = "degrees_east"
+            savedict['dims'] = dimid
+            savedict['values'] = data
+            f.add_data(savedict)
+
+            data = ln*[self.hgt[int(k/self.nrspc)]]
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "alt"
+            savedict['units'] = "m above ground"
+            savedict['dims'] = dimid
+            savedict['values'] = data
+            f.add_data(savedict)
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "obs"
+            savedict['units'] = "ppm or ppb"
+            savedict['dims'] = dimid
+            savedict['values'] = self.mod_prior[ct1:ct1+ln]
+            f.add_data(savedict)
+
+            savedict = io.std_savedict.copy()
+            savedict['dtype'] = "char"
+            savedict['name'] = "species"
+            savedict['units'] = "observed species"
+            savedict['dims'] = dimid + str3
+            savedict['values'] = self.sps[ct1:ct1+ln]
+            f.add_data(savedict)
+
+
+            savedict = io.std_savedict.copy()
+            savedict['dtype'] = "int"
+            savedict['name'] = "ids"
+            savedict['units'] = "unique observation identification number"
= "unique observation identification number" + savedict['dims'] = dimid + savedict['values'] = self.ids[ct1:ct1+ln] + f.add_data(savedict) + + + f.close() + + ct1=ct1+ln + + logging.debug("Successfully wrote data to obs file %s"%newfile) + + +################### End Class STILT ################### + + +if __name__ == "__main__": + pass + + diff --git a/da/stiltops/optimizer.py b/da/stiltops/optimizer.py new file mode 100755 index 00000000..5c5dc106 --- /dev/null +++ b/da/stiltops/optimizer.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python +# optimizer.py + +""" +.. module:: optimizer +.. moduleauthor:: Wouter Peters + +Revision History: +File created on 28 Jul 2010. + +""" + +import os +import logging +import numpy as np +import numpy.linalg as la +import da.tools.io4 as io + +identifier = 'Optimizer CO2' +version = '0.0' + +from da.baseclasses.optimizer import Optimizer + +################### Begin Class CO2Optimizer ################### + +class CO2Optimizer(Optimizer): + """ + This creates an instance of an optimization object. It handles the minimum least squares optimization + of the state vector given a set of sample objects. Two routines will be implemented: one where the optimization + is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed + and efficiency. + """ + + def setup(self, dims): + self.nlag = dims[0] + self.nmembers = dims[1] + self.nparams = dims[2] + self.nobs = dims[3] + self.nrloc = dims[4] + self.nrspc = dims[5] + self.inputdir = dims[6] + #self.specfile = dims[7] + self.create_matrices() + + def set_localization(self, loctype='None'): + """ determine which localization to use """ + + if loctype == 'CT2007': + self.localization = True + self.localizetype = 'CT2007' + #T-test values for two-tailed student's T-test using 95% confidence interval for some options of nmembers + if self.nmembers == 50: + self.tvalue = 2.0086 + elif self.nmembers == 100: + self.tvalue = 1.9840 + elif self.nmembers == 150: + self.tvalue = 1.97591 + elif self.nmembers == 200: + self.tvalue = 1.9719 + else: self.tvalue = 0 + elif loctype == 'multitracer': + self.localization = True + self.localizetype = 'multitracer' + elif loctype == 'multitracer2': + self.localization = True + self.localizetype = 'multitracer2' + else: + self.localization = False + self.localizetype = 'None' + + logging.info("Current localization option is set to %s" % self.localizetype) + + def localize(self, n): + """ localize the Kalman Gain matrix """ + import numpy as np + + if not self.localization: + logging.debug('Not localized observation %i' % self.obs_ids[n]) + return + if self.localizetype == 'CT2007': + count_localized = 0 + for r in range(self.nlag * self.nparams): + corr = np.corrcoef(self.HX_prime[n, :], self.X_prime[r, :].squeeze())[0, 1] + prob = corr / np.sqrt((1.000000001 - corr ** 2) / (self.nmembers - 2)) + if abs(prob) < self.tvalue: + self.KG[r] = 0.0 + count_localized = count_localized + 1 + logging.debug('Localized observation %i, %i%% of values set to 0' % (self.obs_ids[n],count_localized*100/(self.nlag * self.nparams))) + if self.localizetype == 'multitracer': + count_localized = 0 + ###read emission ratios for source type related to each parameter + infile = os.path.join(self.inputdir, self.specfile) + f = open(infile, 'r') + lines = f.readlines() + f.close() + speclist = [] + speclist.append('CO2') + emr = [] + for line in lines: + dum=line.split(",") + if dum[0]=='#' or dum[0]=='CO2' or dum[0]=='SNAP': + continue + else: + sp = dum[0] 
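+                    # assumed layout of the specfile rows (not spelled out in this patch): column 0
+                    # holds the species name, columns 1..nparams hold that species' emission ratio
+                    # relative to CO2 for each parameter (source type), filling the emr array below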
+ emrs = [] + for k in range(self.nparams): + emrs.append(float(dum[k+1])) + speclist.append(sp) + emr.append(emrs) + emr=np.array(emr) + + ###find obs and model value for this time step and species; calculate differences and ratios + lnd = self.nobs/(self.nrspc*self.nrloc) + for i in range(self.nrspc): + if self.species[n] == speclist[i]: + idx = [0-i,1-i,2-i,3-i] + + Xobs = [] + Xmod = [] + Robs = [] + Rmod = [] + for i in range(self.nrspc): + Xobs.append(self.obs[n+lnd*idx[i]]) + Xmod.append(self.Hx[n+lnd*idx[i]]) + if i>0: + Robs.append(Xobs[i]/Xobs[0]) + Rmod.append(Xmod[i]/Xmod[0]) + Xobs = np.array(Xobs) + Xmod = np.array(Xmod) + Robs = np.array(Robs) + Rmod = np.array(Rmod) + dX = Xmod - Xobs + dR = abs(Rmod - Robs) + flg = [] + if Xobs[0]>1.: #and self.species[n] == 'CO2' + flg=0 + for i in range(self.nrspc): + if Xobs[i] == 0 or Xmod[i] == 0.: + flg=1 + else: + flg=1 + + ###old version + # for i in range(self.nrspc): + # if dX[i]>0: + # flg.append(1) + # elif dX[i]<0: + # flg.append(0) + # else: + # flg.append(np.nan) + + ### This routine determines which source types are likely to cause the model-data mismatch, according to the following principle: + ### if the modeled CO:CO2 ratio is higher than the observed ratio this means we either overestimate emissions with a high CO:CO2 ratio + ### or we underestimate emissions with a low CO:CO2 ratio; if the CO concentration is overestimated, it is likely to be the first option + ### (i.e. too much emissions). We only include information from species if the model-data mismatch in the ratio is >5% of the observed ratio + dums = [] + dum = [] + tst1 = [] + if flg == 0: + for i in range(self.nrspc-1): + if dX[0]>0: + if dX[i+1]>0: + if Rmod[i]>Robs[i]: + tst1.append(1) + elif Rmod[i]<Robs[i]: + tst1.append(-1) + elif dX[i+1]<0: + if Rmod[i]>Robs[i]: + tst1.append(0) + elif Rmod[i]<Robs[i]: + tst1.append(2) + elif dX[0]<0: + if dX[i+1]>0: + if Rmod[i]>Robs[i]: + tst1.append(2) + elif Rmod[i]<Robs[i]: + tst1.append(0) + elif dX[i+1]<0: + if Rmod[i]>Robs[i]: + tst1.append(-1) + elif Rmod[i]<Robs[i]: + tst1.append(1) + if 2 in tst1: + dums1=[] + dums2=[] + for i in range(self.nrspc-1): + for k in range(len(emr[i])): + if emr[i,k]<Robs[i]: + dums1.append(k) + if emr[i,k]>Robs[i]: + dums2.append(k) + for j in range(self.nparams): + ct = dums1.count(j) + ctc = dums2.count(j) + if ct == (self.nrspc - 1) or ctc == (self.nrspc - 1): + ### all requirements are met, so do not localize + dum.append(j) + else: + for i in range(self.nrspc-1): + if tst1[i] == 1: + for k in range(len(emr[i])): + if emr[i,k]>Robs[i]: + dums.append(k) + elif tst1[i] == -1: + for k in range(len(emr[i])): + if emr[i,k]<Robs[i]: + dums.append(k) + for j in range(self.nparams): + ct = dums.count(j) + if ct == (self.nrspc - 1): + ### all requirements are met, so do not localize + dum.append(j) + + ###old version + # if sum(flg) == 0 or sum(flg) == 4: + ### if this is not the case, it is likely a mixture of over- and underestimation, which we can't specify; so no localization applied + # for i in range(self.nrspc-1): + # if dR[i]>0.05*Robs[i]: + # if (Rmod[i]>Robs[i] and dX[i+1]>0) or (Rmod[i]<Robs[i] and dX[i+1]<0): + # tst1.append(1.) + # elif (Rmod[i]<Robs[i] and dX[i+1]>0) or (Rmod[i]>Robs[i] and dX[i+1]<0): + # tst1.append(-1.) 
+                #         else:
+                #             tst1.append(0)
+                # for i in range(self.nrspc-1):
+                #     if tst1[i] == 1:
+                #         for k in range(len(emr[i])):
+                #             if emr[i,k]>Robs[i]:
+                #                 dums.append(k)
+                #     elif tst1[i] == -1:
+                #         for k in range(len(emr[i])):
+                #             if emr[i,k]<Robs[i]:
+                #                 dums.append(k)
+                # for j in range(self.nparams):
+                #     ct = dums.count(j)
+                #     if ct == (self.nrspc - 1):
+                ### all requirements are met, so do not localize
+                #         dum.append(j)
+
+            if len(dum) > 0:
+                ### what to do when we can't attribute model-data mismatch? update all parameters or set them all to zero?? (adapt dum)
+                for r in range(self.nlag * self.nparams):
+                    if r in dum:
+                        continue
+                    else:
+                        self.KG[r] = 0.0
+                        self.test_localize[r] = self.test_localize[r] + 1
+                        count_localized = count_localized + 1
+                logging.debug('Localized observation %i, %i%% of values set to 0' % (self.obs_ids[n],count_localized*100/(self.nlag * self.nparams)))
+        if self.localizetype == 'multitracer2':
+            ### This routine uses stricter rules for source attribution, comparing the observed concentration ratios to the emission
+            ### ratios per source type
+
+            count_localized = 0
+
+            ###find obs and model value for this time step and species; calculate differences and ratios
+            lnd = self.nobs/(self.nrspc*self.nrloc)
+            for i in range(self.nrspc):
+                if self.species[n] == speclist[i]:
+                    idx = [0-i,1-i,2-i,3-i]
+
+            Xobs = []
+            Robs = []
+            for i in range(self.nrspc):
+                Xobs.append(self.obs[n+lnd*idx[i]])
+                if i>0:
+                    Robs.append(Xobs[i]/Xobs[0])
+
+            dum=[]
+            if Robs[2]>0.1:
+                if Robs[1]<1. and Robs[0]<1.:
+                    dum.append(4)
+                elif Robs[2]>2.5 and Robs[1]>8. and Robs[0]>3.:
+                    dum.append(8)
+            elif Robs[0]>1. and Robs[2]<0.1:
+                if Robs[0]<4. and Robs[1]<1.:
+                    dum.append(2)
+                    dum.append(3)
+                elif Robs[0]>7. and Robs[1]>1.5:
+                    dum.append(5)
+                    dum.append(6)
+                    dum.append(7)
+                elif Robs[0]>3.5 and Robs[1]<2.5:
+                    dum.append(2)
+                    dum.append(3)
+                    dum.append(5)
+                    dum.append(6)
+                    dum.append(7)
+            elif Robs[0]<0.6 and Robs[1]<0.6:
+                dum.append(0)
+                dum.append(1)
+
+            if len(dum) > 0 and len(dum) < self.nparams:
+                for r in range(self.nlag * self.nparams):
+                    if r in dum:
+                        continue
+                    else:
+                        self.KG[r] = 0.0
+                        count_localized = count_localized + 1
+                logging.debug('Localized observation %i, %i%% of values set to 0' % (self.obs_ids[n],count_localized*100/(self.nlag * self.nparams)))
+
+
+################### End Class CO2Optimizer ###################
+
+
+
+if __name__ == "__main__":
+    pass
diff --git a/da/stiltops/pipeline.py b/da/stiltops/pipeline.py
new file mode 100755
index 00000000..769a793f
--- /dev/null
+++ b/da/stiltops/pipeline.py
@@ -0,0 +1,444 @@
+#!/usr/bin/env python
+# pipeline.py
+
+"""
+.. module:: pipeline
+.. moduleauthor:: Wouter Peters
+
+Revision History:
+File created on 06 Sep 2010.
+
+The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system.
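+In this implementation the smoother cycle proceeds through start_job, prepare_state,
+sample_state, invert, advance and save_and_submit, in that order (see
+ensemble_smoother_pipeline below).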
+
+"""
+import logging
+import os
+import sys
+import datetime
+import copy
+
+header = '\n\n *************************************** '
+footer = ' *************************************** \n '
+
+def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer, emismodel):
+    """ The main point of entry for the pipeline """
+    sys.path.append(os.getcwd())
+
+    logging.info(header + "Initializing current cycle" + footer)
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator, emismodel)
+
+    prepare_state(dacycle, dasystem, statevector, samples, obsoperator, emismodel)
+    sample_state(dacycle, samples, statevector, obsoperator, emismodel)
+
+    invert(dacycle, statevector, optimizer, obsoperator)
+
+    advance(dacycle, samples, statevector, obsoperator, emismodel)
+
+    save_and_submit(dacycle, statevector)
+    logging.info("Cycle finished...exiting pipeline")
+
+def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
+    """ The main point of entry for the pipeline """
+    sys.path.append(os.getcwd())
+
+    logging.info(header + "Initializing current cycle" + footer)
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
+
+    if dacycle.has_key('forward.savestate.exceptsam'):
+        sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"])
+    else:
+        sam = False
+
+    if dacycle.has_key('forward.savestate.dir'):
+        fwddir = dacycle['forward.savestate.dir']
+    else:
+        logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters")
+        fwddir = False
+
+    if dacycle.has_key('forward.savestate.legacy'):
+        legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"])
+    else:
+        legacy = False
+        logging.debug("No forward.savestate.legacy key found in rc-file")
+
+    if not fwddir:
+        # Simply make a prior statevector using the normal method
+
+
+        prepare_state(dacycle, statevector) #LU here, instead of this, rather create the new covariance and ensembles, since the remaining steps are handled above and below.
+
+    else:
+        # Read prior information from another simulation into the statevector.
+        # This loads the results from another assimilation experiment into the current statevector
+
+        if sam:
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
+            #filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
+            statevector.read_from_file_exceptsam(filename, 'prior')
+        elif not legacy:
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
+            statevector.read_from_file(filename, 'prior')
+        else:
+            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
+            statevector.read_from_legacy_file(filename, 'prior')
+
+
+    # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector
+    # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current
+    # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit.
+    # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into
+    # the current format through the "write_to_file" method.
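+    # The restart copy below follows the same savestate_YYYYMMDD.nc naming convention that prepare_state and save_and_submit use.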
+ + savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(savefilename, 'prior') + + # Now read optimized fluxes which we will actually use to propagate through the system + + if not fwddir: + + # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector + logging.info("Running forward with prior savestate from: %s"%savefilename) + + else: + + # Read posterior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if sam: + statevector.read_from_file_exceptsam(filename, 'opt') + elif not legacy: + statevector.read_from_file(filename, 'opt') + else: + statevector.read_from_legacy_file(filename, 'opt') + + logging.info("Running forward with optimized savestate from: %s"%filename) + + # Finally, we run forward with these parameters + + advance(dacycle, samples, statevector, obsoperator) + + # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list. + # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis. + + save_and_submit(dacycle, statevector) + + logging.info("Cycle finished...exiting pipeline") +#################################################################################################### + +def analysis_pipeline(dacycle, platform, dasystem, samples, statevector): + """ Main entry point for analysis of ctdas results """ + + from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data + from da.analysis.expand_molefractions import write_mole_fractions + from da.analysis.summarize_obs import summarize_obs + from da.analysis.time_avg_fluxes import time_avg + + logging.info(header + "Starting analysis" + footer) + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + statevector.setup(dacycle) + + logging.info(header + "Starting mole fractions" + footer) + + write_mole_fractions(dacycle) + # summarize_obs(dacycle['dir.analysis']) + + logging.info(header + "Starting weekly averages" + footer) + + # save_weekly_avg_1x1_data(dacycle, statevector) + save_weekly_avg_state_data(dacycle, statevector) + # save_weekly_avg_tc_data(dacycle, statevector) + # save_weekly_avg_ext_tc_data(dacycle) + # save_weekly_avg_agg_data(dacycle,region_aggregate='transcom') + # save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended') + # save_weekly_avg_agg_data(dacycle,region_aggregate='olson') + # save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended') + # save_weekly_avg_agg_data(dacycle,region_aggregate='country') + + logging.info(header + "Starting monthly and yearly averages" + footer) + + time_avg(dacycle,'flux1x1') + # time_avg(dacycle,'transcom') + # time_avg(dacycle,'transcom_extended') + # time_avg(dacycle,'olson') + # time_avg(dacycle,'olson_extended') + # time_avg(dacycle,'country') + + logging.info(header + "Finished analysis" + footer) + +def archive_pipeline(dacycle, platform, dasystem): + """ Main entry point for archiving of output from one disk/system to another """ + + if not dacycle.has_key('task.rsync'): + logging.info('rsync task not found, not starting automatic backup...') + return + else: + logging.info('rsync task found, starting automatic 
backup...') + + for task in dacycle['task.rsync'].split(): + sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task] + destdir = dacycle['task.rsync.%s.destinationdir'%task] + + rsyncflags = dacycle['task.rsync.%s.flags'%task] + + # file ID and names + jobid = dacycle['time.end'].strftime('%Y%m%d') + targetdir = os.path.join(dacycle['dir.exec']) + jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) ) + logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) ) + # Template and commands for job + jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile} + + if platform.ID == 'cartesius': + jobparams['jobqueue'] = 'staging' + + template = platform.get_job_template(jobparams) + for sourcedir in sourcedirs.split(): + execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,) + template += execcommand + + # write and submit + platform.write_job(jobfile, template, jobid) + jobid = platform.submit_job(jobfile, joblog=logfile) + + +def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator, emismodel): + """ Set up the job specific directory structure and create an expanded rc-file """ + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc + #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc + #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc + obsoperator.setup(dacycle) # Setup Observation Operator + statevector.setup(dacycle) + emismodel.setup(dacycle) + +def prepare_state(dacycle, dasystem, statevector,samples, obsoperator, emismodel): + """ Set up the input data for the forward model: obs and parameters/fluxes""" + + # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read + # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector + # with new values for each week. 
After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used + # in the current cycle + + logging.info(header + "starting prepare_state" + footer) + + if not dacycle['time.restart']: + + # Fill each week from n=1 to n=nlag with a new ensemble + + for n in range(statevector.nlag): + date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle'])) + cov = statevector.get_covariance(date, dacycle) + statevector.make_new_ensemble(dacycle, n, cov) + + # Prepare emissions (spatial and temporal distribution) if emisflag is set to 1 + if dacycle.dasystem['run.emisflag']=='1': + emismodel.get_emis(dacycle,psdo=1) + # Prepare new pseudo-observations if obsflag is set to 1 + if dacycle.dasystem['run.obsflag']=='1': + obsoperator.run_forecast_model(dacycle,psdo=1,adv=0) + else: + + # Read the statevector data from file + + #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc') + + + saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d')) + statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate + + # Now propagate the ensemble by one cycle to prepare for the current cycle + statevector.propagate(dacycle) + # Finally, also write the statevector to a file so that we can always access the a-priori information + + current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(current_sv, 'prior') # write prior info + +def sample_state(dacycle, samples, statevector, obsoperator, emismodel): + """ Sample the filter state for the inversion """ + + + # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step. + # This is especially important for: + # (i) The transport model restart data which holds the background mole fractions. 
+    #     This is needed to run the model one cycle forward
+    # (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed
+
+    #status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
+    #msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
+    logging.info(header + "starting sample_state" + footer)
+    nlag = int(dacycle['time.nlag'])
+    logging.info("Sampling model will be run over %d cycles" % nlag)
+
+    obsoperator.get_initial_data()
+
+    for lag in range(nlag):
+        logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
+
+        ############# Perform the actual sampling loop #####################
+
+        sample_step(dacycle, samples, statevector, obsoperator, emismodel, lag)
+
+        logging.debug("statevector now carries %d samples" % statevector.nobs)
+
+
+def sample_step(dacycle, samples, statevector, obsoperator, emismodel, lag, advance=False):
+    """ Perform all actions needed to sample one cycle """
+
+
+    # First set up the information for time start and time end of this sample
+    dacycle.set_sample_times(lag)
+
+    startdate = dacycle['time.sample.start']
+    enddate = dacycle['time.sample.end']
+    dacycle['time.sample.window'] = lag
+    dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
+
+    logging.info("New simulation interval set : ")
+    logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
+    logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
+    logging.info(" file stamp: %s " % dacycle['time.sample.stamp'])
+
+
+    # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
+    # type of info needed in your transport model
+
+    statevector.write_members_to_file(lag, dacycle['dir.input'])
+
+    samples.setup(dacycle)
+    samples.add_observations(dacycle)
+
+    # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
+
+    samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
+
+    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
+    samples.write_sample_coords(sampling_coords_file)
+    # Write filename to dacycle, and to output collection list
+    dacycle['ObsOperator.inputfile'] = sampling_coords_file
+
+    # Run the observation operator, store data in different file for advance
+    if dacycle.dasystem['run.emisflagens']=='1':
+        emismodel.get_emis(dacycle,psdo=0)
+    if not advance:
+        obsoperator.run_forecast_model(dacycle,psdo=0,adv=0)
+    else:
+        obsoperator.run_forecast_model(dacycle,psdo=0,adv=1)
+
+
+    # Read forecast model samples that were written to NetCDF files by each member. Add them to the existing
+    # Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle.
+
+
+    # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
+    # one file per member, some logic needs to be included to merge all files!!!
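+    # A minimal sketch of such per-member merge logic, kept as a comment because this
+    # setup writes a single file for all members; the per-member file naming below is
+    # hypothetical:
+    #
+    #   import numpy as np
+    #   import da.tools.io4 as io
+    #   mod = []
+    #   for mem in range(int(dacycle['da.optimizer.nmembers'])):
+    #       memfile = obsoperator.simulated_file.replace('.nc', '.%03d.nc' % mem)
+    #       f = io.ct_read(memfile, 'read')
+    #       mod.append(f.get_variable('model'))  # simulated values of one member
+    #       f.close()
+    #   merged = np.column_stack(mod)  # shape (obs_num, nmembers), matching the single-file layout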
+
+    if os.path.exists(sampling_coords_file):
+        samples.add_simulations(obsoperator.simulated_file)
+    else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
+
+    # Now write a small file that holds for each observation a number of values that we need in postprocessing
+
+    #filename = samples.write_sample_coords()
+
+    # Now add the observations that need to be assimilated to the statevector.
+    # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
+    # This is to make sure that the first step of the system uses all observations available, while the subsequent
+    # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
+    # (and at least) once in the assimilation
+
+
+    if not advance:
+        if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1:
+            statevector.obs_to_assimilate += (copy.deepcopy(samples),)
+            statevector.nobs += samples.getlength()
+            logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")
+        else:
+            statevector.obs_to_assimilate += (None,)
+
+
+def invert(dacycle, statevector, optimizer, obsoperator):
+    """ Perform the inverse calculation """
+    logging.info(header + "starting invert" + footer)
+    dims = (int(dacycle['time.nlag']),
+            int(dacycle['da.optimizer.nmembers']),
+            int(dacycle.dasystem['nparameters']),
+            statevector.nobs,
+            int(dacycle.dasystem['obs.input.nr']),
+            int(dacycle.dasystem['obs.spec.nr']),
+            dacycle.dasystem['datadir'])
+
+    if not dacycle.dasystem.has_key('opt.algorithm'):
+        logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
+        logging.info("...using serial algorithm as default...")
+        optimizer.set_algorithm()
+    elif dacycle.dasystem['opt.algorithm'] == 'serial':
+        logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
+        optimizer.set_algorithm('Serial')
+    elif dacycle.dasystem['opt.algorithm'] == 'bulk':
+        logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
+        optimizer.set_algorithm('Bulk')
+
+    optimizer.setup(dims)
+    optimizer.state_to_matrix(statevector)
+
+    diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+
+    optimizer.write_diagnostics(diagnostics_file, 'prior')
+    optimizer.set_localization(dacycle['da.system.localization'])
+
+    if optimizer.algorithm == 'Serial':
+        optimizer.serial_minimum_least_squares()
+    else:
+        optimizer.bulk_minimum_least_squares()
+
+    optimizer.matrix_to_state(statevector)
+    optimizer.write_diagnostics(diagnostics_file, 'optimized')
+
+
+
+def advance(dacycle, samples, statevector, obsoperator, emismodel):
+    """ Advance the filter state to the next step """
+
+    # This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
+
+    # Then, restore model state from the start of the filter
+    logging.info(header + "starting advance" + footer)
+    logging.info("Sampling model will be run over 1 cycle")
+
+    obsoperator.get_initial_data()
+
+    sample_step(dacycle, samples, statevector, obsoperator, emismodel, 0, True)
+
+    dacycle.restart_filelist.extend(obsoperator.restart_filelist)
+    dacycle.output_filelist.extend(obsoperator.output_filelist)
+    logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection")
+
+    dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
+    logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile']))
+
+    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
+    if os.path.exists(sampling_coords_file):
+        outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
+        samples.write_sample_auxiliary(outfile, obsoperator.simulated_file)
+    else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
+
+def save_and_submit(dacycle, statevector):
+    """ Save the model state and submit the next job """
+    logging.info(header + "starting save_and_submit" + footer)
+
+    filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+    statevector.write_to_file(filename, 'opt')
+
+    dacycle.output_filelist.append(filename)
+    dacycle.finalize()
+
+
+
+
diff --git a/da/stiltops/statevector.py b/da/stiltops/statevector.py
new file mode 100755
index 00000000..30be35ab
--- /dev/null
+++ b/da/stiltops/statevector.py
@@ -0,0 +1,262 @@
+#!/usr/bin/env python
+# ct_statevector_tools.py
+
+"""
+.. module:: statevector
+.. moduleauthor:: Wouter Peters
+
+Revision History:
+File created on 28 Jul 2010.
+Adapted by super004 on 26 Jan 2017.
+
+The module statevector implements the data structure and methods needed to work with state vectors (a set of unknown parameters to
+be optimized by a DA system) of different lengths, types, and configurations. Two baseclasses together form a generic framework:
+    * :class:`~da.baseclasses.statevector.StateVector`
+    * :class:`~da.baseclasses.statevector.EnsembleMember`
+
+As usual, specific implementations of StateVector objects are done through inheritance from these baseclasses. For an example of
+designing your own StateVector implementation, we refer to :ref:`tut_chapter5`.
+
+.. autoclass:: da.baseclasses.statevector.StateVector
+
+.. autoclass:: da.baseclasses.statevector.EnsembleMember
+
+"""
+
+import os
+import sys
+sys.path.append(os.getcwd())
+
+import logging
+import numpy as np
+from da.baseclasses.statevector import StateVector, EnsembleMember
+from da.tools.general import create_dirs, to_datetime
+import datetime as dtm
+
+import da.tools.io4 as io
+
+identifier = 'CarbonTracker Statevector '
+version = '0.0'
+
+################### Begin Class CO2StateVector ###################
+
+class CO2StateVector(StateVector):
+
+    def __init__(self, dacycle=None):
+        if dacycle != None:
+            self.dacycle = dacycle
+        else:
+            self.dacycle = {}
+
+    def setup(self, dacycle):
+        """
+        setup the object by specifying the dimensions.
+        There are two major requirements for each statevector that you want to build:
+
+        (1) the statevector must be able to map itself onto a regular grid
+        (2) the statevector must be able to map itself (mean+covariance) onto TransCom regions
+
+        An example is given below.
+        """
+
+        self.dacycle = dacycle
+        self.nlag = int(self.dacycle['time.nlag'])
+        self.nmembers = int(self.dacycle['da.optimizer.nmembers'])
+        self.nparams = int(self.dacycle.dasystem['nparameters'])
+        self.obsdir = self.dacycle.dasystem['datadir']
+        self.pparam = self.dacycle.dasystem['emis.pparam']
+        self.covm = self.dacycle.dasystem['ff.covariance']
+        self.prop = int(self.dacycle.dasystem['run.propscheme'])
+        self.nobs = 0
+
+        self.obs_to_assimilate = ()  # empty container to hold observations to assimilate later on
+
+        # These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist
+        # of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
+
+        self.ensemble_members = range(self.nlag)
+
+        for n in range(self.nlag):
+            self.ensemble_members[n] = []
+
+
+        # In the baseclass this gridmap is read from a file with a gridded mask at 1x1 degrees, where each gridbox holds a number
+        # that specifies the parametermember that maps onto it. Here each parameter simply maps onto its own index. From this map,
+        # a dictionary is created that allows a reverse look-up so that we can map parameters to a grid.
+
+        self.gridmap = np.arange(1,self.nparams+1,1)
+
+        # Create a dictionary for state <-> gridded map conversions
+
+        nparams = self.gridmap.max()
+        self.griddict = {}
+        for r in range(1, int(nparams) + 1):
+            sel = (self.gridmap.flat == r).nonzero()
+            if len(sel[0]) > 0:
+                self.griddict[r] = sel
+
+        logging.debug("A dictionary to map grids to states and vice versa was created")
+
+        # Create a mask for species/unknowns
+
+        self.make_species_mask()
+
+    def get_covariance(self, date, dacycle):
+
+        file=os.path.join(self.obsdir,self.covm)
+        f = io.ct_read(file, 'read')
+        covmatrix = f.get_variable('covariances')[:self.nparams,:self.nparams]
+        f.close()
+
+        return covmatrix
+
+    def write_members_to_file(self, lag, outdir, endswith='.nc'):
+        """
+        :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag]
+        :param: outdir: Directory where to write files
+        :param: endswith: Optional label to add to the filename, default is simply .nc
+        :rtype: None
+
+        Write ensemble member information to a NetCDF file for later use. The standard output filename is
+        *parameters.DDD.nc* where *DDD* is the number of the ensemble member. Standard output file location
+        is the `dir.input` of the dacycle object. In principle the output file will have only two datasets inside
+        called `parametervalues` which is of dimensions `nparameters` and `parametermap` which is of dimensions (180,360).
+        This dataset can be read and used by a :class:`~da.baseclasses.observationoperator.ObservationOperator` object.
+
+        .. note:: if more, or other information is needed to complete the sampling of the ObservationOperator you
+                  can simply inherit from the StateVector baseclass and overwrite this write_members_to_file function.
+
+        """
+
+        # These import statements caused a crash in netCDF4 on MacOSX. No problems on Jet though. Solution was
+        # to do the import already at the start of the module, not just in this method.
+
+        #import da.tools.io as io
+        #import da.tools.io4 as io
+
+        members = self.ensemble_members[lag]
+
+        for mem in members:
+            filename = os.path.join(outdir, 'parameters.%03d%s' % (mem.membernumber, endswith))
+            ncf = io.CT_CDF(filename, method='create')
+            dimparams = ncf.add_params_dim(self.nparams)
+
+            data = mem.param_values
+
+            savedict = io.std_savedict.copy()
+            savedict['name'] = "parametervalues"
+            savedict['long_name'] = "parameter_values_for_member_%d" % mem.membernumber
+            savedict['units'] = "unitless"
+            savedict['dims'] = dimparams
+            savedict['values'] = data
+            savedict['comment'] = 'These are parameter values to use for member %d' % mem.membernumber
+            ncf.add_data(savedict)
+
+            ncf.close()
+
+            logging.debug('Successfully wrote data from ensemble member %d to file (%s) ' % (mem.membernumber, filename))
+
+    def make_new_ensemble(self, dacycle, lag, covariancematrix=None):
+        """
+        :param lag: an integer indicating the time step in the lag order
+        :param covariancematrix: a matrix to draw random values from
+        :rtype: None
+
+        Make a new ensemble; the attribute lag refers to the position in the state vector.
+        Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.
+        The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag]
+
+        The optional covariance object to be passed holds a matrix of dimensions [nparams, nparams] which is
+        used to draw ensemblemembers from. If this argument is not passed it will be substituted with an
+        identity matrix of the same dimensions.
+
+        """
+        self.seed = int(dacycle.dasystem['random.seed'])
+        if self.seed != 0:
+            np.random.seed(self.seed)
+            sds = np.random.randint(1,10000,20)
+        else:
+            sds = np.random.randint(1,10000,20)
+        sid = (dacycle['time.start'] - dacycle['time.fxstart']).days
+
+        np.random.seed(sds[sid])
+        enssds = np.random.randint(1,10000,self.nmembers)
+
+        #option 1: start each cycle with the same prior values (makes several independent estimates)
+        if self.prop == 1 or dacycle['time.restart']==False:
+            file=os.path.join(self.obsdir,self.pparam)
+            f = io.ct_read(file, 'read')
+            prmval = f.get_variable('prior_values')[:self.nparams]
+            f.close()
+        #option 2: propagate optimized parameter values, but not the covariance matrix
+        elif self.prop == 2:
+            selectdate = dacycle['time.start']-dtm.timedelta(1)
+            dt=selectdate.strftime('%Y%m%d')
+            file=os.path.join(dacycle['dir.da_run'], 'output', selectdate.strftime('%Y%m%d'),'optimizer.%s.nc' % selectdate.strftime('%Y%m%d'))
+            f = io.ct_read(file, 'read')
+            prmval = f.get_variable('statevectormean_optimized')[:]
+            f.close()
+        elif self.prop == 3:
+            #option 3: start each cycle with the parameter values and uncertainties of the previous cycle (optimized)
+            selectdate = dacycle['time.start']-dtm.timedelta(1)
+            dt=selectdate.strftime('%Y%m%d')
+            file=os.path.join(dacycle['dir.da_run'], 'output', selectdate.strftime('%Y%m%d'),'optimizer.%s.nc' % selectdate.strftime('%Y%m%d'))
+            f = io.ct_read(file, 'read')
+            prmval = f.get_variable('statevectormean_optimized')[:]
+            devs = f.get_variable('statevectordeviations_optimized')[:]
+            f.close()
+            covariancematrix = (np.dot(devs,devs.T)/(devs.shape[1]-1))
+
+        # Check dimensions of covariance matrix list, must add up to nparams
+
+        dims = covariancematrix.shape[0]
+        if dims != self.nparams:
+            logging.error("The total dimension of the covariance matrices passed (%d) does not add up to the prescribed nparams (%d), exiting..." % (dims, self.nparams))
+            raise ValueError
+
+        # Make a cholesky decomposition of the covariance matrix
+
+        try:
+            _, s, _ = np.linalg.svd(covariancematrix)
+        except:
+            s = np.linalg.svd(covariancematrix, full_matrices=1, compute_uv=0) #Cartesius fix
+        dof = np.sum(s) ** 2 / sum(s ** 2)
+        C = np.linalg.cholesky(covariancematrix)
+
+        logging.debug('Cholesky decomposition has succeeded ')
+        logging.info('Appr. degrees of freedom in covariance matrix is %s' % (int(dof)))
+
+        # Create mean values
+
+        newmean = np.ones(self.nparams, float) * prmval  # a new time step starts from the prior or propagated parameter values
+
+        # If this is not the start of the filter, average previous two optimized steps into the mix
+
+        if lag == self.nlag - 1 and self.nlag >= 3:
+            newmean += self.ensemble_members[lag - 1][0].param_values + \
+                       self.ensemble_members[lag - 2][0].param_values
+            newmean = newmean / 3.0
+
+        # Create the first ensemble member with a deviation of 0.0 and add to list
+
+        newmember = EnsembleMember(0)
+        newmember.param_values = newmean.flatten()  # no deviations
+        self.ensemble_members[lag].append(newmember)
+
+        # Create members 1:nmembers and add to ensemble_members list
+
+        for member in range(1, self.nmembers):
+            np.random.seed(enssds[member])
+            rands = np.random.randn(self.nparams)
+            # logging.debug('rands are %f, %f, %f, %f, %f'%(rands[0],rands[1],rands[2],rands[3],rands[4]))
+
+            newmember = EnsembleMember(member)
+            newmember.param_values = np.dot(C, rands) + newmean
+            self.ensemble_members[lag].append(newmember)
+
+        logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, (lag + 1)))
+
+
+################### End Class CO2StateVector ###################
+
+if __name__ == "__main__":
+    pass
+
diff --git a/da/stiltops/stilt-ops.rc b/da/stiltops/stilt-ops.rc
new file mode 100755
index 00000000..80d14b12
--- /dev/null
+++ b/da/stiltops/stilt-ops.rc
@@ -0,0 +1,47 @@
+!!! Info for the CarbonTracker data assimilation system
+
+datadir : ../../../../Data
+
+! list of all observation sites
+obs.input.id : obsfiles_urban.csv
+! number of observation sites included; number of species included and to be used in inversion
+obs.input.nr : 7
+obs.spec.nr : 4
+! number of emission categories defined in the emission model
+obs.cat.nr : 14
+! For Rdam obs
+obs.sites.rc : ${datadir}/sites_weights_Rijnmond.rc
+
+! number of parameters
+nparameters : 44
+! set fixed seed for random number generator, or use 0 if you want to use any random seed
+random.seed : 4385
+!file with prior estimate of scaling factors (statevector) and covariances
+emis.pparam : param_values.nc
+ff.covariance : covariances.nc
+!files with prior and true emission model parameter values
+emis.paramtrue : emis_parameters_true.csv
+emis.paramprior : emis_parameters_prior.csv
+
+! switch (1=on/0=off) and input data for background CO2 and CO concentrations
+obs.bgswitch : 1
+obs.background : ${datadir}/background.nc
+
+! input data for emission model
+emis.input.spatial : spatial_data.nc
+emis.input.tempobs : temporal_data.nc
+emis.input.tempprior : temporal_data.nc
+
+! overwrite existing prior/ensemble emission files + pseudo-data (0: keep existing files; 1: create new files)
+run.emisflag : 0
+run.emisflagens : 1
+run.obsflag : 0
+
+! back trajectory time of STILT footprints, also applied to OPS (in hours)
+run.backtime : 6
+
+! choose propagation scheme:
+! 1: no propagation, start each cycle with the same prior parameter values and covariance matrix
+! 2: propagation of optimized parameter values, but not of the covariance matrix
+! 3: propagation of both optimized parameter values and covariance matrix
+run.propscheme : 1
--
GitLab