Commit f50ec4c5 authored by brunner

No commit message
parent e2099517
@@ -12,6 +12,8 @@ from datetime import datetime, timedelta
from dateutil import rrule
from cdo import *
from . import site_height
from itertools import repeat
from multiprocessing import Pool
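# repeat and Pool are used below to run extract_model_data for the three
# 168-hour windows of the 504-hour COSMO run in parallel.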
identifier = 'ObservationOperator'
version = '10'
@@ -117,26 +119,43 @@ class ObservationOperator(object):
# for ncfile in ncfilelist:
# infile = os.path.join(ncfile + '.nc')
hour_time_stamp = ("0_168","168_336","336_504")
for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start'], until=dacycle['time.start']+timedelta(hours=504)):
for ens in range(0,self.forecast_nmembers):
ens = str(ens).zfill(3)
cdo.setunit("'kg m-2 s-1' -expr,GPP_"+ens+"_F=CO2_GPP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'gpp_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.da_run'],"input/parameters."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H')))
cdo.setunit("'kg m-2 s-1' -expr,RESP_"+ens+"_F=CO2_RESP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'ra_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.da_run'],"input/parameters."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H')))
cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_%s.nc" % dt.strftime('%Y%m%d%H')))
cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_%s.nc" % dt.strftime('%Y%m%d%H')))
os.chdir(dacycle['da.obsoperator.home'])
os.system('python run_chain.py ctdas '+dacycle['time.start'].strftime('%Y-%m-%d')+' 0 504 -j meteo icbc emissions biofluxes int2lm post_int2lm cosmo')
os.chdir(dacycle['dir.da_run'])
for h,hts in enumerate(hour_time_stamp):
self.extract_model_data(dacycle,hts,self.forecast_nmembers) # hts is hourly time stamp
# hour_time_stamp = ("0" "336","504")  # disabled: relies on implicit string concatenation and is no longer used; the window bounds are passed to extract_model_data directly below
# hour_time_stamp = ("0_168","168_336","336_504")
# UNCOMMENT FROM HERE
# for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start'], until=dacycle['time.start']+timedelta(hours=504)):
# for ens in range(0,self.forecast_nmembers):
# ens = str(ens).zfill(3)
# cdo.setunit("'kg m-2 s-1' -expr,GPP_"+ens+"_F=CO2_GPP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'gpp_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.da_run'],"input/parameters."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H')))
# cdo.setunit("'kg m-2 s-1' -expr,RESP_"+ens+"_F=CO2_RESP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'ra_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.da_run'],"input/parameters."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H')))
# cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_%s.nc" % dt.strftime('%Y%m%d%H')))
# cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_%s.nc" % dt.strftime('%Y%m%d%H')))
#os.chdir(dacycle['da.obsoperator.home'])
# os.system('python run_chain.py ctdas '+dacycle['time.start'].strftime('%Y-%m-%d')+' 0 504 -j meteo icbc emissions biofluxes int2lm post_int2lm cosmo')
# os.chdir(dacycle['dir.da_run'])
# for h,hts in enumerate(hour_time_stamp):
# self.extract_model_data(dacycle,0,167,self.forecast_nmembers)
# self.extract_model_data(dacycle,168,335,self.forecast_nmembers)
# self.extract_model_data(dacycle,336,503,self.forecast_nmembers)
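# Extract the model output for the three 168-hour windows (hours 0-167, 168-335,
# 336-503) of the 504-hour run in parallel, one Pool worker per window.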
args = list(zip(repeat(dacycle),
                (0, 168, 336),
                (167, 335, 503),
                repeat(self.forecast_nmembers)))
with Pool(3) as pool:
pool.starmap(self.extract_model_data, args)
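# Read the extracted CO2 of every ensemble member and convert it to ppm
# (29/44.01 is approximately the molar-mass ratio of dry air to CO2, i.e. a
# mass-to-mole-fraction conversion; 1E6 scales to ppm).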
for i in range(0,self.forecast_nmembers):
idx = str(i+1).zfill(3)
idx = str(i).zfill(3)
cosmo_file = os.path.join('/store/empa/em05/parsenov/cosmo_data/model_'+idx+'_%s.nc' % dacycle['time.sample.stamp'])
ifile = Dataset(cosmo_file, mode='r')
model_data[i,:] = (np.squeeze(ifile.variables['CO2'][:])*29./44.)*1E6 # in ppm
model_data[i,:] = (np.squeeze(ifile.variables['CO2'][:])*29./44.01)*1E6 # in ppm
ifile.close()
for j,data in enumerate(zip(ids,obs,mdm)):
@@ -153,14 +172,14 @@ class ObservationOperator(object):
self.prepare_run()
self.run(dacycle)
def extract_model_data(self,dacycle,hts,ensnum):
def extract_model_data(self,dacycle,hstart,hstop,ensnum):
time_stamp = dacycle['time.sample.stamp']
time_stamp = str((dacycle['time.start']+timedelta(hours=hstart)).strftime('%Y%m%d%H'))+'_'+str((dacycle['time.start']+timedelta(hours=hstop)).strftime('%Y%m%d%H'))
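# Time stamp covering this extraction window, e.g. '2013040100_2013040723' for
# hstart=0, hstop=167 when time.start is 2013-04-01 00:00.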
sites = ("lhw","brm","jfj","ssl")
self.dacycle = dacycle
cosmo_time_stamp = dacycle['time.start'].strftime('%Y%m%d%H') #+timedelta(hours=168)
cosmo_out = "/scratch/snx3000/parsenov/ctdas/"+cosmo_time_stamp+"_"+hts+"/cosmo/output/"
cosmo_start = dacycle['time.start'].strftime('%Y%m%d%H') #+timedelta(hours=168)
cosmo_out = "/scratch/snx3000/parsenov/ctdas/"+cosmo_start+"_"+str(hstart)+"_"+str(hstop+1)+"/cosmo/output/"
cosmo_save = "/store/empa/em05/parsenov/cosmo_data/"
hhl_fn = cosmo_out+'lffd'+dacycle['time.start'].strftime('%Y%m%d%H')+'c.nc'
cdo.selname("HHL", input = hhl_fn, output = cosmo_out+"hhl.nc")
@@ -168,11 +187,12 @@ class ObservationOperator(object):
for ens in range(0,ensnum):
ens = str(ens).zfill(3)
files2cat=[]
for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start'], until=dacycle['time.start']+timedelta(hours=168)):
for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start']+timedelta(hours=hstart), until=dacycle['time.start']+timedelta(hours=hstop)):
dt=dt.strftime('%Y%m%d%H')
logging.info('Extracting output for ens %s, time %s' % (str(ens),str(dt)))
co2_in_fn = cosmo_out+'lffd'+dt+'.nc'
co2_out_fn = cosmo_out+'CO2_'+ens+'_'+dt+'.nc'
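# Per-member total CO2: background plus biospheric (GPP, RESP) and anthropogenic
# contributions; dividing by (1-QV) presumably converts the moist-air mixing
# ratio to a dry-air value.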
cdo.expr("CO2=(CO2_BG-GPP_"+ens+"+RESP_"+ens+"+CO2_A_CH+CO2_A)/(1.-QV)", input = "-selname,QV,CO2_BG,GPP_"+ens+",RESP_"+ens+",CO2_A_CH,CO2_A "+co2_in_fn, output = co2_out_fn)
cdo.expr("'CO2=(CO2_BG-GPP_"+ens+"+RESP_"+ens+"+CO2_A_CH+CO2_A)/(1.-QV)'", input = "-selname,QV,CO2_BG,GPP_"+ens+",RESP_"+ens+",CO2_A_CH,CO2_A "+co2_in_fn, output = co2_out_fn)
files2cat.append(co2_out_fn)
cdo.cat(input = files2cat, output = cosmo_out+"CO2_"+ens+"_"+time_stamp+".nc")
@@ -198,7 +218,6 @@ class ObservationOperator(object):
cdo.intlevel("1205", input = cosmo_out+"CO2_60lev_"+ens+"_ssl_"+time_stamp+".nc", output = cosmo_out+"modelled_"+ens+"_ssl_"+time_stamp+".nc")
cdo.cat(input = cosmo_out+"modelled_"+ens+"_brm_"+time_stamp+".nc "+cosmo_out+"modelled_"+ens+"_jfj_"+time_stamp+".nc "+cosmo_out+"modelled_"+ens+"_lhw_"+time_stamp+".nc "+cosmo_out+"modelled_"+ens+"_ssl_"+time_stamp+".nc ", output = cosmo_save+"model_"+ens+"_"+time_stamp+".nc")
# sys.exit()
################### End Class ObservationOperator ###################
@@ -130,8 +130,8 @@ class ObsPackObservations(Observations):
alts = ncf.get_variable('altitude').take(subselect, axis=0)
obs = ncf.get_variable('value').take(subselect, axis=0)
if (obs.max()<1.): # pavle fix this: if units are mol mol-1
obs = obs*1E6
# if (obs.max()<1.): # pavle fix this: if units are mol mol-1
# obs = obs*1E6
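# The scaling to ppm is disabled here, apparently because the observation files
# used in this setup already store values in micromol mol-1.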
species = 'co2'
# species = ncf.get_attribute('dataset_parameter')
# flags = ncf.get_variable('obs_flag').take(subselect, axis=0)
@@ -288,8 +288,14 @@ class StateVector(object):
# Create members 1:nmembers and add to ensemble_members list
for member in range(1, self.nmembers):
rands = np.random.randn(self.nparams)
rands = np.clip(rands,-1,1)
# rands = np.random.randn(self.nparams)
# rands = np.clip(rands,-1,1)
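# The first three members now receive fixed perturbation vectors (11 parameter
# values each, shifted by -1) instead of the clipped random draw above,
# presumably to make this small test ensemble reproducible.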
if (member==1):
rands = np.array([1.836451,1.91572,1.719371,1.947495,1.788597,1.109507,0.9401712,2,0.003201536,1.2276,0])-1.
elif (member==2):
rands = np.array([2,0.04773442,1.809479,1.136452,2,0.8753765,1.488298,2,1.908293,0.02784704,0])-1.
elif (member==3):
rands = np.array([0.9136019,1.386204,2,0.1250395,1.815127,2,0.3002012,1.666945,0,0,0])-1.
newmember = EnsembleMember(member)
newmember.param_values = np.dot(C, rands) + newmean
@@ -29,7 +29,8 @@
! The time for which to start and end the data assimilation experiment in format YYYY-MM-DD HH:MM:SS
time.start : 2013-04-01 00:00:00
time.finish : 2013-05-01 00:00:00
time.finish : 2013-04-07 23:00:00
!time.finish : 2013-05-01 00:00:00
! Whether to restart the CTDAS system from a previous cycle, or to start the sequence fresh. Valid entries are T/F/True/False/TRUE/FALSE