diff --git a/da/cosmo/covariances.py b/da/cosmo/covariances.py index f1708a018f63415f01f8bd86216bb793c14e3f66..fc9c2b9c288788cc4212b939eb7f0695cc7c7e6c 100755 --- a/da/cosmo/covariances.py +++ b/da/cosmo/covariances.py @@ -68,7 +68,8 @@ class CO2StateVector(StateVector): (0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.16, 1.00, 0.16, 0.16, 0.00), \ (0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.16, 0.16, 1.00, 0.16, 0.00), \ (0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.16, 0.16, 0.16, 1.00, 0.00), \ - (0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 1.e-10) ]) +# (0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 1.e-10) ]) + (0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 1.00) ]) return fullcov diff --git a/da/cosmo/observationoperator.py b/da/cosmo/observationoperator.py index 6131d6b2279d38bb322aa82f23f75f4b128d2232..6f929cde736b3fdd8bba578f60e3c650536c2387 100755 --- a/da/cosmo/observationoperator.py +++ b/da/cosmo/observationoperator.py @@ -125,32 +125,32 @@ class ObservationOperator(object): # UNCOMMENT FROM HERE - co2_bg = np.empty(self.forecast_nmembers) - - logging.info('Multiplying emissions with parameters for lag %d' % (lag)) - for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start']+timedelta(hours=24*lag*int(dacycle['time.cycle'])), until=dacycle['time.start']+timedelta(hours=(lag+1)*24*int(dacycle['time.cycle']))): - for ens in range(0,self.forecast_nmembers): - dthh = dt.strftime('%H') - co2_bg[ens] = members[ens].param_values[-1] - ens = str(ens).zfill(3) - cdo.setunit("'kg m-2 s-1' -expr,GPP_"+ens+"_F=CO2_GPP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'gpp_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.input'],"parameters_gpp_lag"+str(lag)+"."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H'))) - cdo.setunit("'kg m-2 s-1' -expr,RESP_"+ens+"_F=CO2_RESP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'ra_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.input'],"parameters_resp_lag"+str(lag)+"."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H'))) -# logging.debug('Background CO2 params are (%s)' % co2_bg) - if dthh=='00': - ct(dt.strftime('%Y%m%d'), co2_bg) - - cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_%s.nc" % dt.strftime('%Y%m%d%H'))) - cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_%s.nc" % dt.strftime('%Y%m%d%H'))) - - os.chdir(dacycle['da.obsoperator.home']) - - if os.path.exists("/scratch/snx3000/parsenov/ctdas/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): - if os.path.exists("/scratch/snx3000/parsenov/ctdas/non_opt_"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): - os.rename("/scratch/snx3000/parsenov/ctdas/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), "/scratch/snx3000/parsenov/ctdas/old_non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) +# co2_bg = np.empty(self.forecast_nmembers) +# + # logging.info('Multiplying emissions with parameters for lag %d' % (lag)) + # for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start']+timedelta(hours=24*lag*int(dacycle['time.cycle'])), until=dacycle['time.start']+timedelta(hours=(lag+1)*24*int(dacycle['time.cycle']))): + # for ens in range(0,self.forecast_nmembers): + # dthh = dt.strftime('%H') + # co2_bg[ens] = members[ens].param_values[-1] + # ens = str(ens).zfill(3) + # cdo.setunit("'kg m-2 s-1' -expr,GPP_"+ens+"_F=CO2_GPP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'gpp_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.input'],"parameters_gpp_lag"+str(lag)+"."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H'))) + # cdo.setunit("'kg m-2 s-1' -expr,RESP_"+ens+"_F=CO2_RESP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'ra_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.input'],"parameters_resp_lag"+str(lag)+"."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H'))) +# # logging.debug('Background CO2 params are (%s)' % co2_bg) + # if dthh=='00': + # ct(dt.strftime('%Y%m%d'), co2_bg) +# + # cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_%s.nc" % dt.strftime('%Y%m%d%H'))) + # cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_%s.nc" % dt.strftime('%Y%m%d%H'))) +# + # os.chdir(dacycle['da.obsoperator.home']) +# + if os.path.exists(dacycle['dir.da_run']+'/'+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): + if os.path.exists(dacycle['dir.da_run']+"/non_opt_"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): + os.rename(dacycle['dir.da_run']+"/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), dacycle['dir.da_run']+"/old_non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) else: - os.rename("/scratch/snx3000/parsenov/ctdas/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), "/scratch/snx3000/parsenov/ctdas/non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) + os.rename(dacycle['dir.da_run']+"/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), dacycle['dir.da_run']+"/non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) - os.system('python run_chain.py ctdas '+absolute_start_time_ch+' '+str(starth+lag*168)+' '+str(endh+lag*168)+' -j meteo icbc emissions biofluxes int2lm post_int2lm cosmo') + os.system('python run_chain.py 21ens '+absolute_start_time_ch+' '+str(starth+lag*168)+' '+str(endh+lag*168)+' -j meteo icbc emissions biofluxes int2lm post_int2lm cosmo') os.chdir(dacycle['dir.da_run']) args = [ @@ -165,6 +165,7 @@ class ObservationOperator(object): for i in range(0,self.forecast_nmembers): idx = str(i).zfill(3) +# cosmo_file = os.path.join('/store/empa/em05/parsenov/cosmo_data/51ens/model_'+idx+'_%s.nc' % dacycle['time.sample.stamp']) cosmo_file = os.path.join('/store/empa/em05/parsenov/cosmo_data/model_'+idx+'_%s.nc' % dacycle['time.sample.stamp']) ifile = Dataset(cosmo_file, mode='r') model_data[i,:] = (np.squeeze(ifile.variables['CO2'][:])*29./44.01)*1E6 # in ppm @@ -189,8 +190,8 @@ class ObservationOperator(object): time_stamp = dacycle['time.sample.stamp'] abs_start_time = str((to_datetime(dacycle['abs.time.start'])).strftime('%Y%m%d%H')) - cosmo_out = "/scratch/snx3000/parsenov/ctdas/"+abs_start_time+"_"+str(hstart)+"_"+str(hstop+1)+"/cosmo/output/" - hhl_cosmo_out = "/scratch/snx3000/parsenov/ctdas/"+abs_start_time+"_0_168/cosmo/output/" + cosmo_out = dacycle['dir.da_run']+"/"+abs_start_time+"_"+str(hstart)+"_"+str(hstop+1)+"/cosmo/output/" + hhl_cosmo_out = dacycle['dir.da_run']+"/"+abs_start_time+"_0_168/cosmo/output/" cosmo_save = "/store/empa/em05/parsenov/cosmo_data/" hhl_fn = hhl_cosmo_out+'lffd'+abs_start_time+'c.nc' diff --git a/da/cosmo/observationoperator_parallel.py b/da/cosmo/observationoperator_parallel.py index e435113e47122820737d294d5ac2e2000afc09c1..34225addbec938a01031496005a861f84851e508 100755 --- a/da/cosmo/observationoperator_parallel.py +++ b/da/cosmo/observationoperator_parallel.py @@ -13,7 +13,7 @@ from dateutil import rrule from cdo import * from . import site_height from da.cosmo.icbc4ctdas import ct -from itertools import repeat +#from itertools import repeat from multiprocessing import Pool from da.tools.general import to_datetime @@ -131,6 +131,14 @@ class ObservationOperator(object): for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start']+timedelta(hours=24*lag*int(dacycle['time.cycle'])), until=dacycle['time.start']+timedelta(hours=(lag+1)*24*int(dacycle['time.cycle']))): + for ens in range(0,self.forecast_nmembers): + co2_bg[ens] = members[ens].param_values[-1] + + dthh = dt.strftime('%H') + if dthh=='00': + ct(dt.strftime('%Y%m%d'), co2_bg) + print("ct proso",dt) + args = [ (dacycle, lag, members, dt, n) for n in range(0,self.forecast_nmembers) @@ -138,20 +146,18 @@ class ObservationOperator(object): with Pool(self.forecast_nmembers) as pool: pool.starmap(self.multiply_flux, args) - - if dthh=='00': - ct(dt.strftime('%Y%m%d'), co2_bg) + print("vprm proso",dt) cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_%s.nc" % dt.strftime('%Y%m%d%H'))) cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_%s.nc" % dt.strftime('%Y%m%d%H'))) os.chdir(dacycle['da.obsoperator.home']) - if os.path.exists("/scratch/snx3000/parsenov/ctdas/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): - if os.path.exists("/scratch/snx3000/parsenov/ctdas/non_opt_"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): - os.rename("/scratch/snx3000/parsenov/ctdas/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), "/scratch/snx3000/parsenov/ctdas/old_non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) + if os.path.exists(dacycle['dir.da_run']+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): + if os.path.exists(dacycle['dir.da_run']+"/non_opt_"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)+"/cosmo/output/"): + os.rename(dacycle['dir.da_run']+"/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), dacycle['dir.da_run']+"/old_non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) else: - os.rename("/scratch/snx3000/parsenov/ctdas/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), "/scratch/snx3000/parsenov/ctdas/non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) + os.rename(dacycle['dir.da_run']+"/"+absolute_start_time+"_"+str(starth+lag*168)+"_"+str(endh+lag*168), dacycle['dir.da_run']+"/non_opt_"+dacycle['time.start'].strftime('%Y%m%d%H')+"_"+str(starth+lag*168)+"_"+str(endh+lag*168)) os.system('python run_chain.py ctdas '+absolute_start_time_ch+' '+str(starth+lag*168)+' '+str(endh+lag*168)+' -j meteo icbc emissions biofluxes int2lm post_int2lm cosmo') os.chdir(dacycle['dir.da_run']) @@ -187,8 +193,6 @@ class ObservationOperator(object): self.run(lag, dacycle, statevector) def multiply_flux(self,dacycle,lag,members,dt,ens): - dthh = dt.strftime('%H') - co2_bg[ens] = members[ens].param_values[-1] ens = str(ens).zfill(3) cdo.setunit("'kg m-2 s-1' -expr,GPP_"+ens+"_F=CO2_GPP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'gpp_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.input'],"parameters_gpp_lag"+str(lag)+"."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H'))) cdo.setunit("'kg m-2 s-1' -expr,RESP_"+ens+"_F=CO2_RESP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'ra_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.input'],"parameters_resp_lag"+str(lag)+"."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H'))) @@ -199,8 +203,8 @@ class ObservationOperator(object): time_stamp = dacycle['time.sample.stamp'] abs_start_time = str((to_datetime(dacycle['abs.time.start'])).strftime('%Y%m%d%H')) - cosmo_out = "/scratch/snx3000/parsenov/ctdas/"+abs_start_time+"_"+str(hstart)+"_"+str(hstop+1)+"/cosmo/output/" - hhl_cosmo_out = "/scratch/snx3000/parsenov/ctdas/"+abs_start_time+"_0_168/cosmo/output/" + cosmo_out = dacycle['dir.da_run']+"/"+abs_start_time+"_"+str(hstart)+"_"+str(hstop+1)+"/cosmo/output/" + hhl_cosmo_out = dacycle['dir.da_run']+"/"+abs_start_time+"_0_168/cosmo/output/" cosmo_save = "/store/empa/em05/parsenov/cosmo_data/" hhl_fn = hhl_cosmo_out+'lffd'+abs_start_time+'c.nc' diff --git a/da/cosmo/statevector.py b/da/cosmo/statevector.py index 330f6660900aa6036216bc0b467f52f9e28c68b8..d5ab62a3e5c2d2af953bf6707fce71efb59d4254 100644 --- a/da/cosmo/statevector.py +++ b/da/cosmo/statevector.py @@ -380,8 +380,8 @@ class StateVector(object): newmember = EnsembleMember(member) # newmember.param_values = rands + newmean newmember.param_values = (np.hstack((np.dot(C, rands[0:11]),np.dot(C, rands[11:22]), rands_bg)) + newmean).ravel() - newmember.param_values[10] = 0. - newmember.param_values[21] = 0. + # newmember.param_values[10] = 0. + # newmember.param_values[21] = 0. newmember.param_values[newmember.param_values<0.] = 0. newmember.param_values[newmember.param_values>2.] = 2. self.ensemble_members[lag].append(newmember) diff --git a/template.py b/template.py index 27aa6c627af6def3a695a32e9f5d699ea24a8a78..72119a647e6540e3af415a6e4d80bb93f57a993e 100755 --- a/template.py +++ b/template.py @@ -34,6 +34,7 @@ from da.cosmo.covariances import CO2StateVector from da.cosmo.obspack_globalviewplus2 import ObsPackObservations #from da.cosmo.obs import Obs from da.cosmo.optimizer import CO2Optimizer +#from da.cosmo.observationoperator_parallel import ObservationOperator # does not fully work from da.cosmo.observationoperator import ObservationOperator #from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data #from da.analysis.expand_molefractions import write_mole_fractions