Commit 98e4c27b authored by Florentie, Liesbeth's avatar Florentie, Liesbeth
Browse files

obs.scaling.factor to assimilate co2 obs in ppm, additional forecast step at...

Add obs.scaling.factor so CO2 observations can be assimilated in ppm; add an extra forecast step at the end of the cycle to obtain posterior samples.
parent 4d0fda0a
......@@ -114,7 +114,7 @@ class ctsfTM5ObsOperator(TM5ObservationOperator):
def run_forecast_model(self,fluxmodel):
def run_forecast_model(self,fluxmodel,postprocessing=False):
# set timerange for flux calculation
startdate = self.dacycle['time.sample.start']
......@@ -130,13 +130,13 @@ class ctsfTM5ObsOperator(TM5ObservationOperator):
logging.debug('Timevec = %s' % timevec)
# calculate and write fluxmap for ensemble
fluxmodel.calc_flux(self.timevec)
fluxmodel.calc_flux(self.timevec,postprocessing=postprocessing)
# TM5 run...
self.prepare_run()
self.prepare_run(postprocessing)
self.validate_input() # perform checks
self.run() # calculate the NEE fit
self.save_data() # save NEE distribution to netCDF file
self.save_data() # save samples to netCDF file
......@@ -157,7 +157,7 @@ class ctsfTM5ObsOperator(TM5ObservationOperator):
def prepare_run(self):
def prepare_run(self,postprocessing=False):
"""
Prepare a forward model TM5 run, this consists of:
......@@ -182,6 +182,10 @@ class ctsfTM5ObsOperator(TM5ObservationOperator):
'output.flask': 'True'
}
if postprocessing:
new_items['ct.params.input.dir'] = self.dacycle['dir.output']
new_items['ct.params.input.file'] = os.path.join(self.dacycle['dir.output'], 'fluxmodel')
if self.dacycle['transition']:
new_items[self.istartkey] = self.transitionvalue
logging.debug('Resetting TM5 to perform transition of od meteo from 25 to 34 levels')
......
......@@ -126,7 +126,7 @@ class ctsfFluxModel(ObservationOperator):
# write gridded flux to file
self.write_flux(nee, timevec, em, postprocessing)
if postprocessing or (member is not None):
if member is not None:
break
del em
......
......@@ -41,6 +41,28 @@ import da.tools.rc as rc
class ctsfObsPackObservations(ObsPackObservations):
def setup(self, dacycle):
    """Configure this observations object for the current sampling window.

    Reads the sampling window boundaries and the ObsPack distribution
    location from *dacycle*, stores the observation unit-scaling factor,
    and initializes an empty data list.

    dacycle -- cycle control object exposing 'time.sample.start/end' keys
               and a .dasystem mapping with the obspack.* and
               obs.scaling.factor settings.

    Raises IOError when the configured ObsPack directory does not exist.
    """
    self.startdate = dacycle['time.sample.start']
    self.enddate = dacycle['time.sample.end']

    op_id = dacycle.dasystem['obspack.input.id']
    op_dir = dacycle.dasystem['obspack.input.dir']

    # Guard clause: fail early when the ObsPack distribution is missing.
    if not os.path.exists(op_dir):
        msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir
        logging.error(msg)
        # Call-form raise works on both Python 2 and 3
        # (original used the Py2-only "raise IOError, msg" statement).
        raise IOError(msg)

    self.obspack_dir = op_dir
    self.obspack_id = op_id

    # Multiplicative factor applied to raw mole fractions on read
    # (e.g. mol/mol -> ppm). NOTE(review): exact target unit depends on
    # the rc configuration -- confirm against obs.scaling.factor usage.
    self.obs_scaling = float(dacycle.dasystem['obs.scaling.factor'])

    self.datalist = []
def add_observations(self):
""" Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file
The ObsPack mole fraction files are provided as time series per site with all dates in sequence.
......@@ -95,6 +117,9 @@ class ctsfObsPackObservations(ObsPackObservations):
flags = ncf.get_variable('obs_flag').take(subselect, axis=0)
ncf.close()
# convert obs unit if required
obs = obs * self.obs_scaling
# only add observations with flag = 1
counter = 0
for n in range(len(dates)):
......@@ -171,19 +196,14 @@ class ctsfObsPackObservations(ObsPackObservations):
else: obs.flag = 99
# second loop over all available data points to set mdm
obs_to_keep = []
for obs in self.datalist:
identifier = obs.code
species, site, method, lab, datasetnr = identifier.split('_')
if site_info.has_key(identifier):
if site_info[identifier]['category'] == 'do-not-use' or obs.flag == 99:
logging.warning("Observation found (%s, %d), but not used in assimilation." % (identifier, obs.id))
obs.mdm = site_info[identifier]['error'] * self.global_R_scaling
obs.may_localize = site_info[identifier]['may_localize']
obs.may_reject = site_info[identifier]['may_reject']
obs.flag = 99
else:
if site_info[identifier]['category'] != 'do-not-use' and obs.flag != 99:
if site_info[identifier]['category'] == 'aircraft':
nr_obs_per_day = 1
else:
......@@ -193,6 +213,9 @@ class ctsfObsPackObservations(ObsPackObservations):
obs.may_localize = site_info[identifier]['may_localize']
obs.may_reject = site_info[identifier]['may_reject']
obs.flag = 0
obs_to_keep.append(obs)
elif obs.flag == 99:
logging.info('Observation has flag 99, excluded from assimilation')
else:
logging.warning("Observation NOT found (%s, %d), please check sites.rc file (%s) !!!" % (identifier, obs.id, self.sites_file))
......@@ -211,19 +234,50 @@ class ctsfObsPackObservations(ObsPackObservations):
logging.warning("Observation location for (%s, %d), is moved by %3.2f meters in altitude" % (identifier, obs.id, incalt))
# Only keep obs in datalist that will be used in assimilation
obs_to_keep = []
for obs in self.datalist:
if obs.flag != 99:
obs_to_keep.append(obs)
self.datalist = obs_to_keep
logging.info('Removed unused observations, observation list now holds %s values' % len(self.datalist))
# Add site_info dictionary to the Observations object for future use
self.site_info = site_info
self.site_move = site_move
self.site_incalt = site_incalt
logging.debug("Added Model Data Mismatch to all samples ")
def add_simulations(self, filename, silent=False):
    """Attach model-simulated values to the mole fraction objects.

    Reads simulated samples from the netCDF file *filename*, scales them
    by self.obs_scaling (same conversion applied to the observations),
    and stores each value on the matching entry of self.datalist, matched
    by observation id.

    filename -- netCDF sample file holding 'obs_num' ids and 'flask' values
    silent   -- when True, suppress the warning about unmatched samples

    Raises IOError when *filename* does not exist.
    """
    if not os.path.exists(filename):
        msg = "Sample output filename for observations could not be found : %s" % filename
        logging.error(msg)
        logging.error("Did the sampling step succeed?")
        logging.error("...exiting")
        # Call-form raise works on both Python 2 and 3
        # (original used the Py2-only "raise IOError, msg" statement).
        raise IOError(msg)

    ncf = io.ct_read(filename, method='read')
    ids = ncf.get_variable('obs_num')
    # Apply the same unit scaling as used for the observations.
    simulated = ncf.get_variable('flask') * self.obs_scaling
    ncf.close()
    logging.info("Successfully read data from model sample file (%s)" % filename)

    ids = list(map(int, ids))  # materialize so len(ids) below also works on Py3

    # Build an id -> position map once, instead of an O(n) list.index()
    # lookup per sample (the original loop was quadratic). setdefault keeps
    # the first occurrence, matching list.index() semantics for duplicates.
    obs_index = {}
    for pos, obs_id in enumerate(self.getvalues('id').tolist()):
        obs_index.setdefault(obs_id, pos)

    missing_samples = []
    for idx, val in zip(ids, simulated):
        if idx in obs_index:
            # NOTE(review): original comment said "in mol/mol", but with
            # obs_scaling applied the stored unit may be ppm -- confirm.
            self.datalist[obs_index[idx]].simulated = val
        else:
            missing_samples.append(idx)

    if not silent and missing_samples:
        logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...')
        #msg = '%s'%missing_samples ; logging.warning(msg)

    logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
