From 0a421b90c9716a2eb29da01c704a6adf97340392 Mon Sep 17 00:00:00 2001 From: Ingrid Luijx <ingrid.vanderlaan@wur.nl> Date: Fri, 31 Jan 2014 08:24:49 +0000 Subject: [PATCH] Added additional options for South America fwd runs, including option to run forward when no observation is found in that cycle. --- da/analysis/expand_fluxes.py | 18 +++ da/carbondioxide/obspack_geocarbon.py | 187 +++++++++++++------------- da/carbondioxide/statevector.py | 39 ++++++ da/rc/NamingScheme.wp_Mar2011.rc | 2 + da/tools/pipeline.py | 26 +++- 5 files changed, 172 insertions(+), 100 deletions(-) diff --git a/da/analysis/expand_fluxes.py b/da/analysis/expand_fluxes.py index 2c91aee8..5a032768 100755 --- a/da/analysis/expand_fluxes.py +++ b/da/analysis/expand_fluxes.py @@ -99,7 +99,16 @@ def save_weekly_avg_1x1_data(dacycle, statevector): fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux'])) fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux'])) #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + if dacycle.dasystem['background.co2.biosam.flux'] in file.variables.keys(): + sam = True + biosam = np.array(file.get_variable(dacycle.dasystem['background.co2.biosam.flux'])) + firesam = np.array(file.get_variable(dacycle.dasystem['background.co2.firesam.flux'])) + else: sam = False file.close() + + if sam: + bio = bio + biosam + fire = fire + firesam next = ncf.inq_unlimlen()[0] @@ -278,8 +287,17 @@ def save_weekly_avg_state_data(dacycle, statevector): fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux'])) fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux'])) #mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1'])) + if dacycle.dasystem['background.co2.biosam.flux'] in file.variables.keys(): + sam = True + biosam = np.array(file.get_variable(dacycle.dasystem['background.co2.biosam.flux'])) + firesam = np.array(file.get_variable(dacycle.dasystem['background.co2.firesam.flux'])) + else: sam = False file.close() + if sam: + bio = bio + biosam + fire = fire + firesam + next = ncf.inq_unlimlen()[0] vectorbio = statevector.grid2vector(griddata=bio * area, method='sum') diff --git a/da/carbondioxide/obspack_geocarbon.py b/da/carbondioxide/obspack_geocarbon.py index 3f7d3fa0..ae4fcf6d 100755 --- a/da/carbondioxide/obspack_geocarbon.py +++ b/da/carbondioxide/obspack_geocarbon.py @@ -148,103 +148,104 @@ class ObsPackObservations(Observations): """ - f = io.CT_CDF(obsinputfile, method='create') - logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile) - - dimid = f.add_dim('obs', len(self.datalist)) - dim200char = f.add_dim('string_of200chars', 200) - dim10char = f.add_dim('string_of10chars', 10) - dimcalcomp = f.add_dim('calendar_components', 6) - if len(self.datalist) == 0: - f.close() + #f.close() #return obsinputfile + logging.debug("No observations found for this time period, nothing written to obs file") + else: + f = io.CT_CDF(obsinputfile, method='create') + logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile) + + dimid = f.add_dim('obs', len(self.datalist)) + dim200char = f.add_dim('string_of200chars', 200) + dim10char = f.add_dim('string_of10chars', 10) + dimcalcomp = f.add_dim('calendar_components', 6) + + for key, value in self.site_move.iteritems(): + msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value + f.add_attribute(key, msg) + + data = self.getvalues('id') + + savedict = io.std_savedict.copy() + savedict['name'] = "obs_num" + savedict['dtype'] = "int" + savedict['long_name'] = "Unique_Dataset_observation_index_number" + savedict['units'] = "" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED." + f.add_data(savedict) + + data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ] + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "date_components" + savedict['units'] = "integer components of UTC date/time" + savedict['dims'] = dimid + dimcalcomp + savedict['values'] = data + savedict['missing_value'] = -9 + savedict['comment'] = "Calendar date components as integers. Times and dates are UTC." + savedict['order'] = "year, month, day, hour, minute, second" + f.add_data(savedict) + + data = self.getvalues('lat') + + savedict = io.std_savedict.copy() + savedict['name'] = "latitude" + savedict['units'] = "degrees_north" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('lon') + + savedict = io.std_savedict.copy() + savedict['name'] = "longitude" + savedict['units'] = "degrees_east" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('height') + + savedict = io.std_savedict.copy() + savedict['name'] = "altitude" + savedict['units'] = "meters_above_sea_level" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('samplingstrategy') + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "sampling_strategy" + savedict['units'] = "NA" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -9 + f.add_data(savedict) + + data = self.getvalues('evn') + + savedict = io.std_savedict.copy() + savedict['dtype'] = "char" + savedict['name'] = "obs_id" + savedict['units'] = "ObsPack datapoint identifier" + savedict['dims'] = dimid + dim200char + savedict['values'] = data + savedict['missing_value'] = '!' + f.add_data(savedict) - for key, value in self.site_move.iteritems(): - msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value - f.add_attribute(key, msg) - - data = self.getvalues('id') - - savedict = io.std_savedict.copy() - savedict['name'] = "obs_num" - savedict['dtype'] = "int" - savedict['long_name'] = "Unique_Dataset_observation_index_number" - savedict['units'] = "" - savedict['dims'] = dimid - savedict['values'] = data.tolist() - savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED." - f.add_data(savedict) - - data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ] - - savedict = io.std_savedict.copy() - savedict['dtype'] = "int" - savedict['name'] = "date_components" - savedict['units'] = "integer components of UTC date/time" - savedict['dims'] = dimid + dimcalcomp - savedict['values'] = data - savedict['missing_value'] = -9 - savedict['comment'] = "Calendar date components as integers. Times and dates are UTC." - savedict['order'] = "year, month, day, hour, minute, second" - f.add_data(savedict) - - data = self.getvalues('lat') - - savedict = io.std_savedict.copy() - savedict['name'] = "latitude" - savedict['units'] = "degrees_north" - savedict['dims'] = dimid - savedict['values'] = data.tolist() - savedict['missing_value'] = -999.9 - f.add_data(savedict) - - data = self.getvalues('lon') - - savedict = io.std_savedict.copy() - savedict['name'] = "longitude" - savedict['units'] = "degrees_east" - savedict['dims'] = dimid - savedict['values'] = data.tolist() - savedict['missing_value'] = -999.9 - f.add_data(savedict) - - data = self.getvalues('height') - - savedict = io.std_savedict.copy() - savedict['name'] = "altitude" - savedict['units'] = "meters_above_sea_level" - savedict['dims'] = dimid - savedict['values'] = data.tolist() - savedict['missing_value'] = -999.9 - f.add_data(savedict) - - data = self.getvalues('samplingstrategy') - - savedict = io.std_savedict.copy() - savedict['dtype'] = "int" - savedict['name'] = "sampling_strategy" - savedict['units'] = "NA" - savedict['dims'] = dimid - savedict['values'] = data.tolist() - savedict['missing_value'] = -9 - f.add_data(savedict) - - data = self.getvalues('evn') - - savedict = io.std_savedict.copy() - savedict['dtype'] = "char" - savedict['name'] = "obs_id" - savedict['units'] = "ObsPack datapoint identifier" - savedict['dims'] = dimid + dim200char - savedict['values'] = data - savedict['missing_value'] = '!' - f.add_data(savedict) - - f.close() + f.close() - logging.debug("Successfully wrote data to obs file") - logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile) + logging.debug("Successfully wrote data to obs file") + logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile) diff --git a/da/carbondioxide/statevector.py b/da/carbondioxide/statevector.py index 78aafedf..b7b290d5 100755 --- a/da/carbondioxide/statevector.py +++ b/da/carbondioxide/statevector.py @@ -134,6 +134,45 @@ class CO2StateVector(StateVector): f.close() logging.info('Successfully read the State Vector from file (%s) ' % filename) + + def read_from_file_exceptsam(self, filename, qual='opt'): + """ + :param filename: the full filename for the input NetCDF file + :param qual: a string indicating whether to read the 'prior' or 'opt'(imized) StateVector from file + :rtype: None + + Read the StateVector information from a NetCDF file and put in a StateVector object + In principle the input file will have only one four datasets inside + called: + * `meanstate_prior`, dimensions [nlag, nparamaters] + * `ensemblestate_prior`, dimensions [nlag,nmembers, nparameters] + * `meanstate_opt`, dimensions [nlag, nparamaters] + * `ensemblestate_opt`, dimensions [nlag,nmembers, nparameters] + + This NetCDF information can be written to file using + :meth:`~da.baseclasses.statevector.StateVector.write_to_file` + + """ + + f = io.ct_read(filename, 'read') + + meanstate = f.get_variable('statevectormean_' + qual) + meanstate[:,39:77] = 1 + ensmembers = f.get_variable('statevectorensemble_' + qual) + f.close() + + for n in range(self.nlag): + if not self.ensemble_members[n] == []: + self.ensemble_members[n] = [] + logging.warning('Existing ensemble for lag=%d was removed to make place for newly read data' % (n + 1)) + + for m in range(self.nmembers): + newmember = EnsembleMember(m) + newmember.param_values = ensmembers[n, m, :].flatten() + meanstate[n] # add the mean to the deviations to hold the full parameter values + self.ensemble_members[n].append(newmember) + + logging.info('Successfully read the State Vector from file (%s) ' % filename) + logging.info('State Vector set to 1 for South American regions') ################### End Class CO2StateVector ################### diff --git a/da/rc/NamingScheme.wp_Mar2011.rc b/da/rc/NamingScheme.wp_Mar2011.rc index 094cb6c5..b1908a12 100644 --- a/da/rc/NamingScheme.wp_Mar2011.rc +++ b/da/rc/NamingScheme.wp_Mar2011.rc @@ -46,7 +46,9 @@ final.co2_mixingratio.ensemble.simulated : dF_f background.co2.fossil.flux : flux_ff_prior_mean background.co2.fires.flux : flux_fires_prior_mean +background.co2.firesam.flux : flux_firesam_prior_mean background.co2.bio.flux : flux_bio_prior_mean +background.co2.biosam.flux : flux_biosam_prior_mean background.co2.ocean.flux : flux_ocean_prior_mean background.co2.res.flux : flux_res_prior_mean background.co2.gpp.flux : flux_gpp_prior_mean diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 45738cef..d5304f01 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -44,6 +44,9 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera logging.info(header + "Initializing current cycle" + footer) start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) + if dacycle.has_key('forward.savestate.exceptsam'): + sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"]) + if dacycle.has_key('forward.savestate.dir'): fwddir = dacycle['forward.savestate.dir'] else: @@ -66,11 +69,14 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera # Read prior information from another simulation into the statevector. # This loads the results from another assimilation experiment into the current statevector - if not legacy: + if sam: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') + statevector.read_from_file_exceptsam(filename, 'prior') + elif not legacy: filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) statevector.read_from_file(filename, 'prior') else: - filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc') + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') statevector.read_from_legacy_file(filename, 'prior') @@ -95,7 +101,9 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera # Read posterior information from another simulation into the statevector. # This loads the results from another assimilation experiment into the current statevector - if not legacy: + if sam: + statevector.read_from_file_exceptsam(filename, 'opt') + elif not legacy: statevector.read_from_file(filename, 'opt') else: statevector.read_from_legacy_file(filename, 'opt') @@ -318,7 +326,9 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False): # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates # one file per member, some logic needs to be included to merge all files!!! simulated_file = os.path.join(obsoperator.outputdir, 'flask_output.%s.nc' % dacycle['time.sample.stamp']) - samples.add_simulations(simulated_file) + if os.path.exists(sampling_coords_file): + samples.add_simulations(simulated_file) + else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)") # Now write a small file that holds for each observation a number of values that we need in postprocessing @@ -398,9 +408,11 @@ def advance(dacycle, samples, statevector, obsoperator): dacycle.output_filelist.append(dacycle['ObsOperator.inputfile']) logging.debug("Appended Observation filename to dacycle for collection ") - outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) - samples.write_sample_auxiliary(outfile) - + sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp']) + if os.path.exists(sampling_coords_file): + outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) + samples.write_sample_auxiliary(outfile) + else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)") def save_and_submit(dacycle, statevector): """ Save the model state and submit the next job """ -- GitLab