Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
NearRealTimeCTDAS
CTDAS
Commits
262b4c9c
Commit
262b4c9c
authored
Jan 22, 2019
by
brunner
Browse files
No commit message
No commit message
parent
9c2e8960
Changes
4
Hide whitespace changes
Inline
Side-by-side
da/cosmo/observationoperator.py
View file @
262b4c9c
...
...
@@ -55,8 +55,8 @@ class ObservationOperator(object):
self
.
simulated_file
=
os
.
path
.
join
(
self
.
outputdir
,
'samples_simulated.%s.nc'
%
self
.
dacycle
[
'time.sample.stamp'
])
self
.
forecast_nmembers
=
int
(
self
.
dacycle
[
'da.optimizer.nmembers'
])
def
run
(
self
,
lag
,
dacycle
):
def
run
(
self
,
lag
,
dacycle
,
statevector
):
members
=
statevector
.
ensemble_members
[
lag
]
absolute_start_time
=
str
((
to_datetime
(
dacycle
[
'abs.time.start'
])).
strftime
(
'%Y-%m-%d'
))
starth
=
abs
((
to_datetime
(
dacycle
[
'abs.time.start'
])
-
dacycle
[
'time.start'
]).
days
)
*
24
endh
=
abs
((
to_datetime
(
dacycle
[
'abs.time.start'
])
-
dacycle
[
'time.finish'
]).
days
)
*
24
...
...
@@ -126,11 +126,18 @@ class ObservationOperator(object):
for
dt
in
rrule
.
rrule
(
rrule
.
HOURLY
,
dtstart
=
dacycle
[
'time.start'
]
+
timedelta
(
hours
=
24
*
lag
*
int
(
dacycle
[
'time.cycle'
])),
until
=
dacycle
[
'time.start'
]
+
timedelta
(
hours
=
(
lag
+
1
)
*
24
*
int
(
dacycle
[
'time.cycle'
]))):
logging
.
info
(
'Multiplying emissions with parameters for lag %d, date %s'
%
(
lag
,
dt
.
strftime
(
'%Y%m%d%H'
)))
for
ens
in
range
(
0
,
self
.
forecast_nmembers
):
dthh
=
dt
.
strftime
(
'%H'
)
co2_bg
=
str
((
members
[
ens
].
param_values
[
-
1
]))
ens
=
str
(
ens
).
zfill
(
3
)
cdo
.
setunit
(
"'kg m-2 s-1' -expr,GPP_"
+
ens
+
"_F=CO2_GPP_F*parametermap -merge "
+
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'gpp_%s.nc'
%
dt
.
strftime
(
'%Y%m%d%H'
)),
input
=
os
.
path
.
join
(
dacycle
[
'restartmap.dir'
],
"parameters_gpp_lag"
+
str
(
lag
)
+
"."
+
ens
+
".nc"
),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"gpp_"
+
ens
+
"_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
cdo
.
setunit
(
"'kg m-2 s-1' -expr,RESP_"
+
ens
+
"_F=CO2_RESP_F*parametermap -merge "
+
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ra_%s.nc'
%
dt
.
strftime
(
'%Y%m%d%H'
)),
input
=
os
.
path
.
join
(
dacycle
[
'restartmap.dir'
],
"parameters_resp_lag"
+
str
(
lag
)
+
"."
+
ens
+
".nc"
),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"ra_"
+
ens
+
"_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
if
dthh
==
'00'
or
dthh
==
'03'
or
dthh
==
'06'
or
dthh
==
'09'
or
dthh
==
'12'
or
dthh
==
'15'
or
dthh
==
'18'
or
dthh
==
'21'
:
cdo
.
setunit
(
"'kg m-2 s-1' -expr,BG_"
+
ens
+
"=CO2_BG*"
+
co2_bg
+
" -merge "
+
os
.
path
.
join
(
dacycle
[
'da.bg.input'
],
'ct_%s.nc'
%
dt
.
strftime
(
'%Y%m%d%H'
)),
output
=
os
.
path
.
join
(
dacycle
[
'da.bg.input'
],
'ensemble'
,
"ct_"
+
ens
+
"_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
cdo
.
merge
(
input
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"gpp_???_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"gpp_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
cdo
.
merge
(
input
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"ra_???_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"ra_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
if
dthh
==
'00'
or
dthh
==
'03'
or
dthh
==
'06'
or
dthh
==
'09'
or
dthh
==
'12'
or
dthh
==
'15'
or
dthh
==
'18'
or
dthh
==
'21'
:
cdo
.
merge
(
input
=
os
.
path
.
join
(
dacycle
[
'da.bg.input'
],
'ensemble'
,
"ct_???_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)),
output
=
os
.
path
.
join
(
dacycle
[
'da.bg.input'
],
'ensemble'
,
"ct_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
os
.
chdir
(
dacycle
[
'da.obsoperator.home'
])
...
...
@@ -167,9 +174,9 @@ class ObservationOperator(object):
logging
.
info
(
'ObservationOperator finished successfully, output file written (%s)'
%
self
.
simulated_file
)
def
run_forecast_model
(
self
,
lag
,
dacycle
):
def
run_forecast_model
(
self
,
lag
,
dacycle
,
statevector
):
self
.
prepare_run
()
self
.
run
(
lag
,
dacycle
)
self
.
run
(
lag
,
dacycle
,
statevector
)
def
extract_model_data
(
self
,
dacycle
,
hstart
,
hstop
,
ensnum
):
...
...
@@ -192,7 +199,7 @@ class ObservationOperator(object):
logging
.
info
(
'Extracting output for ens %s, time %s'
%
(
str
(
ens
),
str
(
dt
)))
co2_in_fn
=
cosmo_out
+
'lffd'
+
dt
+
'.nc'
co2_out_fn
=
cosmo_out
+
'CO2_'
+
ens
+
'_'
+
dt
+
'.nc'
cdo
.
expr
(
"'CO2=(
CO2_BG
-GPP_"
+
ens
+
"+RESP_"
+
ens
+
"+CO2_A_CH+CO2_A)/(1.-QV)'"
,
input
=
"-selname,QV,
CO2_BG
,GPP_"
+
ens
+
",RESP_"
+
ens
+
",CO2_A_CH,CO2_A "
+
co2_in_fn
,
output
=
co2_out_fn
)
cdo
.
expr
(
"'CO2=(
BG_"
+
ens
+
"
-GPP_"
+
ens
+
"+RESP_"
+
ens
+
"+CO2_A_CH+CO2_A)/(1.-QV)'"
,
input
=
"-selname,QV,
BG_"
+
ens
+
"
,GPP_"
+
ens
+
",RESP_"
+
ens
+
",CO2_A_CH,CO2_A "
+
co2_in_fn
,
output
=
co2_out_fn
)
files2cat
.
append
(
co2_out_fn
)
cdo
.
cat
(
input
=
files2cat
,
output
=
cosmo_out
+
"CO2_"
+
ens
+
"_"
+
time_stamp
+
".nc"
)
...
...
da/cosmo/pipeline.py
View file @
262b4c9c
...
...
@@ -336,7 +336,7 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
# Run the observation operator
obsoperator
.
run_forecast_model
(
lag
,
dacycle
)
obsoperator
.
run_forecast_model
(
lag
,
dacycle
,
statevector
)
# Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting
...
...
da/cosmo/statevector.py
View file @
262b4c9c
...
...
@@ -150,7 +150,7 @@ class StateVector(object):
self
.
nlag
=
int
(
dacycle
[
'time.nlag'
])
self
.
nmembers
=
int
(
dacycle
[
'da.optimizer.nmembers'
])
# self.nparams = int(dacycle.dasystem['nparameters'])
self
.
nparams
=
2
2
self
.
nparams
=
2
3
self
.
nobs
=
0
self
.
obs_to_assimilate
=
()
# empty containter to hold observations to assimilate later on
...
...
@@ -182,7 +182,7 @@ class StateVector(object):
# Create a dictionary for state <-> gridded map conversions
nparams
=
2
2
nparams
=
2
3
#nparams = self.gridmap.max()
self
.
griddict
=
{}
for
r
in
range
(
1
,
11
):
...
...
@@ -267,7 +267,7 @@ class StateVector(object):
except
:
s
=
np
.
linalg
.
svd
(
covariancematrix
,
full_matrices
=
1
,
compute_uv
=
0
)
#Cartesius fix
dof
=
np
.
sum
(
s
)
**
2
/
sum
(
s
**
2
)
# covariancematrix = np.identity(self.nparams) # pavle - for testing, final matrix will be 2
2
x2
2
# covariancematrix = np.identity(self.nparams) # pavle - for testing, final matrix will be 2
3
x2
3
C
=
np
.
linalg
.
cholesky
(
covariancematrix
)
logging
.
debug
(
'Cholesky decomposition has succeeded '
)
...
...
@@ -291,9 +291,9 @@ class StateVector(object):
self
.
ensemble_members
[
lag
].
append
(
newmember
)
# Create members 1:nmembers and add to ensemble_members list
for
member
in
range
(
1
,
self
.
nmembers
):
rands
=
np
.
random
.
uniform
(
low
=-
1.
,
high
=
1.
,
size
=
self
.
nparams
)
rands
=
np
.
random
.
uniform
(
low
=-
1.
,
high
=
1.
,
size
=
self
.
nparams
-
1
)
rands_bg
=
np
.
random
.
uniform
(
low
=-
0.05
.,
high
=
0.05
.,
size
=
1
)
# if (member==1):
# rands = np.array([1.836451,1.91572,1.719371,1.947495,1.788597,1.109507,0.9401712,2,0.003201536,1.2276,0])-1.
# elif (member==2):
...
...
@@ -303,11 +303,9 @@ class StateVector(object):
newmember
=
EnsembleMember
(
member
)
# newmember.param_values = np.concatenate([np.dot(C, rands[0:self.nparams/2]),np.dot(C, rands[self.nparams/2:self.nparams])]) + newmean
newmember
.
param_values
=
np
.
concatenate
([
np
.
dot
(
C
,
rands
[
0
:
11
]),
np
.
dot
(
C
,
rands
[
11
:
22
])])
+
newmean
#newmember.param_values = np.dot(C, rands) + newmean
newmember
.
param_values
=
(
np
.
hstack
((
np
.
dot
(
C
,
rands
[
0
:
11
]),
np
.
dot
(
C
,
rands
[
11
:
22
]),
rands_bg
))
+
newmean
).
ravel
()
newmember
.
param_values
[
10
]
=
0.
newmember
.
param_values
[
21
]
=
0.
# newmember.param_values[-1] = 0
newmember
.
param_values
[
newmember
.
param_values
<
0.
]
=
0.
self
.
ensemble_members
[
lag
].
append
(
newmember
)
...
...
@@ -501,11 +499,12 @@ class StateVector(object):
# GPP
filename_gpp
=
os
.
path
.
join
(
outdir
,
'parameters_gpp_lag'
+
str
(
lag
)
+
'.%03d%s'
%
(
mem
.
membernumber
,
endswith
))
ncf
=
io
.
CT_CDF
(
filename_gpp
,
method
=
'create'
)
dimparams
=
ncf
.
add_params_dim
(
self
.
nparams
//
2
)
#dimparams = ncf.add_params_dim(self.nparams)
dimparams
=
ncf
.
add_params_dim
(
11
)
#dimparams = ncf.add_params_dim(
(
self.nparams
-1)//2
)
dimgrid
=
ncf
.
add_latlon_dim
()
data
=
mem
.
param_values
[
0
:
self
.
nparams
//
2
]
data
=
mem
.
param_values
[
0
:
11
]
#data = mem.param_values[0:(self.nparams-1)//2]
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"parametervalues"
...
...
@@ -531,10 +530,12 @@ class StateVector(object):
# RESP
filename_resp
=
os
.
path
.
join
(
outdir
,
'parameters_resp_lag'
+
str
(
lag
)
+
'.%03d%s'
%
(
mem
.
membernumber
,
endswith
))
ncf
=
io
.
CT_CDF
(
filename_resp
,
method
=
'create'
)
dimparams
=
ncf
.
add_params_dim
(
self
.
nparams
//
2
)
dimparams
=
ncf
.
add_params_dim
(
11
)
#dimparams = ncf.add_params_dim((self.nparams-1)//2)
dimgrid
=
ncf
.
add_latlon_dim
()
data
=
mem
.
param_values
[
self
.
nparams
//
2
:
self
.
nparams
]
data
=
mem
.
param_values
[
11
:
22
]
#data = mem.param_values[(self.nparams-1)//2:self.nparams-1]
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"parametervalues"
...
...
@@ -558,6 +559,38 @@ class StateVector(object):
ncf
.
close
()
# BACKGROUND CO2
# filename_bg = os.path.join(outdir, 'parameters_bg_lag'+str(lag)+'.%03d%s' % (mem.membernumber, endswith))
# ncf = io.CT_CDF(filename_bg, method='create')
# dimparams = ncf.add_params_dim(11)
# dimgrid = ncf.add_latlon_dim()
#
# data = mem.param_values[-1]
# data = np.hstack((data, data, data, data, data, data, data, data, data, data, data)).ravel()
#
# savedict = io.std_savedict.copy()
# savedict['name'] = "parametervalues"
# savedict['long_name'] = "parameter_values_for_member_%d" % mem.membernumber
# savedict['units'] = "unitless"
# savedict['dims'] = dimparams
# savedict['values'] = data
# savedict['comment'] = 'These are parameter values to use for member %d' % mem.membernumber
# ncf.add_data(savedict)
#
# griddata = self.vector2grid(vectordata=data)
#
# savedict = io.std_savedict.copy()
# savedict['name'] = "parametermap"
# savedict['long_name'] = "parametermap_for_member_%d" % mem.membernumber
# savedict['units'] = "unitless"
# savedict['dims'] = dimgrid
# savedict['values'] = griddata.tolist()
# savedict['comment'] = 'These are gridded parameter values to use for member %d' % mem.membernumber
# ncf.add_data(savedict)
#
# ncf.close()
#
#
def
grid2vector
(
self
,
griddata
=
None
,
method
=
'avg'
):
# not used --pavle
"""
...
...
template.rc
View file @
262b4c9c
...
...
@@ -29,19 +29,19 @@
! The time for which to start and end the data assimilation experiment in format YYYY-MM-DD HH:MM:SS
! the following 3 lines are for initial start
!
time.start : 2013-04-01 00:00:00
!
time.finish : 2013-04-07 23:00:00
!
time.end : 2013-04-07 23:00:00
time.start : 2013-04-01 00:00:00
time.finish : 2013-04-07 23:00:00
time.end : 2013-04-07 23:00:00
time.start : 2013-04-08 00:00:00
time.finish : 2013-04-14 23:00:00
time.end : 2013-04-14 23:00:00
!
time.start : 2013-04-08 00:00:00
!
time.finish : 2013-04-14 23:00:00
!
time.end : 2013-04-14 23:00:00
abs.time.start : 2013-04-01 00:00:00
! Whether to restart the CTDAS system from a previous cycle, or to start the sequence fresh. Valid entries are T/F/True/False/TRUE/FALSE
time.restart :
T
time.restart :
F
da.restart.tstamp : 2013-04-01 00:00:00
! The length of a cycle is given in days, such that the integer 7 denotes the typically used weekly cycle. Valid entries are integers > 1
...
...
@@ -104,6 +104,7 @@ da.obsoperator : cosmo
da.obsoperator.home : /store/empa/em05/parsenov/cosmo_processing_chain
da.bio.input : /store/empa/em05/parsenov/cosmo_input/vprm/processed
da.bg.input : /store/empa/em05/parsenov/cosmo_input/icbc/processed
da.obsoperator.rc : ${da.obsoperator.home}/tm5-ctdas-ei-zoom.rc
!forward.savestate.exceptsam : TRUE
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment