Commit 461883e7 authored by Florentie, Liesbeth's avatar Florentie, Liesbeth
Browse files

added respiration parameterization v1 + modified analysis pipeline and...

added respiration parameterization v1 + modified analysis pipeline and functions to write output in standard format
parent d82f30c8
......@@ -114,7 +114,7 @@ class ctsfTM5ObsOperator(TM5ObservationOperator):
def run_forecast_model(self,fluxmodel,postprocessing=False):
def run_forecast_model(self,fluxmodel,postprocessing=False,calc_flux_only=False):
# set timerange for flux calculation
startdate = self.dacycle['time.sample.start']
......@@ -132,11 +132,12 @@ class ctsfTM5ObsOperator(TM5ObservationOperator):
# calculate and write fluxmap for ensemble
fluxmodel.calc_flux(self.timevec,postprocessing=postprocessing)
# TM5 run...
self.prepare_run(postprocessing)
self.validate_input() # perform checks
self.run() # calculate the NEE fit
self.save_data() # save samples to netCDF file
if not calc_flux_only:
# TM5 run...
self.prepare_run(postprocessing)
self.validate_input() # perform checks
self.run() # calculate the NEE fit
self.save_data() # save samples to netCDF file
......
......@@ -69,7 +69,8 @@ class ctsfFluxModel(ObservationOperator):
self.add_resp = dacycle.dasystem['add.respiration']
if self.add_resp:
self.nresp = 2
# self.nresp = 2
self.nresp = 13
self.temp0 = 273.15
self.k0 = float(dacycle.dasystem['params.init.k'])
else:
......@@ -89,6 +90,9 @@ class ctsfFluxModel(ObservationOperator):
self.resp_ano_name = dacycle.dasystem['resp.anomaly.name'] # name of anomaly (temperature) in inputfile
self.resp_mean_file = dacycle.dasystem['resp.mean.inputfile']
self.resp_mean_name = dacycle.dasystem['resp.mean.name']
mapfile = os.path.join(dacycle.dasystem['regionsfile'])
ncf = io.ct_read(mapfile, 'read')
self.gridmap = ncf.get_variable('regions')
self.unit_initialization = dacycle.dasystem['fit.unit_initialization'] # if True: initial fit will be constructed based on latitude, else: read from file
self.refdate = to_datetime(dacycle.dasystem['time.reference'])
......@@ -135,7 +139,7 @@ class ctsfFluxModel(ObservationOperator):
if self.resp_mean_name is None:
self.resp_mean = self.temp0 * np.ones(self.resp_ano.shape)
else:
self.resp_mean = self.get_anomalies(self.resp_mean_file, self.resp_mean_name, timevec)
self.resp_mean = self.get_mean(self.resp_mean_file, self.resp_mean_name, timevec)
# self.resp_ano, self.resp_mean = self.get_resp_anomalies(timevec)
logging.info('Obtained anomalies and mean for respiration term')
......@@ -231,6 +235,45 @@ class ctsfFluxModel(ObservationOperator):
def get_mean(self, file, name, timevec):
    """Read the mean seasonal cycle from file and expand it to the simulation times.

    Parameters
    ----------
    file : str
        Path of the netCDF input file holding the mean data.
    name : str
        Variable name of the mean field in the input file.
    timevec : sequence of int/float
        Simulation times as days since self.refdate.

    Returns
    -------
    numpy.ndarray of shape (len(timevec), nlat, nlon) with the mean field
    for every requested time. Accepted input layouts:
      * (12, nregions)   -- monthly climatology per region, mapped onto the
                            grid via self.gridmap;
      * (12, nlat, nlon) -- gridded monthly climatology;
      * (t, nlat, nlon)  -- gridded time series, sampled at timevec via
                            self.get_time_index / self.build_anomalies_samples.

    Raises
    ------
    ValueError if the input array matches none of the accepted layouts.
    """
    # read input data (t_mean is the time axis of the input, if any)
    mean, t_mean = self.get_anodata(file, name, getTime=True)

    # output container: one gridded field per simulation time
    mean_out = np.zeros((len(timevec), self.nlat, self.nlon))

    if len(mean.shape) == 2 and mean.shape[0] == 12:
        # Monthly climatology per region: paint each region's monthly value
        # onto the grid cells belonging to that region.
        # NOTE: np.max() is required here -- the builtin max() raises
        # ValueError on a 2-D region map (ambiguous array comparison).
        nregions = int(np.max(self.gridmap))
        for nr in range(nregions):
            # region numbers in the map are 1-based
            mask = np.where(self.gridmap == nr + 1, 1, 0)
            for t in range(len(timevec)):
                month = (self.refdate + dt.timedelta(days=int(timevec[t]))).month - 1
                mean_out[t, :, :] += mask * mean[month, nr]

    elif len(mean.shape) == 3 and mean.shape[0] == 12 and mean.shape[1] == self.nlat and mean.shape[2] == self.nlon:
        # Gridded monthly climatology: pick the field of the month each
        # simulation time falls in.
        for t in range(len(timevec)):
            month = (self.refdate + dt.timedelta(days=int(timevec[t]))).month - 1
            mean_out[t, :, :] = mean[month, :, :]

    elif len(mean.shape) == 3 and mean.shape[0] == len(t_mean) and mean.shape[1] == self.nlat and mean.shape[2] == self.nlon:
        # Gridded time series: locate the simulation times on the input time
        # axis and sample the mean field at those indices.
        t_mean_index = self.get_time_index(t_mean, timevec)
        mean_out = self.build_anomalies_samples(mean, t_mean_index)

    else:
        msg = 'Array with mean temperature data for respiration parametrization has invalid size (%s). Valid options are [12,nregions], [12,%s,%s] or [t,%s,%s]' \
              % (str(mean.shape), self.nlat, self.nlon, self.nlat, self.nlon)
        logging.error(msg)
        raise ValueError(msg)

    return mean_out
def get_anodata(self,filename,datatype,getTime = False):
"""
......@@ -286,7 +329,7 @@ class ctsfFluxModel(ObservationOperator):
def build_anomalies_samples(self, anomaly, index):
""" Returns a 3D array with input anomaly data for times corresponding to timevec only. Fillvalue = NaN."""
logging.debug('anomaly shape = %s' %str(anomaly.shape))
sampled_anomaly = np.zeros((len(index),anomaly.shape[1],anomaly.shape[2]))
for i in range(len(index)):
if index[i] is None:
......@@ -400,9 +443,18 @@ class ctsfFluxModel(ObservationOperator):
anoterm = np.zeros(self.resp_ano.shape)
for t in range(len(timevec)):
anoterm[t,:,:] = statemap[0,:,:] * (np.exp(self.k0*(self.resp_ano[t,:,:]-self.temp0)) - np.exp(self.k0*(self.resp_mean[t,:,:]-self.temp0))) \
+ statemap[1,:,:] * ( (self.resp_ano[t,:,:]-self.temp0) * np.exp(self.k0*(self.resp_ano[t,:,:]-self.temp0)) \
- (self.resp_mean[t,:,:]-self.temp0) * np.exp(self.k0*(self.resp_mean[t,:,:]-self.temp0)) )
if self.nresp == 2:
anoterm[t,:,:] = statemap[0,:,:] * (np.exp(self.k0*(self.resp_ano[t,:,:]-self.temp0)) - np.exp(self.k0*(self.resp_mean[t,:,:]-self.temp0))) \
+ statemap[1,:,:] * ( (self.resp_ano[t,:,:]-self.temp0) * np.exp(self.k0*(self.resp_ano[t,:,:]-self.temp0)) \
- (self.resp_mean[t,:,:]-self.temp0) * np.exp(self.k0*(self.resp_mean[t,:,:]-self.temp0)) )
elif self.nresp == 13:
month = (self.refdate + dt.timedelta(days=int(timevec[t]))).month - 1
anoterm[t,:,:] = statemap[0,:,:] * (np.exp(self.k0*(self.resp_ano[t,:,:]-self.temp0)) - np.exp(self.k0*(self.resp_mean[t,:,:]-self.temp0))) \
+ statemap[1+month,:,:] * ( (self.resp_ano[t,:,:]-self.temp0) * np.exp(self.k0*(self.resp_ano[t,:,:]-self.temp0)) \
- (self.resp_mean[t,:,:]-self.temp0) * np.exp(self.k0*(self.resp_mean[t,:,:]-self.temp0)) )
else:
logging.error("Invalid number of respiration parameters (%s), valid options are 2 or 13" % self.nresp)
raise ValueError
del t
return anoterm
......@@ -46,7 +46,7 @@ class FluxObsOperator(ObservationOperator):
def run_forecast_model(self,fluxmodel,postprocessing=False):
def run_forecast_model(self,fluxmodel,postprocessing=False, calc_flux_only=False):
""" ... """
self.prepare_run(postprocessing)
......
......@@ -112,10 +112,10 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions_extended.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions_extended.nc'))
# ----------
# from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
from da.ctsf.expand_fluxes_ctsf import save_monthly_avg_1x1_data, save_monthly_avg_tc_data, save_monthly_avg_agg_data #save_monthly_avg_state_data#, save_weekly_avg_ext_tc_data
from da.analysis.expand_molefractions import write_mole_fractions
from da.analysis.summarize_obs import summarize_obs
# from da.analysis.time_avg_fluxes import time_avg
from da.analysis.time_avg_fluxes import time_avg
logging.info(header + "Starting analysis" + footer)
......@@ -125,31 +125,29 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
dacycle.setup()
statevector.setup(dacycle)
logging.info(header + "Starting mole fractions" + footer)
write_mole_fractions(dacycle)
summarize_obs(dacycle['dir.analysis'])
# logging.info(header + "Starting weekly averages" + footer)
# logging.info(header + "Starting mole fractions" + footer)
#
# save_weekly_avg_1x1_data(dacycle, statevector)
# write_mole_fractions(dacycle)
# summarize_obs(dacycle['dir.analysis'])
logging.info(header + "Starting monthly averages" + footer)
save_monthly_avg_1x1_data(dacycle, statevector)
# save_weekly_avg_state_data(dacycle, statevector)
# save_weekly_avg_tc_data(dacycle, statevector)
# save_weekly_avg_ext_tc_data(dacycle)
# save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
# save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
# save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
# save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
# save_weekly_avg_agg_data(dacycle,region_aggregate='country')
#
# logging.info(header + "Starting monthly and yearly averages" + footer)
#
# time_avg(dacycle,'flux1x1')
# time_avg(dacycle,'transcom')
# time_avg(dacycle,'transcom_extended')
# time_avg(dacycle,'olson')
# time_avg(dacycle,'olson_extended')
# time_avg(dacycle,'country')
save_monthly_avg_tc_data(dacycle, statevector)
save_monthly_avg_agg_data(dacycle,region_aggregate='transcom')
save_monthly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
save_monthly_avg_agg_data(dacycle,region_aggregate='olson')
save_monthly_avg_agg_data(dacycle,region_aggregate='olson_extended')
save_monthly_avg_agg_data(dacycle,region_aggregate='country')
logging.info(header + "Starting monthly and yearly averages" + footer)
time_avg(dacycle,'flux1x1',monthly=True)
time_avg(dacycle,'transcom',monthly=True)
time_avg(dacycle,'transcom_extended',monthly=True)
time_avg(dacycle,'olson',monthly=True)
time_avg(dacycle,'olson_extended',monthly=True)
time_avg(dacycle,'country',monthly=True)
logging.info(header + "Finished analysis" + footer)
......@@ -308,12 +306,13 @@ def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, o
filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
statevector.write_to_file(filename, 'opt')
dacycle.output_filelist.extend(obsoperator.output_filelist)
dacycle.output_filelist.append(filename)
statevector.write_members_to_file(0, dacycle['dir.output'])
# Run the observation operator (TM5 to get atmospheric CO2 concentrations)
obsoperator.run_forecast_model(fluxmodel,postprocessing=True)
obsoperator.run_forecast_model(fluxmodel,postprocessing=True,calc_flux_only=True)
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
if os.path.exists(sampling_coords_file):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment