Commit d8ba3e4c authored by Florentie, Liesbeth

Added a relative weight for flask and xco2 obs, and added an advance parameter to add_observations to make sure all flask obs are read in the advance step.
parent a3839be9
@@ -126,13 +126,14 @@ class TotalColumnObservations(Observations):
         self.additional_variables = []
 
         # Model data mismatch approach
-        self.mdm_calculation = dacycle.dasystem.get('mdm.calculation')
+        self.mdm_calculation = dacycle.dasystem.get('obs.column.mdm.calculation')
         logging.debug('mdm.calculation = %s' %self.mdm_calculation)
         if not self.mdm_calculation in ['parametrization','empirical']:
             logging.warning('No valid model data mismatch method found. Valid options are \'parametrization\' and \'empirical\'. ' + \
                             'Using a constant estimate for the model uncertainty of 1ppm everywhere.')
         else:
             logging.info('Model data mismatch approach = %s' %self.mdm_calculation)
+        self.mdm_weight = float(dacycle.dasystem['obs.column.mdm.weight'])
 
         # Path to file with observation error settings for column observations
         # Currently the same settings for all assimilated retrieval products: should this be one file per product?
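For reference, the two lookups added above read new keys from the dasystem rc settings. A minimal sketch of how they behave, using a plain dict in place of dacycle.dasystem (key names are from the diff; the values are illustrative assumptions, not from this commit):

```python
# Emulate the rc lookups above with a plain dict; values are illustrative only.
dasystem = {
    'obs.column.mdm.calculation': 'parametrization',  # or 'empirical'
    'obs.column.mdm.weight': '1.0',   # relative weight for xco2 obs
    'obspack.mdm.weight': '1.0',      # relative weight for flask obs
}

mdm_calculation = dasystem.get('obs.column.mdm.calculation')  # None if key is missing
mdm_weight = float(dasystem['obs.column.mdm.weight'])         # KeyError if key is missing

# .get() tolerates a missing key (the code then logs a warning and falls back to a
# constant 1 ppm model uncertainty), while the ['...'] lookup makes the weight mandatory.
print(mdm_calculation, mdm_weight)
```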
@@ -157,7 +158,7 @@ class TotalColumnObservations(Observations):
-    def add_observations(self):
+    def add_observations(self, advance=False):
         """ Reading of total column observations, and selection of observations that will be sampled and assimilated.
         """
@@ -258,12 +259,12 @@ class TotalColumnObservations(Observations):
         for obs in self.datalist:
             # parametrization as used by Frederic Chevallier
             if self.mdm_calculation == 'parametrization':
-                obs.mdm = ( obs.mdm*obs.mdm + (0.8*np.exp((90.0+obs.lat)/300.0))**2 )**0.5
+                obs.mdm = self.mdm_weight * ( obs.mdm*obs.mdm + (0.8*np.exp((90.0+obs.lat)/300.0))**2 )**0.5
             # empirical approach of Andy Jacobson, TO BE IMPLEMENTED
             # elif self.mdm_calculation == 'empirical':
             #     obs.mdm = ...
             else: # assume general model uncertainty of 1 ppm (arbitrary value)
-                obs.mdm = ( obs.mdm*obs.mdm + 1**2 )**0.5
+                obs.mdm = self.mdm_weight * ( obs.mdm*obs.mdm + 1**2 )**0.5
             del obs
 
         meanmdm = np.average(np.array( [obs.mdm for obs in self.datalist] ))
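To make the parametrization concrete, here is a hedged, standalone recomputation of the weighted column mdm for a single observation (the function name and the input numbers are invented for illustration):

```python
import numpy as np

def column_mdm(mdm_retrieval, lat, weight=1.0):
    """Chevallier-style mdm as used above: add a latitude-dependent model error
    (0.8 ppm at the South Pole, growing toward the north) in quadrature to the
    retrieval uncertainty, then apply the new relative weight."""
    return weight * np.sqrt(mdm_retrieval**2 + (0.8 * np.exp((90.0 + lat) / 300.0))**2)

# Illustrative values, not from the commit: a 1.0 ppm retrieval error at 50N.
print(column_mdm(1.0, 50.0))              # ~1.6 ppm
print(column_mdm(1.0, 50.0, weight=0.5))  # halving the weight halves the mdm
```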
@@ -68,12 +68,14 @@ class ObsPackObservations(Observations):
         else:
             self.sites_file = dacycle.dasystem['obs.sites.rc']
+        self.mdm_weight = float(dacycle.dasystem['obspack.mdm.weight'])
 
     def get_samples_type(self):
         return 'flask'
 
-    def add_observations(self):
+    def add_observations(self, advance=False):
         """ Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file
 
         The ObsPack mole fraction files are provided as time series per site with all dates in sequence.
@@ -107,7 +109,7 @@ class ObsPackObservations(Observations):
            sitename, sitecategory = key, value
            sitename = sitename.strip()
            sitecategory = sitecategory.split()[0].strip().lower()
-           if sitecategory != 'do-not-use':
+           if (sitecategory != 'do-not-use') or advance:
                valid_sites.append(sitename)
        del key, value
@@ -116,7 +118,7 @@ class ObsPackObservations(Observations):
        for ncfile in ncfilelist:
-           if ncfile not in valid_sites: continue
+           if ncfile not in valid_sites: continue
 
            infile = os.path.join(self.obspack_dir, 'data', 'nc', ncfile + '.nc')
            ncf = io.ct_read(infile, 'read')
@@ -364,7 +366,7 @@ class ObsPackObservations(Observations):
                sitename, sitecategory = key, value
                sitename = sitename.strip()
                sitecategory = sitecategory.split()[0].strip().lower()
-               if sitecategory == 'do-not-use': continue
+               if (sitecategory == 'do-not-use') and (not advance): continue
                site_info[sitename] = site_categories[sitecategory]
            if 'site.move' in key:
                identifier, latmove, lonmove = value.split(';')
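The two hunks above apply the same rule in two places: sites categorized 'do-not-use' are skipped during assimilation but kept when advance is True, so the advance step reads every flask site. A compact sketch of that selection rule (function name and site data are hypothetical):

```python
def select_sites(site_categories, advance=False):
    """Return the sites to read: skip 'do-not-use' sites unless advancing,
    mirroring the (sitecategory != 'do-not-use') or advance test above."""
    return [site for site, category in site_categories.items()
            if category != 'do-not-use' or advance]

sites = {'mlo_surface-flask': 'flask', 'xyz_surface-flask': 'do-not-use'}
print(select_sites(sites))                # ['mlo_surface-flask']
print(select_sites(sites, advance=True))  # both sites are read
```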
@@ -402,7 +404,7 @@ class ObsPackObservations(Observations):
            else:
                nr_obs_per_day = len([c.code for c in self.datalist if c.code == obs.code and c.xdate.day == obs.xdate.day and c.flag == 0])
            logging.debug("Observation found (%s, %d), mdm category is: %0.2f, scaled with number of observations per day (%i), final mdm applied is: %0.2f." % (identifier, obs.id, site_info[identifier]['error'],nr_obs_per_day,site_info[identifier]['error']*sqrt(nr_obs_per_day)))
-           obs.mdm = site_info[identifier]['error'] * sqrt(nr_obs_per_day) * self.global_R_scaling
+           obs.mdm = self.mdm_weight * site_info[identifier]['error'] * sqrt(nr_obs_per_day) * self.global_R_scaling
            obs.may_localize = site_info[identifier]['may_localize']
            obs.may_reject = site_info[identifier]['may_reject']
            obs.flag = 0
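The flask mdm now stacks three multiplicative factors on the per-category site error. A worked example under assumed values (site error 1.5 ppm, 4 flask obs on that day, global scaling 1.0; all numbers are illustrative, not from the commit):

```python
from math import sqrt

mdm_weight = 0.8          # new relative weight for flask obs (assumed value)
site_error = 1.5          # ppm, per-site-category error from the sites rc file
nr_obs_per_day = 4        # obs at this site on this day
global_R_scaling = 1.0    # pre-existing global scaling of R

# Mirrors: obs.mdm = self.mdm_weight * site_info[...]['error'] * sqrt(n) * self.global_R_scaling
mdm = mdm_weight * site_error * sqrt(nr_obs_per_day) * global_R_scaling
print(mdm)  # 2.4 ppm: denser sampling within a day inflates the mdm by sqrt(n)
```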
@@ -141,20 +141,21 @@ def analysis_pipeline_v2(dacycle, platform, dasystem, samples, statevector):
     """ Main entry point for analysis of ctdas results """
 
     # copy da/analysis folder to exec folder and copy copied_region files
-    if dacycle['dir.da_source'] != dacycle['dir.da_run']:
-        import shutil
-        from da.tools.general import create_dirs
-        create_dirs(os.path.join(dacycle['dir.exec'],'da'))
-        try:
-            shutil.copytree(os.path.join(dacycle['dir.da_source'],'da','analysis'),os.path.join(dacycle['dir.exec'],'da','analysis'))
-        except:
-            try:
-                create_dirs(os.path.join(dacycle['dir.exec'],'da','analysis'))
-                shutil.copy(os.path.join(dacycle['dir.da_source'],'da','analysis/*'),os.path.join(dacycle['dir.exec'],'da','analysis/'))
-            except:
-                pass
-        shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions.nc'))
-        shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions_extended.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions_extended.nc'))
+    if not os.path.exists(os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions.nc')):
+        if dacycle['dir.da_source'] != dacycle['dir.da_run']:
+            import shutil
+            from da.tools.general import create_dirs
+            create_dirs(os.path.join(dacycle['dir.exec'],'da'))
+            try:
+                shutil.copytree(os.path.join(dacycle['dir.da_source'],'da','analysis'),os.path.join(dacycle['dir.exec'],'da','analysis'))
+            except:
+                try:
+                    create_dirs(os.path.join(dacycle['dir.exec'],'da','analysis'))
+                    shutil.copy(os.path.join(dacycle['dir.da_source'],'da','analysis/*'),os.path.join(dacycle['dir.exec'],'da','analysis/'))
+                except:
+                    pass
+            shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions.nc'))
+            shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions_extended.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions_extended.nc'))
 
     # ----------
     from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
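The restructuring above wraps the whole copy block in an existence check, so the analysis folder is copied at most once per run directory. A minimal sketch of that copy-once guard pattern, with a hypothetical helper and paths:

```python
import os
import shutil

def copy_analysis_once(src, dst, marker='copied_regions.nc'):
    """Copy src to dst only if the marker file is not already present, so
    repeated pipeline invocations do not redo (or clobber) the copy."""
    if os.path.exists(os.path.join(dst, marker)):
        return  # already copied by an earlier invocation
    shutil.copytree(src, dst, dirs_exist_ok=True)  # dirs_exist_ok needs Python 3.8+

# copy_analysis_once('run/da_source/da/analysis', 'run/exec/da/analysis')
```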
@@ -388,10 +389,10 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
         sample.setup(dacycle)
 
         # Read observations (include option to read from different satellites) + perform observation selection
-        sample.add_observations()
+        sample.add_observations(advance)
 
         # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
-        sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
+        sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'], advance)
 
         sampling_coords_file = os.path.join(dacycle['dir.input'], sample.get_samples_type()+'_coordinates_%s.nc' % dacycle['time.sample.stamp'])
         logging.debug('sampling_coords_file = %s' % sampling_coords_file)
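With this last hunk, sample_step threads its advance flag into both observation reading and mdm assignment. A toy illustration of the resulting call pattern (the class and function here are hypothetical stand-ins, not CTDAS code):

```python
import logging

class ToySamples:
    """Hypothetical stand-in for a samples object, showing only the
    signatures that sample_step now relies on."""
    def add_observations(self, advance=False):
        logging.info('reading obs, advance=%s' % advance)

    def add_model_data_mismatch(self, sites_file, advance=False):
        logging.info('mdm from %s, advance=%s' % (sites_file, advance))

def sample_step(sample, advance=False):
    # Mirrors the diff: the flag is forwarded to both calls.
    sample.add_observations(advance)
    sample.add_model_data_mismatch('obs.sites.rc', advance)

logging.basicConfig(level=logging.INFO)
sample_step(ToySamples(), advance=True)  # advance step: all flask sites are read
```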