......@@ -49,7 +49,7 @@ class ctsfOptimizer(CO2Optimizer):
logging.debug('Skipping observation (%s,%i) because of flag value %d' % (self.sitecode[n], self.obs_ids[n], self.flags[n]))
continue
# Screen for outliers greather than 3x model-data mismatch, only apply if obs may be rejected
# Screen for outliers greater than 3x model-data mismatch, only apply if obs may be rejected
res = self.obs[n] - self.Hx[n]
......
......@@ -38,7 +38,7 @@ footer = ' *************************************** \n '
def ctsf_pipeline(dacycle, platform, dasystem, samples, statevector, fluxmodel, obsoperator, optimizer):
sys.path.append(os.getcwd())
logging.info(header + 'Initializing SIF-T inversion cycle' + footer)
logging.info(header + 'Initializing CTSF inversion cycle' + footer)
start_job(dacycle, dasystem, platform, statevector, samples, fluxmodel, obsoperator)
prepare_state(dacycle, statevector)
......@@ -51,6 +51,44 @@ def ctsf_pipeline(dacycle, platform, dasystem, samples, statevector, fluxmodel,
logging.info('Inversion cycle finished... exiting pipeline')
def check_setup(dacycle, platform, dasystem, samples, statevector, fluxmodel, obsoperator, optimizer):
    """Run a minimal setup-only cycle to verify the CTSF configuration.

    Validates the da system settings, sets up the cycle, statevector and
    observation vector for the first lag, writes the sample coordinates
    file, and finalizes the cycle. No transport-model run and no
    optimization are performed.

    fluxmodel, obsoperator and optimizer are unused here; they are kept
    in the signature for parity with ctsf_pipeline so both entry points
    are interchangeable.
    """
    sys.path.append(os.getcwd())
    logging.info(header + 'Initializing check_setup cycle' + footer)

    dasystem.validate()
    dacycle.dasystem = dasystem
    dacycle.daplatform = platform
    dacycle.setup()
    statevector.setup(dacycle)
    prepare_state(dacycle, statevector)

    # Fixed typo in log message ('Addind' -> 'Adding').
    logging.info(header + 'Adding observations' + footer)

    # Only the first lag is checked.
    lag = 0
    dacycle.set_sample_times(lag)
    startdate = dacycle['time.sample.start']
    enddate = dacycle['time.sample.end']
    dacycle['time.sample.window'] = lag
    dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
    logging.info("New simulation interval set : ")
    logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
    logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
    logging.info(" file stamp: %s " % dacycle['time.sample.stamp'])

    # Create observation vector for simulation interval
    samples.setup(dacycle)
    samples.add_observations()
    samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
    samples.write_sample_coords(sampling_coords_file)

    dacycle.finalize()
    logging.info('check_setup cycle finished... exiting pipeline')
####################################################################################################
......@@ -272,7 +310,11 @@ def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, o
dacycle.output_filelist.append(filename)
statevector.write_members_to_file(0, dacycle['dir.output'],member=0)
fluxmodel.calc_flux(obsoperator.timevec, member=0, updateano=True, postprocessing=True, write=True)
statevector.write_members_to_file(0, dacycle['dir.output'])
# Run the observation operator (TM5 to get atmospheric CO2 concentrations)
obsoperator.run_forecast_model(fluxmodel,postprocessing=True)
#fluxmodel.calc_flux(obsoperator.timevec, member=0, updateano=True, postprocessing=True, write=True)
dacycle.finalize()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment