diff --git a/da/stilt/expand_molefractions.py b/da/stilt/expand_molefractions.py new file mode 100755 index 0000000000000000000000000000000000000000..2df7daa9cefd26a16921759ab69b506806a36570 --- /dev/null +++ b/da/stilt/expand_molefractions.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python +# expand_fluxes.py +import sys +import os +import getopt +import shutil +import logging +import netCDF4 +import numpy as np +from string import join +from datetime import datetime, timedelta +sys.path.append('../../') +from da.tools.general import date2num, num2date +from da.tools.general import create_dirs +import da.tools.io4 as io + + +""" +Author: Wouter Peters (Wouter.Peters@wur.nl) + +Revision History: +File created on 11 May 2012. + +""" + +def write_mole_fractions(dacycle): + """ + + Write Sample information to NetCDF files. These files are organized by site and + have an unlimited time axis to which data is appended each cycle. + + The needed information is obtained from the sample_auxiliary.nc files and the original input data files from ObsPack. + + The steps are: + + (1) Create a directory to hold timeseries output files + (2) Read the sample_auxiliary.nc file for this cycle and get a list of original files they were obtained from + (3) For each file, copy the original data file from ObsPack (if not yet present) + (4) Open the copied file, find the index of each observation, fill in the simulated data + + """ + + dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_molefractions')) +# +# Some help variables +# + dectime0 = date2num(datetime(2000, 1, 1)) + dt = dacycle['cyclelength'] + startdate = dacycle['time.start'] + enddate = dacycle['time.end'] + + logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M')) + logging.debug("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M')) + + dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"),) + + # Step (1): Get the posterior sample output data file for this cycle + + infile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) + + ncf_in = io.ct_read(infile, 'read') + + obs_num = ncf_in.get_variable('obs_num') + obs_val = ncf_in.get_variable('observed') + simulated = ncf_in.get_variable('modelsamples') + infilename = ncf_in.get_variable('inputfilename') + infiles = netCDF4.chartostring(infilename).tolist() + + ncf_in.close() + + # Step (2): Get the prior sample output data file for this cycle + + infile = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % startdate.strftime('%Y%m%d')) + + if os.path.exists(infile): + optimized_present = True + else: + optimized_present = False + + if optimized_present: + + ncf_fc_in = io.ct_read(infile, 'read') + + fc_obs_num = ncf_fc_in.get_variable('obspack_num') + fc_obs_val = ncf_fc_in.get_variable('observed') + fc_simulated = ncf_fc_in.get_variable('modelsamplesmean_prior') + fc_simulated_ens = ncf_fc_in.get_variable('modelsamplesdeviations_prior') + fc_flag = ncf_fc_in.get_variable('flag') + if not dacycle.dasystem.has_key('opt.algorithm'): + fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance') + fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance') + elif dacycle.dasystem['opt.algorithm'] == 'serial': + fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance') + fc_hphtr = ncf_fc_in.get_variable('totalmolefractionvariance') + elif dacycle.dasystem['opt.algorithm'] == 'bulk': + fc_r = ncf_fc_in.get_variable('modeldatamismatchvariance').diagonal() + fc_hphtr 
= ncf_fc_in.get_variable('totalmolefractionvariance').diagonal() + filesitecode = ncf_fc_in.get_variable('sitecode') + + fc_sitecodes = netCDF4.chartostring(filesitecode).tolist() + #fc_sitecodes = [join(s.compressed(),'') for s in filesitecode] + + ncf_fc_in.close() + + # Expand the list of input files with those available from the forecast list + + infiles_rootdir = os.path.split(infiles[0])[0] + infiles.extend(os.path.join(infiles_rootdir, f + '.nc') for f in fc_sitecodes) + + + #Step (2): For each observation timeseries we now have data for, open it and fill with data + + for orig_file in set(infiles): + + if not os.path.exists(orig_file): + logging.error("The original input file (%s) could not be found, continuing to next file..." % orig_file) + continue + + + copy_file = os.path.join(dirname, os.path.split(orig_file)[-1]) + if not os.path.exists(copy_file): + shutil.copy(orig_file, copy_file) + logging.debug("Copied a new original file (%s) to the analysis directory" % orig_file) + + ncf_out = io.CT_CDF(copy_file, 'write') + + # Modify the attributes of the file to reflect added data from CTDAS properly + + try: + host=os.environ['HOSTNAME'] + except: + host='unknown' + + + ncf_out.Caution = '===================================================================================' + try: + ncf_out.History += '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),) + except: + ncf_out.History = '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),) + ncf_out.CTDAS_info = 'Simulated values added from a CTDAS run by %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)\ + + '\nCTDAS was run on platform %s' % (host,)\ + + '\nCTDAS job directory was %s' % (dacycle['dir.da_run'],)\ + + '\nCTDAS Da System was %s' % (dacycle['da.system'],)\ + + '\nCTDAS Da ObsOperator was %s' % (dacycle['da.obsoperator'],) + ncf_out.CTDAS_startdate = dacycle['time.start'].strftime('%F') + ncf_out.CTDAS_enddate = dacycle['time.finish'].strftime("%F") + ncf_out.original_file = orig_file + + + # get nobs dimension + + if ncf_out.dimensions.has_key('id'): + dimidob = ncf_out.dimensions['id'] + dimid = ('id',) + elif ncf_out.dimensions.has_key('obs'): + dimidob = ncf_out.dimensions['obs'] + dimid = ('obs',) + + if dimidob.isunlimited: + nobs = ncf_out.inq_unlimlen() + else: + nobs = len(dimid) + + + # add nmembers dimension + + dimmembersob = ncf_out.createDimension('nmembers', size=simulated.shape[1]) + dimmembers = ('nmembers',) + nmembers = len(dimmembers) + + # Create empty arrays for posterior samples, as well as for forecast sample statistics + + savedict = io.std_savedict.copy() + savedict['name'] = "flag_forecast" + savedict['long_name'] = "flag_for_obs_model in forecast" + savedict['units'] = "None" + savedict['dims'] = dimid + savedict['comment'] = 'Flag (0/1/2/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 99 means not sampled' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modeldatamismatch" + savedict['long_name'] = "modeldatamismatch" + savedict['units'] = "[mol mol-1]^2" + savedict['dims'] = dimid + savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "totalmolefractionvariance_forecast" + savedict['long_name'] = "totalmolefractionvariance of forecast" + savedict['units'] = 
"[mol mol-1]^2" + savedict['dims'] = dimid + savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesmean" + savedict['long_name'] = "mean modelsamples" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'simulated mole fractions based on optimized state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesmean_forecast" + savedict['long_name'] = "mean modelsamples from forecast" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'simulated mole fractions based on prior state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesstandarddeviation" + savedict['long_name'] = "standard deviaton of modelsamples over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'std dev of simulated mole fractions based on optimized state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesstandarddeviation_forecast" + savedict['long_name'] = "standard deviaton of modelsamples from forecast over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['comment'] = 'std dev of simulated mole fractions based on prior state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesensemble" + savedict['long_name'] = "modelsamples over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + dimmembers + savedict['comment'] = 'ensemble of simulated mole fractions based on optimized state vector' + ncf_out.add_variable(savedict) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamplesensemble_forecast" + savedict['long_name'] = "modelsamples from forecast over all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + dimmembers + savedict['comment'] = 'ensemble of simulated mole fractions based on prior state vector' + ncf_out.add_variable(savedict) + + else: + logging.debug("Modifying existing file (%s) in the analysis directory" % copy_file) + + ncf_out = io.CT_CDF(copy_file, 'write') + + + # Get existing file obs_nums to determine match to local obs_nums + + if ncf_out.variables.has_key('id'): + file_obs_nums = ncf_out.get_variable('id') + elif ncf_out.variables.has_key('ccgg_evn'): + file_obs_nums = ncf_out.get_variable('ccgg_evn') + elif ncf_out.variables.has_key('obspack_num'): + file_obs_nums = ncf_out.get_variable('obspack_num') + + # Get all obs_nums related to this file, determine their indices in the local arrays + + selected_obs_nums = [num for infile, num in zip(infiles, obs_num) if infile == orig_file] + + + # Optimized data 1st: For each index, get the data and add to the file in the proper file index location + + for num in selected_obs_nums: + + model_index = obs_num.tolist().index(num) + file_index = file_obs_nums.tolist().index(num) + + #var = ncf_out.variables['modeldatamismatch'] # Take from optimizer.yyyymmdd.nc file instead + #var[file_index] = mdm[model_index] + + var = ncf_out.variables['modelsamplesmean'] + var[file_index] = simulated[model_index, 0] + + var = ncf_out.variables['modelsamplesstandarddeviation'] + var[file_index] = simulated[model_index, 1:].std() + + var = 
ncf_out.variables['modelsamplesensemble'] + var[file_index] = simulated[model_index, :] + + # Now forecast data too: For each index, get the data and add to the file in the proper file index location + + if optimized_present: + + selected_fc_obs_nums = [num for sitecode, num in zip(fc_sitecodes, fc_obs_num) if sitecode in orig_file] + + for num in selected_fc_obs_nums: + + model_index = fc_obs_num.tolist().index(num) + file_index = file_obs_nums.tolist().index(num) + + var = ncf_out.variables['modeldatamismatch'] + var[file_index] = np.sqrt(fc_r[model_index]) + + var = ncf_out.variables['modelsamplesmean_forecast'] + var[file_index] = fc_simulated[model_index] + + var = ncf_out.variables['modelsamplesstandarddeviation_forecast'] + var[file_index] = fc_simulated_ens[model_index, 1:].std() + + var = ncf_out.variables['modelsamplesensemble_forecast'] + var[file_index] = fc_simulated_ens[model_index, :] + + var = ncf_out.variables['totalmolefractionvariance_forecast'] + var[file_index] = fc_hphtr[model_index] + + var = ncf_out.variables['flag_forecast'] + var[file_index] = fc_flag[model_index] + + + # close the file + + status = ncf_out.close() + + return None + + + +if __name__ == "__main__": + import logging + from da.tools.initexit import CycleControl + from da.carbondioxide.dasystem import CO2DaSystem + + sys.path.append('../../') + + logging.root.setLevel(logging.DEBUG) + + dacycle = CycleControl(args={'rc':'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'}) + dasystem = CO2DaSystem('../rc/carbontracker_ct09_opfnew.rc') + dacycle.dasystem = dasystem + dacycle.setup() + dacycle.parse_times() + + + + while dacycle['time.start'] < dacycle['time.finish']: + + outfile = write_mole_fractions(dacycle) + + dacycle.advance_cycle_times() + + sys.exit(0) + diff --git a/da/stilt/obspack_geocarbon.py b/da/stilt/obspack_geocarbon.py new file mode 100755 index 0000000000000000000000000000000000000000..46a18fbd6608f1576d004e80117f1a6c10321d06 --- /dev/null +++ b/da/stilt/obspack_geocarbon.py @@ -0,0 +1,554 @@ +#!/usr/bin/env python +# obs.py + +""" +Author : peters + +Revision History: +File created on 28 Jul 2010. + +""" +import os +import sys +import logging + +import datetime as dtm +from string import strip +from numpy import array, logical_and +sys.path.append(os.getcwd()) +sys.path.append('../../') + +identifier = 'CarbonTracker CO2 mole fractions' +version = '0.0' + +from da.baseclasses.obs import Observations +import da.tools.io4 as io +import da.tools.rc as rc +################### Begin Class ObsPackObservations ################### + +class ObsPackObservations(Observations): + """ an object that holds data + methods and attributes needed to manipulate mole fraction values """ + + def setup(self, dacycle): + + self.startdate = dacycle['time.sample.start'] + self.enddate = dacycle['time.sample.end'] + + op_id = dacycle.dasystem['obspack.input.id'] + op_dir = dacycle.dasystem['obspack.input.dir'] + + if not os.path.exists(op_dir): + msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir + logging.error(msg) + raise IOError, msg + else: + self.obspack_dir = op_dir + self.obspack_id = op_id + + self.datalist = [] + + def add_observations(self): + """ Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file + + The ObsPack mole fraction files are provided as time series per site with all dates in sequence. 
+ We will loop over all site files in the ObsPackage, and subset each to our needs + + """ + import string + + # Step 1: Read list of available site files in package + + infile = os.path.join(self.obspack_dir, 'summary', '%s_dataset_summary.txt' % (self.obspack_id,)) + f = open(infile, 'r') + lines = f.readlines() + f.close() + + ncfilelist = [] + for line in lines: + if line.startswith('#'): continue # header + + items = line.split() + #ncfile, lab , start_date, stop_date, data_comparison = items[0:5] + ncfile, lab , start_date, stop_date, data_comparison= line[:105].split() + + + ncfilelist += [ncfile] + + logging.debug("ObsPack dataset info read, proceeding with %d netcdf files" % len(ncfilelist)) + + for ncfile in ncfilelist: + + infile = os.path.join(self.obspack_dir, 'data', 'nc', ncfile + '.nc') + ncf = io.ct_read(infile, 'read') + idates = ncf.get_variable('time_components') + dates = array([dtm.datetime(*d) for d in idates]) + + subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0] + + if len(subselect) == 0: + ncf.close() + continue + + logging.debug("Trying to add %d observations from file (%s) to the Data list" % (len(subselect), ncfile)) + + dates = dates.take(subselect, axis=0) + + #ccgg_evn = ncf.get_variable('obspack_num').take(subselect) # or should we propagate obs_num which is not unique across datasets?? + ccgg_evn = ncf.get_variable('ccgg_evn').take(subselect,axis=0) + + obspackid = ncf.get_variable('obspack_id').take(subselect, axis=0) + obspackid = [s.tostring().lower() for s in obspackid] + obspackid = map(strip, obspackid) + + datasetname = ncfile # use full name of dataset to propagate for clarity + lats = ncf.get_variable('latitude').take(subselect, axis=0) + lons = ncf.get_variable('longitude').take(subselect, axis=0) + alts = ncf.get_variable('altitude').take(subselect, axis=0) + obs = ncf.get_variable('value').take(subselect, axis=0) + species = ncf.get_attribute('dataset_parameter') + flags = ncf.get_variable('obs_flag').take(subselect, axis=0) + ncf.close() + + for n in range(len(dates)): + used_ids = self.getvalues('id',list) + if ccgg_evn[n] in used_ids: + ii = used_ids.index(ccgg_evn[n]) + logging.error("Error when reading from file: %s"%ncfile) + logging.error("This sample ID (%d) is not unique"%ccgg_evn[n]) + logging.error("Previously used from file: %s"%self.datalist[ii].fromfile) + logging.error("...skipping") + raise IOError + else: + self.datalist.append(MoleFractionSample(ccgg_evn[n], dates[n], datasetname, obs[n], 0.0, 0.0, 0.0, 0.0, flags[n], alts[n], lats[n], lons[n], obspackid[n], species, 1, 0.0, infile)) + + + logging.info("Observations list now holds %d values" % len(self.datalist)) + + def add_simulations(self, filename, silent=False): + """ Adds model simulated values to the mole fraction objects """ + + + if not os.path.exists(filename): + msg = "Sample output filename for observations could not be found : %s" % filename + logging.error(msg) + logging.error("Did the sampling step succeed?") + logging.error("...exiting") + raise IOError, msg + + ncf = io.ct_read(filename, method='read') + ids = ncf.get_variable('obs_num') + simulated = ncf.get_variable('flask') + ncf.close() + logging.info("Successfully read data from model sample file (%s)" % filename) + + obs_ids = self.getvalues('id',list) + ids = map(int, ids) + + missing_samples = [] + + for idx, val in zip(ids, simulated): + if idx in obs_ids: + index = obs_ids.index(idx) + self.datalist[index].simulated = val + else: + missing_samples.append(idx) + + 
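+        # The .index() calls above rescan obs_ids for every returned sample, which is
+        # O(n*m) for large ObsPacks. A hypothetical constant-time variant (a sketch,
+        # not used in this changeset) would pre-build a dictionary once:
+        #
+        #     lookup = dict((obs_id, i) for i, obs_id in enumerate(obs_ids))
+        #     for idx, val in zip(ids, simulated):
+        #         index = lookup.get(idx)           # None when no matching observation
+        #         if index is not None:
+        #             self.datalist[index].simulated = val
+        #         else:
+        #             missing_samples.append(idx)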
if not silent and missing_samples != []: + logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...') + #msg = '%s'%missing_samples ; logging.warning(msg) + + logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples))) + + def write_sample_coords(self, obsinputfile): + """ + Write the information needed by the observation operator to a file. Return the filename that was written for later use + + """ + + if len(self.datalist) == 0: + #f.close() + #return obsinputfile + logging.debug("No observations found for this time period, nothing written to obs file") + else: + f = io.CT_CDF(obsinputfile, method='create') + logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile) + + dimid = f.add_dim('obs', len(self.datalist)) + dim200char = f.add_dim('string_of200chars', 200) + dim10char = f.add_dim('string_of10chars', 10) + dimcalcomp = f.add_dim('calendar_components', 6) + + for key, value in self.site_move.iteritems(): + msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value + f.add_attribute(key, msg) + + data = self.getvalues('id') + + savedict = io.std_savedict.copy() + savedict['name'] = "obs_num" + savedict['dtype'] = "int" + savedict['long_name'] = "Unique_Dataset_observation_index_number" + savedict['units'] = "" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED." + f.add_data(savedict) + + data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ] + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "date_components" + savedict['units'] = "integer components of UTC date/time" + savedict['dims'] = dimid + dimcalcomp + savedict['values'] = data + savedict['missing_value'] = -9 + savedict['comment'] = "Calendar date components as integers. Times and dates are UTC." + savedict['order'] = "year, month, day, hour, minute, second" + f.add_data(savedict) + + data = self.getvalues('lat') + + savedict = io.std_savedict.copy() + savedict['name'] = "latitude" + savedict['units'] = "degrees_north" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('lon') + + savedict = io.std_savedict.copy() + savedict['name'] = "longitude" + savedict['units'] = "degrees_east" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('height') + + savedict = io.std_savedict.copy() + savedict['name'] = "altitude" + savedict['units'] = "meters_above_sea_level" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -999.9 + f.add_data(savedict) + + data = self.getvalues('samplingstrategy') + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "sampling_strategy" + savedict['units'] = "NA" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['missing_value'] = -9 + f.add_data(savedict) + + data = self.getvalues('evn') + + savedict = io.std_savedict.copy() + savedict['dtype'] = "char" + savedict['name'] = "obs_id" + savedict['units'] = "ObsPack datapoint identifier" + savedict['dims'] = dimid + dim200char + savedict['values'] = data + savedict['missing_value'] = '!' 
+ f.add_data(savedict) + + data = self.getvalues('obs') + + savedict = io.std_savedict.copy() + savedict['name'] = "observed" + savedict['long_name'] = "observedvalues" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = 'Observations used in optimization' + f.add_data(savedict) + + data = self.getvalues('mdm') + + savedict = io.std_savedict.copy() + savedict['name'] = "modeldatamismatch" + savedict['long_name'] = "modeldatamismatch" + savedict['units'] = "[mol mol-1]" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch' + f.add_data(savedict) + + f.close() + + logging.debug("Successfully wrote data to obs file") + logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile) + + + + def add_model_data_mismatch(self, filename): + """ + Get the model-data mismatch values for this cycle. + + (1) Open a sites_weights file + (2) Parse the data + (3) Compare site list against data + (4) Take care of double sites, etc + + """ + + if not os.path.exists(filename): + msg = 'Could not find the required sites.rc input file (%s) ' % filename + logging.error(msg) + raise IOError, msg + else: + self.sites_file = filename + + sites_weights = rc.read(self.sites_file) + + self.rejection_threshold = int(sites_weights['obs.rejection.threshold']) + self.global_R_scaling = float(sites_weights['global.R.scaling']) + self.n_site_categories = int(sites_weights['n.site.categories']) + + logging.debug('Model-data mismatch rejection threshold: %d ' % self.rejection_threshold) + logging.warning('Model-data mismatch scaling factor : %f ' % self.global_R_scaling) + logging.debug('Model-data mismatch site categories : %d ' % self.n_site_categories) + + cats = [k for k in sites_weights.keys() if 'site.category' in k] + + site_categories = {} + for key in cats: + name, error, may_localize, may_reject = sites_weights[key].split(';') + name = name.strip().lower() + error = float(error) + may_reject = ("TRUE" in may_reject.upper()) + may_localize = ("TRUE" in may_localize.upper()) + site_categories[name] = {'category': name, 'error': error, 'may_localize': may_localize, 'may_reject': may_reject} + + site_info = {} + site_move = {} + site_hourly = {} # option added to include only certain hours of the day (for e.g. PAL) IvdL + site_incalt = {} # option to increase sampling altitude for sites specified in sites and weights file + for key, value in sites_weights.iteritems(): + if 'co2_' in key or 'sf6' in key: # to be fixed later, do not yet know how to parse valid keys from rc-files yet.... 
WP + sitename, sitecategory = key, value + sitename = sitename.strip() + sitecategory = sitecategory.split()[0].strip().lower() + site_info[sitename] = site_categories[sitecategory] + if 'site.move' in key: + identifier, latmove, lonmove = value.split(';') + site_move[identifier.strip()] = (float(latmove), float(lonmove)) + if 'site.hourly' in key: + identifier, hourfrom, hourto = value.split(';') + site_hourly[identifier.strip()] = (int(hourfrom), int(hourto)) + if 'site.incalt' in key: + identifier, incalt = value.split(';') + site_incalt[identifier.strip()] = (int(incalt)) + + for obs in self.datalist: # loop over all available data points + + obs.mdm = 1000.0 # default is very high model-data-mismatch, until explicitly set by script + obs.flag = 99 # default is do-not-use , until explicitly set by script + exclude_hourly = False # default is that hourly values are not included + + identifier = obs.code + species, site, method, lab, datasetnr = identifier.split('_') + + if site_info.has_key(identifier): + if site_hourly.has_key(identifier): + obs.samplingstrategy = 2 + hourf, hourt = site_hourly[identifier] + if int(obs.xdate.hour) >= hourf and int(obs.xdate.hour) <= hourt: + logging.warning("Observations in hourly dataset INCLUDED, while sampling time %s was between %s:00-%s:00"%(obs.xdate.time(),hourf,hourt)) + else: + logging.warning("Observation in hourly dataset EXCLUDED, while sampling time %s was outside %s:00-%s:00"%(obs.xdate.time(),hourf,hourt)) + exclude_hourly = True + if site_info[identifier]['category'] == 'do-not-use' or exclude_hourly: + logging.warning("Site found (%s, %d), but data point not used in assimilation !!!" % (identifier, obs.id)) + obs.mdm = site_info[identifier]['error'] * self.global_R_scaling + obs.may_localize = site_info[identifier]['may_localize'] + obs.may_reject = site_info[identifier]['may_reject'] + obs.flag = 99 + else: + logging.debug("Site found (%s, %d)" % (identifier, obs.id)) + obs.mdm = site_info[identifier]['error'] * self.global_R_scaling + obs.may_localize = site_info[identifier]['may_localize'] + obs.may_reject = site_info[identifier]['may_reject'] + obs.flag = 0 + + else: + logging.warning("Site NOT found (%s, %d), please check sites.rc file (%s) !!!" % (identifier, obs.id, self.sites_file)) + + if site_move.has_key(identifier): + + movelat, movelon = site_move[identifier] + obs.lat = obs.lat + movelat + obs.lon = obs.lon + movelon + + logging.warning("Observation location for (%s, %d), is moved by %3.2f degrees latitude and %3.2f degrees longitude" % (identifier, obs.id, movelat, movelon)) + + if site_incalt.has_key(identifier): + + incalt = site_incalt[identifier] + obs.height = obs.height + incalt + + logging.warning("Observation location for (%s, %d), is moved by %3.2f meters in altitude" % (identifier, obs.id, incalt)) + + + # Add site_info dictionary to the Observations object for future use + + self.site_info = site_info + self.site_move = site_move + self.site_hourly = site_hourly + self.site_incalt = site_incalt + + logging.debug("Added Model Data Mismatch to all samples ") + + def write_sample_auxiliary(self, auxoutputfile): + """ + Write selected information contained in the Observations object to a file. 
+ + """ + + f = io.CT_CDF(auxoutputfile, method='create') + logging.debug('Creating new auxiliary sample output file for postprocessing (%s)' % auxoutputfile) + + dimid = f.add_dim('obs', len(self.datalist)) + dim200char = f.add_dim('string_of200chars', 200) + dim10char = f.add_dim('string_of10chars', 10) + dimcalcomp = f.add_dim('calendar_components', 6) + + if len(self.datalist) == 0: + f.close() + #return outfile + + for key, value in self.site_move.iteritems(): + msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value + f.add_attribute(key, msg) + + data = self.getvalues('id') + + savedict = io.std_savedict.copy() + savedict['name'] = "obs_num" + savedict['dtype'] = "int" + savedict['long_name'] = "Unique_Dataset_observation_index_number" + savedict['units'] = "" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED." + f.add_data(savedict) + + data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate')] + + savedict = io.std_savedict.copy() + savedict['dtype'] = "int" + savedict['name'] = "date_components" + savedict['units'] = "integer components of UTC date/time" + savedict['dims'] = dimid + dimcalcomp + savedict['values'] = data + savedict['missing_value'] = -9 + savedict['comment'] = "Calendar date components as integers. Times and dates are UTC." + savedict['order'] = "year, month, day, hour, minute, second" + f.add_data(savedict) + + data = self.getvalues('obs') + + savedict = io.std_savedict.copy() + savedict['name'] = "observed" + savedict['long_name'] = "observedvalues" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = 'Observations used in optimization' + f.add_data(savedict) + + data = self.getvalues('mdm') + + savedict = io.std_savedict.copy() + savedict['name'] = "modeldatamismatch" + savedict['long_name'] = "modeldatamismatch" + savedict['units'] = "[mol mol-1]" + savedict['dims'] = dimid + savedict['values'] = data.tolist() + savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch' + f.add_data(savedict) + + data = self.getvalues('simulated') + + dimmembers = f.add_dim('members', data.shape[1]) + + savedict = io.std_savedict.copy() + savedict['name'] = "modelsamples" + savedict['long_name'] = "modelsamples for all ensemble members" + savedict['units'] = "mol mol-1" + savedict['dims'] = dimid + dimmembers + savedict['values'] = data.tolist() + savedict['comment'] = 'simulated mole fractions based on optimized state vector' + f.add_data(savedict) + + data = self.getvalues('fromfile') + + savedict = io.std_savedict.copy() + savedict['name'] = "inputfilename" + savedict['long_name'] = "name of file where original obs data was taken from" + savedict['dtype'] = "char" + savedict['dims'] = dimid + dim200char + savedict['values'] = data + savedict['missing_value'] = '!' + f.add_data(savedict) + + f.close() + + logging.debug("Successfully wrote data to auxiliary sample output file (%s)" % auxoutputfile) + + #return outfile + + + +################### End Class CtObservations ################### + + + +################### Begin Class MoleFractionSample ################### + +class MoleFractionSample(object): + """ + Holds the data that defines a mole fraction Sample in the data assimilation framework. Sor far, this includes all + attributes listed below in the __init__ method. 
One can additionally make more types of data, or make new + objects for specific projects. + + """ + + def __init__(self, idx, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat= -999., lon= -999., evn='0000', species='co2', samplingstrategy=1, sdev=0.0, fromfile='none.nc'): + self.code = code.strip() # dataset identifier, i.e., co2_lef_tower_insitu_1_99 + self.xdate = xdate # Date of obs + self.obs = obs # Value observed + self.simulated = simulated # Value simulated by model + self.resid = resid # Mole fraction residuals + self.hphr = hphr # Mole fraction prior uncertainty from fluxes and (HPH) and model data mismatch (R) + self.mdm = mdm # Model data mismatch + self.may_localize = True # Whether sample may be localized in optimizer + self.may_reject = True # Whether sample may be rejected if outside threshold + self.flag = flag # Flag + self.height = height # Sample height in masl + self.lat = lat # Sample lat + self.lon = lon # Sample lon + self.id = idx # Obspack ID within distrution (integer), e.g., 82536 + self.evn = evn # Obspack Number within distrution (string), e.g., obspack_co2_1_PROTOTYPE_v0.9.2_2012-07-26_99_82536 + self.sdev = sdev # standard deviation of ensemble + self.masl = True # Sample is in Meters Above Sea Level + self.mag = not self.masl # Sample is in Meters Above Ground + self.species = species.strip() + self.samplingstrategy = samplingstrategy + self.fromfile = fromfile # netcdf filename inside ObsPack distribution, to write back later + +################### End Class MoleFractionSample ################### + + +if __name__ == "__main__": + pass + + + diff --git a/da/stilt/pipeline.py b/da/stilt/pipeline.py new file mode 100755 index 0000000000000000000000000000000000000000..7f1cb70f4e317e88e53b436a8b267587369893f5 --- /dev/null +++ b/da/stilt/pipeline.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python +# pipeline.py + +""" +.. module:: pipeline +.. moduleauthor:: Wouter Peters + +Revision History: +File created on 06 Sep 2010. + +The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system. 
+ +""" +import logging +import os +import sys +import datetime +import copy + +header = '\n\n *************************************** ' +footer = ' *************************************** \n ' + +def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer): + """ The main point of entry for the pipeline """ + sys.path.append(os.getcwd()) + + logging.info(header + "Initializing current cycle" + footer) + start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) + + prepare_state(dacycle, statevector) + sample_state(dacycle, samples, statevector, obsoperator) + + invert(dacycle, statevector, optimizer) + + advance(dacycle, samples, statevector, obsoperator) + + save_and_submit(dacycle, statevector) + logging.info("Cycle finished...exiting pipeline") + +def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator): + """ The main point of entry for the pipeline """ + sys.path.append(os.getcwd()) + + logging.info(header + "Initializing current cycle" + footer) + start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) + + if dacycle.has_key('forward.savestate.exceptsam'): + sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"]) + else: + sam = False + + if dacycle.has_key('forward.savestate.dir'): + fwddir = dacycle['forward.savestate.dir'] + else: + logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters") + fwddir = False + + if dacycle.has_key('forward.savestate.legacy'): + legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"]) + else: + legacy = False + logging.debug("No forward.savestate.legacy key found in rc-file") + + if not fwddir: + # Simply make a prior statevector using the normal method + + + prepare_state(dacycle, statevector)#LU tutaj zamiast tego raczej to stworzenie nowej kowariancji i ensembli bo pozostale rzeczy sa na gorze i na doel. + + else: + # Read prior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if sam: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') + statevector.read_from_file_exceptsam(filename, 'prior') + elif not legacy: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) + statevector.read_from_file(filename, 'prior') + else: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') + statevector.read_from_legacy_file(filename, 'prior') + + + # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector + # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current + # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit. + # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into + # the current format through the "write_to_file" method. 
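+
+    # As a concrete illustration (placeholder values, not part of this changeset),
+    # the optional rc-keys read at the top of this function could be set as:
+    #
+    #     forward.savestate.dir       : /projects/ctdas/previous_run/restart
+    #     forward.savestate.legacy    : False
+    #     forward.savestate.exceptsam : False
+    #
+    # When forward.savestate.dir is absent, the prior statevector is constructed by
+    # prepare_state() above rather than read from an earlier experiment.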
+ + savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(savefilename, 'prior') + + # Now read optimized fluxes which we will actually use to propagate through the system + + if not fwddir: + + # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector + logging.info("Running forward with prior savestate from: %s"%savefilename) + + else: + + # Read posterior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if sam: + statevector.read_from_file_exceptsam(filename, 'opt') + elif not legacy: + statevector.read_from_file(filename, 'opt') + else: + statevector.read_from_legacy_file(filename, 'opt') + + logging.info("Running forward with optimized savestate from: %s"%filename) + + # Finally, we run forward with these parameters + + advance(dacycle, samples, statevector, obsoperator) + + # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list. + # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis. + + save_and_submit(dacycle, statevector) + + logging.info("Cycle finished...exiting pipeline") +#################################################################################################### + +def analysis_pipeline(dacycle, platform, dasystem, samples, statevector): + """ Main entry point for analysis of ctdas results """ + + from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data + from da.analysis.expand_molefractions import write_mole_fractions + from da.analysis.summarize_obs import summarize_obs + from da.analysis.time_avg_fluxes import time_avg + + logging.info(header + "Starting analysis" + footer) + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + statevector.setup(dacycle) + + logging.info(header + "Starting mole fractions" + footer) + + write_mole_fractions(dacycle) + #summarize_obs(dacycle['dir.analysis']) + + #logging.info(header + "Starting weekly averages" + footer) + + #save_weekly_avg_1x1_data(dacycle, statevector) + #save_weekly_avg_state_data(dacycle, statevector) + #save_weekly_avg_tc_data(dacycle, statevector) + #save_weekly_avg_ext_tc_data(dacycle) + #save_weekly_avg_agg_data(dacycle,region_aggregate='transcom') + #save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended') + #save_weekly_avg_agg_data(dacycle,region_aggregate='olson') + #save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended') + #save_weekly_avg_agg_data(dacycle,region_aggregate='country') + + #logging.info(header + "Starting monthly and yearly averages" + footer) + + #time_avg(dacycle,'flux1x1') + #time_avg(dacycle,'transcom') + #time_avg(dacycle,'transcom_extended') + #time_avg(dacycle,'olson') + #time_avg(dacycle,'olson_extended') + #time_avg(dacycle,'country') + + logging.info(header + "Finished analysis" + footer) + +def archive_pipeline(dacycle, platform, dasystem): + """ Main entry point for archiving of output from one disk/system to another """ + + if not dacycle.has_key('task.rsync'): + logging.info('rsync task not found, not starting automatic backup...') + return + else: + logging.info('rsync task found, starting automatic backup...') 
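+
+    # Illustrative rc-configuration for the loop below (hypothetical paths, not part
+    # of this changeset):
+    #
+    #     task.rsync                        : output
+    #     task.rsync.output.sourcedirs      : /scratch/ctdas/run01/output
+    #     task.rsync.output.destinationdir  : user@archive.host:/backup/run01/
+    #     task.rsync.output.flags           : -auv
+    #
+    # Each whitespace-separated name in task.rsync gets its own rsync job written
+    # and submitted through the platform object.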
+ + for task in dacycle['task.rsync'].split(): + sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task] + destdir = dacycle['task.rsync.%s.destinationdir'%task] + + rsyncflags = dacycle['task.rsync.%s.flags'%task] + + # file ID and names + jobid = dacycle['time.end'].strftime('%Y%m%d') + targetdir = os.path.join(dacycle['dir.exec']) + jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) ) + logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) ) + # Template and commands for job + jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile} + + if platform.ID == 'cartesius': + jobparams['jobqueue'] = 'staging' + + template = platform.get_job_template(jobparams) + for sourcedir in sourcedirs.split(): + execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,) + template += execcommand + + # write and submit + platform.write_job(jobfile, template, jobid) + jobid = platform.submit_job(jobfile, joblog=logfile) + + +def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator): + """ Set up the job specific directory structure and create an expanded rc-file """ + + dasystem.validate() + dacycle.dasystem = dasystem + dacycle.daplatform = platform + dacycle.setup() + #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc + #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc + #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc + obsoperator.setup(dacycle) # Setup Observation Operator + statevector.setup(dacycle) + +def prepare_state(dacycle, statevector): + """ Set up the input data for the forward model: obs and parameters/fluxes""" + + # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read + # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector + # with new values for each week. 
After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used + # in the current cycle + + logging.info(header + "starting prepare_state" + footer) + + if not dacycle['time.restart']: + + # Fill each week from n=1 to n=nlag with a new ensemble + + for n in range(statevector.nlag): + date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle'])) + cov = statevector.get_covariance(date, dacycle) + statevector.make_new_ensemble(n, cov) + + else: + + # Read the statevector data from file + + #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc') + + + saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d')) + statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate + + # Now propagate the ensemble by one cycle to prepare for the current cycle + statevector.propagate(dacycle) + + # Finally, also write the statevector to a file so that we can always access the a-priori information + + current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + statevector.write_to_file(current_sv, 'prior') # write prior info + +def sample_state(dacycle, samples, statevector, obsoperator): + """ Sample the filter state for the inversion """ + + + # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step. + # This is especially important for: + # (i) The transport model restart data which holds the background mole fractions. This is needed to run the model one cycle forward + # (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed + + #status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[]) + #msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg) + logging.info(header + "starting sample_state" + footer) + nlag = int(dacycle['time.nlag']) + logging.info("Sampling model will be run over %d cycles" % nlag) + + obsoperator.get_initial_data() + + for lag in range(nlag): + logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1)) + + ############# Perform the actual sampling loop ##################### + + sample_step(dacycle, samples, statevector, obsoperator, lag) + + logging.debug("statevector now carries %d samples" % statevector.nobs) + + +def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False): + """ Perform all actions needed to sample one cycle """ + + + # First set up the information for time start and time end of this sample + dacycle.set_sample_times(lag) + + startdate = dacycle['time.sample.start'] + enddate = dacycle['time.sample.end'] + dacycle['time.sample.window'] = lag + dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H")) + + logging.info("New simulation interval set : ") + logging.info(" start date : %s " % startdate.strftime('%F %H:%M')) + logging.info(" end date : %s " % enddate.strftime('%F %H:%M')) + logging.info(" file stamp: %s " % dacycle['time.sample.stamp']) + + + # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the + # type of info needed in your transport model + + statevector.write_members_to_file(lag, dacycle['dir.input'],'.%s.nc'%dacycle['time.sample.stamp']) + + # nothing needed anymore 
after writing parameters for Lagrangian systems + if not advance and lag != int(dacycle['time.nlag'])-1 and obsoperator.ID == 'WRF-STILT': + statevector.obs_to_assimilate = (None,) + logging.info("Skipping the rest of the sampling step for %s"%obsoperator.ID) + logging.info(" ... No observations needed for this cycle of lag") + logging.info(" ... No simulations needed for this cycle of lag") + logging.info(" ... No residuals needed for this cycle of lag") + logging.info(" ... Proceeding to next cycle of lag") + return + + samples.setup(dacycle) + samples.add_observations() + + # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? + + samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc']) + + sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp']) + samples.write_sample_coords(sampling_coords_file) + # Write filename to dacycle, and to output collection list + dacycle['ObsOperator.inputfile'] = sampling_coords_file + + # Run the observation operator + + obsoperator.run_forecast_model() + + + # Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting + # Observation object for each sample loop. This data fill be written to file in the output folder for each sample cycle. + + + # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates + # one file per member, some logic needs to be included to merge all files!!! + + if os.path.exists(sampling_coords_file): + samples.add_simulations(obsoperator.simulated_file) + else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)") + + # Now write a small file that holds for each observation a number of values that we need in postprocessing + + #filename = samples.write_sample_coords() + + # Now add the observations that need to be assimilated to the statevector. + # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag + # This is to make sure that the first step of the system uses all observations available, while the subsequent + # steps only optimize against the data at the front (lag==nlag) of the filter. 
This way, each observation is used only + # (and at least) once # in the assimilation + + + if not advance: + if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1: + statevector.obs_to_assimilate += (copy.deepcopy(samples),) + statevector.nobs += samples.getlength() + logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector") + + else: + statevector.obs_to_assimilate += (None,) + + +def invert(dacycle, statevector, optimizer): + """ Perform the inverse calculation """ + logging.info(header + "starting invert" + footer) + dims = (int(dacycle['time.nlag']), + int(dacycle['da.optimizer.nmembers']), + int(dacycle.dasystem['nparameters']), + statevector.nobs) + + if not dacycle.dasystem.has_key('opt.algorithm'): + logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)") + logging.info("...using serial algorithm as default...") + optimizer.set_algorithm('Serial') + elif dacycle.dasystem['opt.algorithm'] == 'serial': + logging.info("Using the serial minimum least squares algorithm to solve ENKF equations") + optimizer.set_algorithm('Serial') + elif dacycle.dasystem['opt.algorithm'] == 'bulk': + logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations") + optimizer.set_algorithm('Bulk') + + optimizer.setup(dims) + optimizer.state_to_matrix(statevector) + + diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + + optimizer.write_diagnostics(diagnostics_file, 'prior') + optimizer.set_localization(dacycle['da.system.localization']) + + if optimizer.algorithm == 'Serial': + optimizer.serial_minimum_least_squares() + else: + optimizer.bulk_minimum_least_squares() + + optimizer.matrix_to_state(statevector) + optimizer.write_diagnostics(diagnostics_file, 'optimized') + + + +def advance(dacycle, samples, statevector, obsoperator): + """ Advance the filter state to the next step """ + + # This is the advance of the modeled CO2 state. 
Optionally, routines can be added to advance the state vector (mean+covariance)
+
+    # Then, restore model state from the start of the filter
+    logging.info(header + "starting advance" + footer)
+    logging.info("Sampling model will be run over 1 cycle")
+
+    obsoperator.get_initial_data()
+
+    sample_step(dacycle, samples, statevector, obsoperator, 0, True)
+
+    dacycle.restart_filelist.extend(obsoperator.restart_filelist)
+    dacycle.output_filelist.extend(obsoperator.output_filelist)
+    logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
+
+    dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
+    logging.debug("Appended Observation filename to dacycle for collection ")
+
+    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
+    if os.path.exists(sampling_coords_file):
+        outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
+        samples.write_sample_auxiliary(outfile)
+    else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
+
+def save_and_submit(dacycle, statevector):
+    """ Save the model state and submit the next job """
+    logging.info(header + "starting save_and_submit" + footer)
+
+    filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+    statevector.write_to_file(filename, 'opt')
+
+    dacycle.output_filelist.append(filename)
+    dacycle.finalize()
+
+
+
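As a usage sketch (not part of the changeset above), the pipeline functions in da/stilt/pipeline.py would be driven from a small script along the lines of the __main__ block in expand_molefractions.py. The rc-file names below are placeholders, and the platform, samples, statevector, observation-operator and optimizer objects must be instantiated from the project's own classes, which this diff does not add:

    import sys
    import logging
    sys.path.append('../../')

    from da.tools.initexit import CycleControl
    from da.carbondioxide.dasystem import CO2DaSystem
    from da.stilt.pipeline import ensemble_smoother_pipeline, analysis_pipeline

    logging.root.setLevel(logging.DEBUG)

    dacycle = CycleControl(args={'rc': 'ctdas-stilt-example.rc'})    # placeholder rc-file
    dasystem = CO2DaSystem('carbontracker_example.rc')               # placeholder rc-file
    dacycle.dasystem = dasystem

    # platform, samples, statevector, obsoperator and optimizer come from the
    # corresponding da.* classes configured for this experiment; once available:
    #   ensemble_smoother_pipeline(dacycle, platform, dasystem, samples,
    #                              statevector, obsoperator, optimizer)
    #   analysis_pipeline(dacycle, platform, dasystem, samples, statevector)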