From 5f24d17b123dcf68ab41309a230e7d720abff125 Mon Sep 17 00:00:00 2001 From: "Woude, Auke van der" <auke.vanderwoude@wur.nl> Date: Tue, 16 Feb 2021 14:41:14 +0100 Subject: [PATCH] ringo ready --- da/ccffdas/emissionmodel.py | 8 ++- da/ccffdas/obs.py | 4 +- da/ccffdas/observationoperator.py | 114 +++++++++++++++++++++++------- da/ccffdas/pipeline.py | 6 +- da/ccffdas/statevector.py | 20 ++++-- da/ccffdas/stilt-ops_urbanall.rc | 13 ++-- 6 files changed, 120 insertions(+), 45 deletions(-) diff --git a/da/ccffdas/emissionmodel.py b/da/ccffdas/emissionmodel.py index 90c154a..07ac02b 100755 --- a/da/ccffdas/emissionmodel.py +++ b/da/ccffdas/emissionmodel.py @@ -57,8 +57,11 @@ class EmisModel(object): self.emisdir = dacycle.dasystem['datadir'] self.inputdir = dacycle['dir.input'] self.proxyfile = dacycle.dasystem['emis.input.spatial'] - self.nrspc = int(dacycle.dasystem['obs.spec.nr']) - self.species = dacycle.dasystem['obs.spec.name'].split(',') + species = dacycle.dasystem['obs.spec.name'].split(',') + if 'C14' in species: species.remove('C14') + self.species = species + self.nrspc = len(species) + self.nrcat = int(dacycle.dasystem['obs.cat.nr']) self.nmembers = int(dacycle['da.optimizer.nmembers']) self.countries = [country.strip() for country in dacycle.dasystem['countries'].split(';')] @@ -108,7 +111,6 @@ class EmisModel(object): key = station + '.' + cat else: key = station + '.' + cat + '.' + name - if key in self.paramdict: i_in_state = int(self.paramdict[key]) if return_index: diff --git a/da/ccffdas/obs.py b/da/ccffdas/obs.py index 6d171bf..90c6375 100755 --- a/da/ccffdas/obs.py +++ b/da/ccffdas/obs.py @@ -103,7 +103,7 @@ class RINGOObservations(Observations): logging.info('Using data from {}'.format(path)) ncfilelist = [path + line.split(',')[1].strip() for line in lines if not line[0] == '#'] ncfilelist = ncfilelist[:int(dacycle.dasystem['obs.input.nr'])] - species = [s.lower() for s in dacycle.dasystem['obs.spec.name'].split(',')] + ['c14'] + species = [s.lower() for s in dacycle.dasystem['obs.spec.name'].split(',')] self.species = species self.ncfilelist = ncfilelist datalist = [] @@ -113,10 +113,8 @@ class RINGOObservations(Observations): idates = ncf.get_variable('Times') idates = [''.join(d) for d in idates] dates = np.array([dtm.datetime.strptime(d, '%Y-%m-%d %H:%M:%S') for d in idates]) - self.dates = dates subselect = np.logical_and(dates >= self.startdate , dates < self.enddate).nonzero()[0] dates = dates.take(subselect, axis=0) - self.dates2 = dates datasetname = ncfile # use full name of dataset to propagate for clarity lats = ncf.get_variable('lat').take(subselect, axis=0) lons = ncf.get_variable('lon').take(subselect, axis=0) diff --git a/da/ccffdas/observationoperator.py b/da/ccffdas/observationoperator.py index 493a7a5..2edac61 100755 --- a/da/ccffdas/observationoperator.py +++ b/da/ccffdas/observationoperator.py @@ -184,7 +184,6 @@ class STILTObservationOperator(ObservationOperator): self.obsfile = dacycle.dasystem['obs.input.id'] self.obsdir = dacycle.dasystem['datadir'] self.nrloc = int(dacycle.dasystem['obs.input.nr']) - self.nrspc = int(dacycle.dasystem['obs.spec.nr']) self.nrcat = int(dacycle.dasystem['obs.cat.nr']) self.bgswitch = int(dacycle.dasystem['obs.bgswitch']) self.bgfile = dacycle.dasystem['obs.background'] @@ -197,6 +196,7 @@ class STILTObservationOperator(ObservationOperator): self.gpp = biosphere_fluxes['GPP'][:].astype(np.float32) self.ter = biosphere_fluxes['TER'][:].astype(np.float32) biosphere_fluxes.close() + self.written_prior_fluxes = False with nc.Dataset(self.dacycle.dasystem['country.mask']) as countries: self.masks = countries['country_mask'][:].astype(np.float32) @@ -274,7 +274,7 @@ class STILTObservationOperator(ObservationOperator): emismodel.multiprocess_emissions(self.dacycle, indices=self.indices) logging.info('Emissions calculated') - def prepare_biosphere(self, datepoint): + def prepare_biosphere(self, datepoint, do='members'): """Function that prepares the biosphere fluxes on a map Input: datepoint: datetime.datetime: time for which the biosphere fluxes should be prepared @@ -282,29 +282,57 @@ class STILTObservationOperator(ObservationOperator): None, writes the biosphere fluxes to self""" # First, get the time indices + if datepoint == None: + datepoint = self.start_time + indices = get_time_indices(datepoint, startdate=self.model_settings['files_startdate']) # Get the biosphere flux gpp = self.gpp[indices] ter = self.ter[indices] - - self.gpp_mem = np.zeros((self.forecast_nmembers, *gpp.shape), dtype=np.float32) - self.ter_mem = np.zeros_like(self.gpp_mem) - for mem in range(self.forecast_nmembers): - # multiply with the statevectorvalues - param_values = self.param_values[mem] - gpp_new = np.zeros_like(gpp) - ter_new = np.zeros_like(ter) - for i, country in enumerate(self.country_names): - country_mask = self.masks[i] - country_name = b''.join(country).decode() - param_value= self.emismodel.find_in_state(param_values, 'BIO', country_name.upper()) - gpp_country = country_mask[None, :, :] * gpp[:, :, :] * param_value - ter_country = country_mask[None, :, :] * ter[:, :, :] * param_value - gpp_new += gpp_country - ter_new += ter_country - self.gpp_mem[mem] = gpp_new * (1./MASS_C) * MOL2UMOL * PERHR2PERS - self.ter_mem[mem] = ter_new * (1./MASS_C) * MOL2UMOL * PERHR2PERS + + if do == 'members': + self.gpp_mem = np.zeros((self.forecast_nmembers, *gpp.shape), dtype=np.float32) + self.ter_mem = np.zeros_like(self.gpp_mem) + for mem in range(self.forecast_nmembers): + # multiply with the statevectorvalues + param_values = self.param_values[mem] + gpp_new, ter_new = self.scale_bio(param_values, gpp, ter) + self.gpp_mem[mem] = gpp_new + self.ter_mem[mem] = ter_new + if mem == 0 and self.written_prior_fluxes==False: + self.write_biosphere_fluxes(gpp_new + ter_new, qual='prior') + self.written_prior_fluxes = True + logging.debug('Biosphere prepared for {}'.format(datepoint)) + return + + elif do == 'true': + param_values = np.ones(100) + elif do == 'optimised': + prmfile = os.path.join(self.dacycle['dir.output'], 'optimizer.%s.nc' % self.dacycle['time.start'].strftime('%Y%m%d')) + prmname = 'statevectormean_optimized' + f = io.ct_read(prmfile, 'read') + param_values = f.get_variable(prmname) + gpp_new, ter_new = self.scale_bio(param_values, gpp, ter) + self.write_biosphere_fluxes(gpp_new + ter_new, qual=do) + logging.info('Biosphere fluxes written for {}'.format(do)) + + def scale_bio(self, param_values, gpp, ter): + gpp_new = np.zeros_like(gpp) + ter_new = np.zeros_like(ter) + for i, country in enumerate(self.country_names): + country_mask = self.masks[i] + country_name = b''.join(country).decode() + param_value = self.emismodel.find_in_state(param_values, country_name.upper(), 'BIO') + gpp_country = country_mask[None, :, :] * gpp[:, :, :] * param_value + ter_country = country_mask[None, :, :] * ter[:, :, :] * param_value + gpp_new += gpp_country + ter_new += ter_country + + # Recalculate to umol/sec: + gpp_new *= (1./MASS_C) * MOL2UMOL * PERHR2PERS + ter_new *= (1./MASS_C) * MOL2UMOL * PERHR2PERS + return gpp_new, ter_new def prepare_c14(self): """Function that loads the nuclear power temporal variation""" @@ -331,14 +359,15 @@ class STILTObservationOperator(ObservationOperator): np.array (3d): Footprint with as indices time, lat, lon """ - path = self.model_settings['footdir'] + '/' + site.upper() + '24' + path = self.dacycle.dasystem['footdir'] + '/' + site.upper() fname = path + '/footprint_{0}_{1}x{2:02d}x{3:02d}x{4:02d}*.nc'.format(site.upper(), datepoint.year, datepoint.month, datepoint.day, datepoint.hour) filelist = glob.glob(fname) - assert len(filelist) == 1 + if not len(filelist) == 1: + raise KeyError ('No footprint found for {} {}'.format(site, datepoint)) f = filelist[0] footprint = nc.Dataset(f)['foot'] @@ -540,7 +569,7 @@ class STILTObservationOperator(ObservationOperator): # Only about half a second self.prepare_biosphere(datepoint) if datepoint.day != previous_day: - logging.debug('Working on year {}, month {}, day {}'.format(datepoint.year, datepoint.month, datepoint.day)) + logging.info('Working on year {}, month {}, day {}'.format(datepoint.year, datepoint.month, datepoint.day)) # We are now ready to make a simulation for this datepoint # Check if this is a new site: @@ -592,7 +621,8 @@ class STILTObservationOperator(ObservationOperator): background = self.get_background_orig(species.upper()) # First, add noise (this represents errors in the transport model - noise = np.random.normal(0, sample.mdm) + noise = np.random.normal(0, sample.mdm); logging.debug('adding noise of {}'.format(noise)) + #noise = 0; logging.warning('Noise = 0') if self.do_observations: noise = 0 # Some different cases for different species. @@ -731,8 +761,42 @@ class STILTObservationOperator(ObservationOperator): f.variables['model'][i,:] = data[1] f.close() f_in.close() - + def write_biosphere_fluxes(self, values, qual='prior', member=None): + # Write the biosphere fluxes to a file + from dateutil.rrule import rrule, DAILY, HOURLY + if member is not None: + bio_emis_file = os.path.join(self.dacycle['dir.input'], 'bio_emissions_{0:03}.nc'.format(member)) + times = list(rrule(dtstart=self.start_time, freq=HOURLY, until=self.dacycle['time.sample.end'])) + else: + bio_emis_file = os.path.join(self.dacycle['dir.output'], 'bio_emissions_{}_{}.nc'.format(qual, self.dacycle['time.sample.start'].strftime('%Y%m%d'))) + values = values.mean(axis=0).reshape(1, *values.shape[1:]) + times = [self.start_time] + logging.debug('Writing biosphere fluxes to file {}'.format(bio_emis_file)) + + + f = io.CT_CDF(bio_emis_file, method='create') + dimtime= f.add_dim('time', len(times)) + time_units = 'seconds since 2000-01-01T00:00:00Z' + times = nc.date2num(times, time_units) + f.createVariable('time', times.dtype, dimtime) + f['time'][:] = times + f['time'].units = time_units + + dimlat = f.add_dim('lat', values.shape[1]) + dimlon = f.add_dim('lon', values.shape[2]) + + # Add the data and metadata to the file. + savedict = io.std_savedict.copy() + savedict['name'] = 'Biosphere fluxes' + savedict['long_name'] = "Spatially distributed emissions" + savedict['units'] = "micromole/m2/s" + dims = dimtime + dimlat + dimlon + savedict['dims'] = dims + savedict['values'] = values + savedict['dtype'] = 'float' + f.add_data(savedict) + f.close() ################### End Class STILT ################### diff --git a/da/ccffdas/pipeline.py b/da/ccffdas/pipeline.py index e8c446f..55107ae 100755 --- a/da/ccffdas/pipeline.py +++ b/da/ccffdas/pipeline.py @@ -410,8 +410,10 @@ def advance(dacycle, samples, statevector, obsoperator, optimizer): logging.info(header + "starting advance" + footer) time_profiles = obsoperator.emismodel.make_time_profiles(obsoperator.indices) - obsoperator.emismodel.get_emissions(dacycle, time_profiles, member='optimised') - obsoperator.emismodel.get_emissions(dacycle, time_profiles, member='true') + + for qual in ['optimised', 'true']: + obsoperator.emismodel.get_emissions(dacycle, time_profiles, member=qual) + obsoperator.prepare_biosphere(None, do=qual) # logging.info("Sampling model will be run over 1 cycle") # obsoperator.get_initial_data(samples) diff --git a/da/ccffdas/statevector.py b/da/ccffdas/statevector.py index e16129b..2dc8598 100755 --- a/da/ccffdas/statevector.py +++ b/da/ccffdas/statevector.py @@ -141,11 +141,10 @@ class CO2StateVector(StateVector): def get_covariance(self, date, dacycle): - file=os.path.join(self.obsdir,self.covm) - f = io.ct_read(file, 'read') + f = io.ct_read(self.covm, 'read') covmatrix = f.get_variable('covariances')[:self.nparams,:self.nparams] f.close() - logging.debug('Covariancematrix read from {}'.format(file)) + logging.debug('Covariancematrix read from {}'.format(self.covm)) return covmatrix @@ -214,7 +213,7 @@ class CO2StateVector(StateVector): self.ensemble_members.append([]) #option 1: start each cycle with the same prior values (makes several independent estimates) if self.prop == 1 or dacycle['time.restart'] == False: - file = os.path.join(self.obsdir, self.pparam) + file = self.pparam f = io.ct_read(file, 'read') if dacycle.dasystem['make.obs']: prmval = f.get_variable('true_values')[:self.nparams] @@ -257,7 +256,7 @@ class CO2StateVector(StateVector): assert len(oldmeans) == self.nparams logging.debug('Means has size {}'.format(len(oldmeans))) X = (1/np.sqrt(self.nmembers - 1)) * (x - oldmeans)# Get deviation matrix - P = self.nearestPD(np.dot(X.T, X)) # posterior covariance + P = np.dot(X.T, X) # posterior covariance z = f['red_noise_vector'][lag] return P, z @@ -283,7 +282,7 @@ class CO2StateVector(StateVector): # #option 1: start each cycle with the same prior values (makes several independent estimates) if self.prop == 1 or dacycle['time.restart']==False: - file = os.path.join(self.obsdir,self.pparam) + file = self.pparam f = io.ct_read(file, 'read') if dacycle.dasystem['make.obs']: prmval = f.get_variable('true_values')[:self.nparams] @@ -321,8 +320,15 @@ class CO2StateVector(StateVector): if dacycle['time.restart'] == True: P, z = self.read_red_noise_params(os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d')), lag) + # Make P so that the only covariances are the ones that we implied: + indices = np.where(covariancematrix != 0) + P_values = P[indices] + P[:] = 0 + P[indices] = P_values + P = self.nearestPD(P) #if self.prop == 1: - # deflation_factor = np.sum(P) / np.sum(covariancematrix) + #deflation_factor = np.sum(P) / np.sum(covariancematrix) + deflation_factor = np.sum(covariancematrix) / np.sum(P) logging.debug('Using deflation factor for red noise of {0:.3}'.format(deflation_factor)) else: # For the first cycle, the red noise is still white noise P, z = covariancematrix, self.create_white_noise((self.nparams, self.nmembers)) diff --git a/da/ccffdas/stilt-ops_urbanall.rc b/da/ccffdas/stilt-ops_urbanall.rc index d9d4ba5..7bff8bf 100755 --- a/da/ccffdas/stilt-ops_urbanall.rc +++ b/da/ccffdas/stilt-ops_urbanall.rc @@ -2,7 +2,6 @@ datadir : /projects/0/ctdas/RINGO/inversions/Data/ ! list of all observation sites -obs.input.id : /projects/0/ctdas/awoude/data/Ch1/Obsfiles/obsfiles2.csv ! number of observation sites included (smaller for testing); number of species included and to be used in inversion obs.input.nr : 1000 @@ -10,6 +9,7 @@ obs.spec.nr : 1 ! Set the name of the sampling strategy, so that the obsfiles can be found strategy : strat2b +obs.input.id : /projects/0/ctdas/awoude/data/Ch1/Obsfiles/obsfiles2.csv obs.dir : /projects/0/ctdas/awoude/data/Ch1/Obsfiles/${strategy}/ ! Flags to do certain species (only if included in the obs.dir AND obs.input.id) @@ -18,7 +18,7 @@ do.c14integrated: 0 do.c14targeted: 1 ! List the species ! Note that this only are the species that are used in the dynamic ff model, so radiocarbon is not a species to be listed here -obs.spec.name : CO2!,CO,NOx,SO2 +obs.spec.name : CO2,C14!,CO,NOx,SO2 ! number of emission categories defined in the emission model obs.cat.nr : 10 @@ -52,8 +52,9 @@ file.pft : ${datadir}/PFT_highres.nc random.seed : 0 !file with prior estimate of scaling factors (statevector) and covariances -emis.pparam : param_values.nc -ff.covariance : covariances2.nc +param.dir : /projects/0/ctdas/awoude/param_values/ +emis.pparam : ${param.dir}/param_values.nc +ff.covariance : ${param.dir}/covariances2.nc !file with emission model parameter values emis.paramfiles : emission_factors file.timeprofs : ${datadir}/CAMS_TEMPO_Tprof_subset.nc @@ -77,6 +78,8 @@ run.emisflag : 0 run.emisflagens : 1 run.obsflag : 0 +! Folder with footprints: +footdir : /projects/0/ctdas/awoude/footprints/ ! back trajectory time of STILT footprints, also applied to OPS (in hours) run.backtime : 24 biosphere_fluxdir : /projects/0/ctdas/RINGO/EmissionInventories/True/RINGO_ORCHIDEE_GPP_TER_dC14_old.nc @@ -93,5 +96,5 @@ run.propscheme : 2 ! white: temporal uncorrelated noise (so new members are completely random) ! red: temporal autocorrelated noise (so new members are drawn, based on the expected parameter deviations) ! If red, a correlation.coefficient is also needed. This float -1 < r < 1 indicates how much the noise correlates. -noise : white +noise : red correlation.coefficient: 0.75 -- GitLab