Commit f7af0107 authored by Berg, Anne-Wil van den

Finally fixed flag writing for the column auxiliary files. Now the prior flags that ended up in the auxiliary file are updated for the optimizer samples (where flags are manipulated based on the rejection threshold), just before write_sample_auxiliary during advance.
parent 119e25d4
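For context, the intended ordering during the advance step, as described in the commit message, can be sketched as follows (an illustrative sketch only; sample, outfile and optimizer_file refer to the objects used in sample_step in the last hunk below):

    # Sketch of the intended call order during advance (see the actual change in sample_step below).
    # The optimizer output holds the post-rejection flags; update_flags copies them into the
    # datalist so that write_sample_auxiliary then stores the final flags in the auxiliary file.
    sample.update_flags(outfile, optimizer_file)              # pull flags from the optimizer file
    sample.write_sample_auxiliary(outfile, qual='optimized')  # write the updated flags out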
@@ -721,18 +721,16 @@ class ColumnObservations(Observations):
         savedict['values'] = data.tolist()
         f.add_data(savedict)
         # Add assimilated or not flag
-        if qual != 'optimized':
-            data = self.getvalues('flag')
-            savedict = io.std_savedict.copy()
-            savedict['name'] = "flag"
-            savedict['long_name'] = "Assimilated flag"
-            savedict['units'] = "None"
-            savedict['dims'] = dimid
-            savedict['values'] = data.tolist()
-            savedict['comment'] = 'Flag (0/1/2/96/97/98/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 96 means sampled not assimilated, 97 means not representative, 98 means assim concerns, 99 means not sampled'
-            f.add_data(savedict)
+        data = self.getvalues('flag')
+        savedict = io.std_savedict.copy()
+        savedict['name'] = "flag"
+        savedict['long_name'] = "Assimilated flag"
+        savedict['units'] = "None"
+        savedict['dims'] = dimid
+        savedict['values'] = data.tolist()
+        savedict['comment'] = 'Flag (0/1/2/96/97/98/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 96 means sampled not assimilated, 97 means not representative, 98 means assim concerns, 99 means not sampled'
+        f.add_data(savedict)
         f.close()
         logging.debug(f"Successfully wrote data to auxiliary sample output file ({filename})")
@@ -773,6 +771,76 @@ class ColumnObservations(Observations):
     def load_list_from_pickle(self):
         return pickle.load(open(self.outfile.replace('.nc','.pickle'), 'rb'))
+    def update_flags(self, auxiliary_filename, optimizer_filename):
+        """
+        Sample flags are potentially updated by the optimizer.
+        The optimizer receives a subset of the samples (those flagged 0 in the prior)
+        and potentially updates their flags based on the rejection threshold.
+        Here we update the flags in the datalist accordingly. This method is called
+        after the invert step and before write_sample_auxiliary during advance.
+        """
+        # Read flags and sample IDs from the auxiliary file
+        with nc.Dataset(auxiliary_filename) as aux_file:
+            flags_before_optimization = aux_file.variables['flag'][:].data
+            sample_ids_aux = aux_file.variables['sample_id'][:].data
+            dates_aux = aux_file.variables['date_components'][:,0:3].data.astype(int)
+        dates_aux = np.array([dtm.datetime(dates_aux[i,0], dates_aux[i,1], dates_aux[i,2]) for i in range(len(dates_aux))])
+        # Read flags and sitecodes from the optimizer file
+        with nc.Dataset(optimizer_filename) as opt_file:
+            flags_after_optimization = opt_file.variables['flag'][:].data
+            sitecodes_opt = opt_file.variables['sitecode'][:].data.astype(str)
+            dates_opt = opt_file.variables['obs_dates'][:].data.astype(int)
+        dates_opt = np.array([dtm.datetime(dates_opt[i,0], dates_opt[i,1], dates_opt[i,2]) for i in range(len(dates_opt))])
+        # Convert sitecodes from bytes to strings
+        sitecodes_opt = np.array([''.join(sitecodes_opt[i, :].astype(str)) for i in range(len(sitecodes_opt))])
+        # Filter optimizer dataseries for the current instrument
+        idxs = [i for i, sc in enumerate(sitecodes_opt) if sc.startswith(f'{self.column_type_id}_')]
+        sitecodes_opt = sitecodes_opt[idxs]
+        flags_after_optimization = flags_after_optimization[idxs]
+        dates_opt = dates_opt[idxs]
+        # Extract sample IDs from sitecodes_opt
+        sample_ids_opt = np.array([int(sc.split('_')[1]) for sc in sitecodes_opt])
+        # Safety check: Ensure datalist and auxiliary file samples match
+        sample_ids_datalist = np.array([int(str(self.datalist[i].code).split('_')[1]) for i in range(len(self.datalist))])
+        if not np.array_equal(sample_ids_datalist, sample_ids_aux):
+            logging.error(
+                f"The samples in the datalist and {auxiliary_filename} file do not match. "
+                f"Returning from this method. This means that the flags written to the file "
+                f"after advance are not correct (do not include rejected sample flags!)."
+            )
+            return
+        # Map optimizer sample IDs to indices in the auxiliary file.
+        # Not every optimizer sample has a match here: in the first cycle of a lagged run,
+        # the optimizer contains samples from two coordinate/auxiliary files.
+        idxs_aux, idxs_opt = [], []
+        for i in range(len(sample_ids_opt)):
+            match = np.where((sample_ids_aux == sample_ids_opt[i]) & (dates_aux == dates_opt[i]))[0]
+            if len(match) > 0:
+                idxs_aux.append(match[0])
+                idxs_opt.append(i)
logging.debug(f"len of idxs: {len(idxs)}")
+        # Safety check: Ensure all flags are 0 before optimization
+        if not np.all(flags_before_optimization[idxs_aux] == 0.):
+            logging.warning(f'Not all flags are zero before optimization. This is unexpected. Going to overwrite these in {auxiliary_filename}, based on the values in {optimizer_filename}.')
+        # Update flags for the optimizer samples
+        new_flags = flags_before_optimization.copy()
+        new_flags[idxs_aux] = flags_after_optimization[idxs_opt]
+        # Update the datalist with the new flags
+        for i, flag in enumerate(new_flags):
+            self.datalist[i].flag = int(flag)
+        logging.info(
+            f"Updated the flags for the {self.column_type} samples that went into the optimizer. "
+            f"Now the correct ones will be written to the corresponding column_auxiliary file."
+        )
+        return
     # Other functionalities
......
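To illustrate the matching step in update_flags in isolation, here is a small, self-contained sketch with hypothetical sample IDs, dates and flag values (none of these values come from the repository):

    import datetime as dtm
    import numpy as np

    # Auxiliary-file side: three prior samples, all flagged 0
    sample_ids_aux = np.array([101, 102, 103])
    dates_aux = np.array([dtm.datetime(2018, 1, 1), dtm.datetime(2018, 1, 1), dtm.datetime(2018, 1, 2)])
    flags_before = np.array([0, 0, 0])

    # Optimizer side: sample 102 was rejected (flag 2), sample 103 was assimilated (flag 0)
    sample_ids_opt = np.array([102, 103])
    dates_opt = np.array([dtm.datetime(2018, 1, 1), dtm.datetime(2018, 1, 2)])
    flags_after = np.array([2, 0])

    # Match on (sample ID, date) and copy the optimizer flags over the prior flags
    new_flags = flags_before.copy()
    for i, (sid, date) in enumerate(zip(sample_ids_opt, dates_opt)):
        hit = np.where((sample_ids_aux == sid) & (dates_aux == date))[0]
        if len(hit) > 0:  # in the first cycle of a lagged run there is not always a match
            new_flags[hit[0]] = flags_after[i]

    print(new_flags)  # [0 2 0]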
@@ -646,6 +646,11 @@ def sample_step(dacycle, samples, obsoperator, lag, advance=False, statevector=N
             if sample.get_samples_type() == 'flask':
                 sample.write_sample_auxiliary(qual='optimized')
             else:
+                if sample.get_samples_type() == 'column':
+                    # Update the obs flags already written to the auxiliary file for the samples that went into the optimizer (they do not yet contain the rejected-sample flags),
+                    # so that the advance step writes the updated flags (instead of only the prior-mode flags as before; obs_column_general is updated accordingly).
+                    optimizer_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+                    sample.update_flags(outfile, optimizer_file)
                 sample.write_sample_auxiliary(outfile, qual='optimized')
         elif not dacycle['time.restart'] or lag == int(dacycle['time.nlag']) - 1:
             if sample.get_samples_type() == 'flask':
......
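A quick way to check the effect of this change after an advance run is to inspect the flag variable in the auxiliary file directly (a hypothetical snippet; the filename below is a placeholder and only the variable names 'sample_id' and 'flag' are taken from the code above):

    import netCDF4 as nc
    import numpy as np

    # Placeholder path; point this at the column auxiliary file written during advance.
    with nc.Dataset('column_auxiliary_example.nc') as f:
        flags = np.asarray(f.variables['flag'][:])
        sample_ids = np.asarray(f.variables['sample_id'][:])

    # With this fix, samples rejected by the optimizer keep flag 2 in the auxiliary file
    # instead of the prior value 0.
    print(sample_ids[flags == 2])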