Commit 0ba95c62 authored by Florentie, Liesbeth's avatar Florentie, Liesbeth
Browse files

read statevector uncertainties from file

parent ea85da06
......@@ -29,13 +29,25 @@ version = '1.0'
################### Begin Class ObservationOperator ###################
class ctsfFluxModel(ObservationOperator):
"""
Testing
=======
This is a class that defines an ObservationOperator. This object is used to control the sampling of
a statevector in the ensemble Kalman filter framework. The methods of this class specify which (external) code
is called to perform the sampling, and which files should be read for input and are written for output.
The baseclasses consist mainly of empty methods that require an application-specific implementation. The baseclass will take observed values, and perturb them with a random number chosen from the model-data mismatch distribution. This means no real operator will be at work, but random normally distributed residuals will come out of y-H(x) and thus the inverse model can proceed. This is mainly for testing the code...
This class calculates NEE using a statistical model. NEE is defined as:
the sum of a series expansion representing the mean seasonal cycle and long term trends
+ a GPP anomaly term that is linearly related to the anomaly in a proxy
+ a TER anomaly term that is a linearized version of a Q10 temperature relationship
Functions:
- setup(dacycle)
- validate_input()
- calc_flux(time, member=None, updateano=True, postprocessing=False, write=True)
- write_flux(flux, name, time, member, postprocessing)
- get_anomalies(file, name, time): for reading the anomalies in GPP proxy and temperature
- get_mean(file, name, time): for reading the mean temperatures for TER anomaly term
- get_anodata(filename,datatype,getTime = False): called from get_anomalies and get_mean
- get_time_index(t_ano, time): called from get_anomalies and get_mean
- build_anomalies_samples(anomaly, index): called from get_anomalies and get_mean
- evaluate_model(statemap,time): actual NEE calculation, returns 3D array (time x lat x lon)
- calculate_harmonics_sincos(statemap,time): calculates the sin and cos terms that represent mean seasonal cycle (called from evaluate_model)
- calculate_anoterm(statemap,time): calculates the GPP anomaly term (called from evaluate_model)
- calculate_resp_anoterm(statemap,time): calculates the TER anomaly term (called from evaluate_model)
"""
......@@ -56,16 +68,16 @@ class ctsfFluxModel(ObservationOperator):
self.forecast_nmembers = int(dacycle['da.optimizer.nmembers'])
# fit settings
self.harmonicstype = dacycle.dasystem['harmonicstype'] # 'sincos' [default], or 'phase'
# self.harmonicstype = dacycle.dasystem['harmonicstype'] # 'sincos' [default], or 'phase'
self.nhar = int(dacycle.dasystem['nharmonics']) # number of harmonical terms in baseline fit
if self.harmonicstype == 'phase':
self.nbasefitparams = 3 + self.nhar*3
self.scale_nonlin = dacycle.dasystem['scale_nonlinear_term'] # if False: no scaling of phase in harmonic terms (nonlinear)
else:
self.nbasefitparams = 3 + self.nhar*4
logging.info('Using default harmonicstype \'sincos\'')
# if self.harmonicstype == 'phase':
# self.nbasefitparams = 3 + self.nhar*3
# self.scale_nonlin = dacycle.dasystem['scale_nonlinear_term'] # if False: no scaling of phase in harmonic terms (nonlinear)
# else:
self.nbasefitparams = 3 + self.nhar*4
# logging.info('Using default harmonicstype \'sincos\'')
self.ngamma = int(dacycle.dasystem['ngamma']) # number of sensitivity parameters per ecoregion: default = 12, 1 per calendar month
self.ngamma = int(dacycle.dasystem['ngamma']) # number of sensitivity parameters per ecoregion: default = 12, 1 per calendar month
self.add_resp = dacycle.dasystem['add.respiration']
if self.add_resp:
......@@ -87,7 +99,7 @@ class ctsfFluxModel(ObservationOperator):
if self.add_resp:
self.resp_ano_file = dacycle.dasystem['resp.anomaly.inputfile']
self.resp_ano_name = dacycle.dasystem['resp.anomaly.name'] # name of anomaly (temperature) in inputfile
self.resp_ano_name = dacycle.dasystem['resp.anomaly.name'] # name of anomaly (temperature) in inputfile
self.resp_mean_file = dacycle.dasystem['resp.mean.inputfile']
self.resp_mean_name = dacycle.dasystem['resp.mean.name']
mapfile = os.path.join(dacycle.dasystem['regionsfile'])
......@@ -161,11 +173,11 @@ class ctsfFluxModel(ObservationOperator):
ncf.close()
# calculate flux for member
nee = self.evaluate_model(statemap, timevec)
nee, dgpp, dter = self.evaluate_model(statemap, timevec)
logging.debug('Calculated fluxes for ensemble member %i' % em)
# write gridded flux to file
self.write_flux(nee, timevec, em, postprocessing)
self.write_flux(nee, dgpp, dter, timevec, em, postprocessing)
if member is not None:
break
......@@ -177,7 +189,7 @@ class ctsfFluxModel(ObservationOperator):
def write_flux(self, flux, timevec, em, postprocessing):
def write_flux(self, flux, dgpp, dter, timevec, em, postprocessing):
"""Write time and calculated fluxmap to fluxmodel.<member>.nc."""
# Create a file to hold simulated fluxes and parameters for each member
......@@ -211,7 +223,27 @@ class ctsfFluxModel(ObservationOperator):
savedict['name'] = 'NEE_grid'
savedict['long_name'] = ('Simulated NEE values on lat-lon grid for ensemble member %s' % em )
savedict['dims'] = dimt + dimlatlon
savedict['values'] = flux * 1E-6 # convert from micromol to mol
savedict['values'] = flux * 1.0E-6 # convert from micromol to mol
savedict['units'] = 'mol/m2/s'
f.add_data(savedict)
# add GPP anomaly
dimlatlon = ('latitude', 'longitude',)
savedict = io.std_savedict.copy()
savedict['name'] = 'dGPP_grid'
savedict['long_name'] = ('Simulated GPP anomaly values on lat-lon grid for ensemble member %s' % em )
savedict['dims'] = dimt + dimlatlon
savedict['values'] = dgpp * 1.0E-6 # convert from micromol to mol
savedict['units'] = 'mol/m2/s'
f.add_data(savedict)
# add TER anomaly
dimlatlon = ('latitude', 'longitude',)
savedict = io.std_savedict.copy()
savedict['name'] = 'dTER_grid'
savedict['long_name'] = ('Simulated TER anomaly values on lat-lon grid for ensemble member %s' % em )
savedict['dims'] = dimt + dimlatlon
savedict['values'] = dter * 1.0E-6 # convert from micromol to mol
savedict['units'] = 'mol/m2/s'
f.add_data(savedict)
......@@ -244,12 +276,15 @@ class ctsfFluxModel(ObservationOperator):
# convert to gridded mean data for timevec
mean_out = np.zeros((len(timevec),self.nlat,self.nlon))
if len(mean.shape) == 2 and mean.shape[0] == 12:
for nr in range(max(self.gridmap)):
if len(mean.shape) == 2 and (mean.shape[0] == 12 or mean.shape[1] == 12):
for nr in range(209): #max(self.gridmap)):
mask = np.where(self.gridmap == nr+1,1,0)
for t in range(len(timevec)):
month = (self.refdate + dt.timedelta(days=int(timevec[t]))).month - 1
mean_out[t,:,:] += mask * mean[month,nr]
if mean.shape[0] == 12:
mean_out[t,:,:] += mask * mean[month,nr]
else:
mean_out[t,:,:] += mask * mean[nr,month]
del t
del nr
......@@ -348,7 +383,7 @@ class ctsfFluxModel(ObservationOperator):
"""
Method to calculate an NEE field consisting of regionally constant second-order polynomial and harmonical terms with linearly varying amplitude
(to account for mean seasonal cycle), enhanced with scaled SIF anomalies to account for spatial and interannual variability.
(to account for mean seasonal cycle), enhanced with scaled anomalies to account for spatial and interannual variability.
"""
# check dimensions of input
......@@ -367,23 +402,30 @@ class ctsfFluxModel(ObservationOperator):
raise ValueError
# calculate NEE (len(time) x nlat x nlon)
if self.harmonicstype == 'phase':
nee = statemap[0,:,:] + statemap[1,:,:]*timevec[:,np.newaxis,np.newaxis] + statemap[2,:,:]*(timevec[:,np.newaxis,np.newaxis]**2) \
+ self.calculate_harmonics(statemap[3:i1,:,:],timevec) + self.calculate_anoterm(statemap[i1:i2,:,:],timevec)
else:
nee = statemap[0,:,:] + statemap[1,:,:]*timevec[:,np.newaxis,np.newaxis] + statemap[2,:,:]*(timevec[:,np.newaxis,np.newaxis]**2) \
+ self.calculate_harmonics_sincos(statemap[3:i1,:,:],timevec) + self.calculate_anoterm(statemap[i1:i2,:,:],timevec)
# if self.harmonicstype == 'phase':
# nee = statemap[0,:,:] + statemap[1,:,:]*timevec[:,np.newaxis,np.newaxis] + statemap[2,:,:]*(timevec[:,np.newaxis,np.newaxis]**2) \
# + self.calculate_harmonics(statemap[3:i1,:,:],timevec) #+ self.calculate_anoterm(statemap[i1:i2,:,:],timevec)
# else:
nee = statemap[0,:,:] + statemap[1,:,:]*timevec[:,np.newaxis,np.newaxis] + statemap[2,:,:]*(timevec[:,np.newaxis,np.newaxis]**2) \
+ self.calculate_harmonics_sincos(statemap[3:i1,:,:],timevec) #+ self.calculate_anoterm(statemap[i1:i2,:,:],timevec)
dgpp = self.calculate_anoterm(statemap[i1:i2,:,:],timevec)
if self.add_resp:
nee = nee + self.calculate_resp_anoterm(statemap[i2:i3,:,:],timevec)
# nee = nee + self.calculate_resp_anoterm(statemap[i2:i3,:,:],timevec)
dter = self.calculate_resp_anoterm(statemap[i2:i3,:,:],timevec)
else: dter = np.zeros(nee.shape)
nee = nee + dgpp + dter
return nee
return nee, dgpp, dter
def calculate_harmonics(self,statemap,timevec):
"""
DO NOT USE!!!
: param statemap: scalingfactors for the fit parameters and anomalies, dim = nhar*3 , 180 , 360
: param baselinefit: polynomial and harmonic fit parameters, dim = nhar*3 , 180 , 360
......
......@@ -39,9 +39,12 @@ def ctsf_pipeline(dacycle, platform, dasystem, samples, statevector, fluxmodel,
sys.path.append(os.getcwd())
logging.info(header + 'Initializing CTSF inversion cycle' + footer)
if type(samples) is not list: samples = [samples]
start_job(dacycle, dasystem, platform, statevector, samples, fluxmodel, obsoperator)
prepare_state(dacycle, statevector)
sample_state(dacycle, samples, statevector, fluxmodel, obsoperator)
invert(dacycle, statevector, optimizer)
......@@ -52,6 +55,39 @@ def ctsf_pipeline(dacycle, platform, dasystem, samples, statevector, fluxmodel,
def forward_pipeline(dacycle, platform, dasystem, samples, statevector, fluxmodel, obsoperator):
    """The main point of entry for the forward pipeline.

    Runs the flux model and observation operator forward (no inversion) to get
    mole fractions at independent stations. For a forward run, make sure the
    initial-condition statevector is defined such that the result from the
    inverse run is used (via the 'forward.savestate.file' rc-key)!

    :param dacycle: the DA cycle object holding rc-file settings and time info
    :param platform: platform/job-control object (passed through to start_job)
    :param dasystem: DA-system settings object (passed through to start_job)
    :param samples: a Samples object, or a list of them; normalized to a list here
    :param statevector: the statevector to fill (from file, or a fresh prior)
    :param fluxmodel: the statistical flux model (ctsfFluxModel)
    :param obsoperator: the observation operator (e.g. TM5)
    """
    sys.path.append(os.getcwd())

    # Downstream code iterates over samples, so wrap a single object in a list.
    if not isinstance(samples, list):
        samples = [samples]

    logging.info(header + 'Initializing CTSF forward cycle' + footer)
    start_job(dacycle, dasystem, platform, statevector, samples, fluxmodel, obsoperator)

    # 'in' replaces dict.has_key(), which was removed in Python 3; behavior is
    # identical on Python 2 as well.
    if 'forward.savestate.file' in dacycle:
        fwddir = dacycle['forward.savestate.file']
    else:
        logging.debug("No forward.savestate.file key found in rc-file, proceeding with self-constructed prior parameters")
        fwddir = False

    if not fwddir:
        # Simply make a prior statevector using the normal method
        prepare_state(dacycle, statevector)
    else:
        # Read prior information from another simulation into the statevector.
        # This loads the results from another assimilation experiment into the current statevector
        statevector.read_from_file(fwddir, 'opt')

    sample_state(dacycle, samples, statevector, fluxmodel, obsoperator)

    # forwardrun=True makes the obsoperator skip the mole-fraction sampling step
    # and only compute fluxes for the (already optimized) statevector.
    write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, obsoperator, forwardrun=True)

    logging.info('Forward run finished... exiting pipeline')
def check_setup(dacycle, platform, dasystem, samples, statevector, fluxmodel, obsoperator, optimizer):
sys.path.append(os.getcwd())
......@@ -78,11 +114,33 @@ def check_setup(dacycle, platform, dasystem, samples, statevector, fluxmodel, ob
logging.info(" file stamp: %s " % dacycle['time.sample.stamp'])
# Create observation vector for simulation interval
samples.setup(dacycle)
samples.add_observations()
samples.add_model_data_mismatch()#dacycle.dasystem['obs.sites.rc'])
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_coords(sampling_coords_file)
for sample in samples:
sample.setup(dacycle)
sample.add_observations()
# If OSSE setup: overwrite obspack observation values by previously calculated samples
if hasattr(sample, 'osse') and sample.osse: sample.add_osse_observations(sample.osse_file)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'], advance)
sampling_coords_file = os.path.join(dacycle['dir.input'], sample.get_samples_type()+'_coordinates_%s.nc' % dacycle['time.sample.stamp'])
logging.debug('sampling_coords_file = %s' % sampling_coords_file)
sample.write_sample_coords(sampling_coords_file)
# Write filename to dacycle, and to output collection list
dacycle['ObsOperator.inputfile.'+sample.get_samples_type()] = sampling_coords_file
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
del sample
# samples.setup(dacycle)
# samples.add_observations()
# samples.add_model_data_mismatch()#dacycle.dasystem['obs.sites.rc'])
# sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
# samples.write_sample_coords(sampling_coords_file)
dacycle.finalize()
logging.info('check_setup cycle finished... exiting pipeline')
......@@ -125,10 +183,10 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
dacycle.setup()
statevector.setup(dacycle)
# logging.info(header + "Starting mole fractions" + footer)
#
# write_mole_fractions(dacycle)
# summarize_obs(dacycle['dir.analysis'])
logging.info(header + "Starting mole fractions" + footer)
write_mole_fractions(dacycle)
summarize_obs(dacycle['dir.analysis'])
logging.info(header + "Starting monthly averages" + footer)
save_monthly_avg_1x1_data(dacycle, statevector)
......@@ -216,19 +274,34 @@ def sample_step(dacycle, samples, statevector, fluxmodel, obsoperator, lag, adva
statevector.write_members_to_file(lag, dacycle['dir.input'])
samples.setup(dacycle)
samples.add_observations()
for sample in samples:
sample.setup(dacycle)
sample.add_observations()
# If OSSE setup: overwrite obspack observation values by previously calculated samples
if hasattr(sample, 'osse') and sample.osse: sample.add_osse_observations(sample.osse_file)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
samples.add_model_data_mismatch()#dacycle.dasystem['obs.sites.rc'])
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'], advance)
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_coords(sampling_coords_file)
# Write filename to dacycle, and to output collection list
dacycle['ObsOperator.inputfile'] = sampling_coords_file
sampling_coords_file = os.path.join(dacycle['dir.input'], sample.get_samples_type()+'_coordinates_%s.nc' % dacycle['time.sample.stamp'])
logging.debug('sampling_coords_file = %s' % sampling_coords_file)
sample.write_sample_coords(sampling_coords_file)
# Write filename to dacycle, and to output collection list
dacycle['ObsOperator.inputfile.'+sample.get_samples_type()] = sampling_coords_file
# sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
del sample
# samples.write_sample_coords(sampling_coords_file)
# # Write filename to dacycle, and to output collection list
# dacycle['ObsOperator.inputfile'] = sampling_coords_file
# Run the observation operator (TM5 to get atmospheric CO2 concentrations)
obsoperator.run_forecast_model(fluxmodel)
obsoperator.run_forecast_model(fluxmodel, samples)
# Read forecast model samples that were written to NetCDF files by each member. Add them to the existing
# Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle.
......@@ -236,9 +309,14 @@ def sample_step(dacycle, samples, statevector, fluxmodel, obsoperator, lag, adva
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
if os.path.exists(sampling_coords_file):
samples.add_simulations(obsoperator.simulated_file)
else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
for i in range(len(samples)):
if os.path.exists(dacycle['ObsOperator.inputfile.'+samples[i].get_samples_type()]):
samples[i].add_simulations(obsoperator.simulated_file[i])
else: logging.warning("No simulations added, because sample input file does not exist")
del i
# if os.path.exists(sampling_coords_file):
# samples.add_simulations(obsoperator.simulated_file)
# else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
# # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
# samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
......@@ -255,12 +333,17 @@ def sample_step(dacycle, samples, statevector, fluxmodel, obsoperator, lag, adva
if not advance:
if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1:
statevector.obs_to_assimilate += (copy.deepcopy(samples),)
statevector.nobs += samples.getlength()
for sample in samples:
statevector.nobs += sample.getlength()
del sample
logging.info('nobs = %i' %statevector.nobs)
logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")
else:
statevector.obs_to_assimilate += (None,)
def invert(dacycle, statevector, optimizer):
""" Perform the inverse calculation """
logging.info(header + "starting invert" + footer)
......@@ -298,7 +381,7 @@ def invert(dacycle, statevector, optimizer):
def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, obsoperator):
def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, obsoperator, forwardrun=False):
""" Calculate the fluxes for the optimized statevector and write to output file """
logging.info(header + 'Starting write_optimized_state_and_fluxes' + footer)
......@@ -312,12 +395,21 @@ def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, o
statevector.write_members_to_file(0, dacycle['dir.output'])
# Run the observation operator (TM5 to get atmospheric CO2 concentrations)
obsoperator.run_forecast_model(fluxmodel,postprocessing=True,calc_flux_only=True)
if forwardrun:
obsoperator.run_forecast_model(fluxmodel, samples, postprocessing=True,calc_flux_only=True)
else:
obsoperator.run_forecast_model(fluxmodel, samples, postprocessing=True)
# sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
for sample in samples:
sampling_coords_file = os.path.join(dacycle['dir.input'], sample.get_samples_type()+'_coordinates_%s.nc' % dacycle['time.sample.stamp'])
del sample
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
if os.path.exists(sampling_coords_file):
outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_auxiliary(outfile)
for sample in samples:
if sample.get_samples_type() == 'flask':
outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
sample.write_sample_auxiliary(outfile)
else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
dacycle.output_filelist.extend(obsoperator.output_filelist)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment