Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
NearRealTimeCTDAS
CTDAS
Commits
d44f0391
Commit
d44f0391
authored
Dec 12, 2018
by
brunner
Browse files
No commit message
No commit message
parent
0b986a1e
Changes
4
Hide whitespace changes
Inline
Side-by-side
da/cosmo/base_statevector.py
View file @
d44f0391
...
...
@@ -287,21 +287,21 @@ class StateVector(object):
# Create members 1:nmembers and add to ensemble_members list
for
member
in
range
(
1
,
self
.
nmembers
):
#
rands = np.random.uniform(low=-1., high=1., size=self.nparams)
rands
=
np
.
random
.
uniform
(
low
=-
1.
,
high
=
1.
,
size
=
self
.
nparams
)
# rands = np.random.randn(self.nparams)
# rands = np.clip(rands,-1,1)
if
(
member
==
1
):
rands
=
np
.
array
([
1.836451
,
1.91572
,
1.719371
,
1.947495
,
1.788597
,
1.109507
,
0.9401712
,
2
,
0.003201536
,
1.2276
,
0
])
-
1.
elif
(
member
==
2
):
rands
=
np
.
array
([
2
,
0.04773442
,
1.809479
,
1.136452
,
2
,
0.8753765
,
1.488298
,
2
,
1.908293
,
0.02784704
,
0
])
-
1.
elif
(
member
==
3
):
rands
=
np
.
array
([
0.9136019
,
1.386204
,
2
,
0.1250395
,
1.815127
,
2
,
0.3002012
,
1.666945
,
0
,
0
,
0
])
-
1.
#
if (member==1):
#
rands = np.array([1.836451,1.91572,1.719371,1.947495,1.788597,1.109507,0.9401712,2,0.003201536,1.2276,0])-1.
#
elif (member==2):
#
rands = np.array([2,0.04773442,1.809479,1.136452,2,0.8753765,1.488298,2,1.908293,0.02784704,0])-1.
#
elif (member==3):
#
rands = np.array([0.9136019,1.386204,2,0.1250395,1.815127,2,0.3002012,1.666945,0,0,0])-1.
newmember
=
EnsembleMember
(
member
)
newmember
.
param_values
=
np
.
dot
(
C
,
rands
)
+
newmean
newmember
.
param_values
[
-
1
]
=
0
self
.
ensemble_members
[
lag
].
append
(
newmember
)
newmember
.
param_values
[
newmember
.
param_values
<
0.
]
=
0.
self
.
ensemble_members
[
lag
].
append
(
newmember
)
logging
.
debug
(
'%d new ensemble members were added to the state vector # %d'
%
(
self
.
nmembers
,
(
lag
+
1
)))
...
...
@@ -452,7 +452,8 @@ class StateVector(object):
members
=
self
.
ensemble_members
[
lag
]
for
mem
in
members
:
filename
=
os
.
path
.
join
(
outdir
,
'parameters_'
+
str
(
lag
)
+
'.%03d%s'
%
(
mem
.
membernumber
,
endswith
))
# filename = os.path.join(outdir, 'parameters.%03d%s' % (mem.membernumber, endswith))
filename
=
os
.
path
.
join
(
outdir
,
'parameters_lag'
+
str
(
lag
)
+
'.%03d%s'
%
(
mem
.
membernumber
,
endswith
))
ncf
=
io
.
CT_CDF
(
filename
,
method
=
'create'
)
dimparams
=
ncf
.
add_params_dim
(
self
.
nparams
)
dimgrid
=
ncf
.
add_latlon_dim
()
...
...
da/cosmo/observationoperator.py
View file @
d44f0391
...
...
@@ -14,6 +14,7 @@ from cdo import *
from
.
import
site_height
from
itertools
import
repeat
from
multiprocessing
import
Pool
from
da.tools.general
import
to_datetime
identifier
=
'ObservationOperator'
version
=
'10'
...
...
@@ -54,9 +55,9 @@ class ObservationOperator(object):
self
.
simulated_file
=
os
.
path
.
join
(
self
.
outputdir
,
'samples_simulated.%s.nc'
%
self
.
dacycle
[
'time.sample.stamp'
])
self
.
forecast_nmembers
=
int
(
self
.
dacycle
[
'da.optimizer.nmembers'
])
def
run
(
self
,
dacycle
):
def
run
(
self
,
lag
,
dacycle
):
absolute_start_time
=
str
((
to_datetime
(
dacycle
[
'abs.time.start'
])).
strftime
(
'%Y-%m-%d'
))
f
=
io
.
CT_CDF
(
self
.
simulated_file
,
method
=
'create'
)
logging
.
debug
(
'Creating new simulated observation file in ObservationOperator (%s)'
%
self
.
simulated_file
)
...
...
@@ -94,8 +95,6 @@ class ObservationOperator(object):
f_in
.
close
()
# Loop over observations, add random white noise, and write to file
shape
=
(
self
.
forecast_nmembers
,
mdm
.
size
)
model_data
=
np
.
empty
(
shape
=
shape
)
# 3x7
...
...
@@ -120,35 +119,36 @@ class ObservationOperator(object):
# infile = os.path.join(ncfile + '.nc')
hour_time_stamp
=
(
"0"
"336"
,
"504"
)
# hour_time_stamp = ("0_168","168_336","336_504")
# UNCOMMENT FROM HERE
# for dt in rrule.rrule(rrule.HOURLY, dtstart=dacycle['time.start'], until=dacycle['time.start']+timedelta(hours=504)):
# for ens in range(0,self.forecast_nmembers):
# ens = str(ens).zfill(3)
# cdo.setunit("'kg m-2 s-1' -expr,GPP_"+ens+"_F=CO2_GPP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'gpp_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.da_run'],"input/parameters."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H')))
# cdo.setunit("'kg m-2 s-1' -expr,RESP_"+ens+"_F=CO2_RESP_F*parametermap -merge "+os.path.join(dacycle['da.bio.input'], 'ra_%s.nc' % dt.strftime('%Y%m%d%H')), input = os.path.join(dacycle['dir.da_run'],"input/parameters."+ens+".nc"), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_"+ens+"_%s.nc" % dt.strftime('%Y%m%d%H')))
# cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "gpp_%s.nc" % dt.strftime('%Y%m%d%H')))
# cdo.merge(input = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_???_%s.nc" % dt.strftime('%Y%m%d%H')), output = os.path.join(dacycle['da.bio.input'], 'ensemble', "ra_%s.nc" % dt.strftime('%Y%m%d%H')))
for
dt
in
rrule
.
rrule
(
rrule
.
HOURLY
,
dtstart
=
dacycle
[
'time.start'
]
+
timedelta
(
hours
=
lag
*
168
),
until
=
dacycle
[
'time.start'
]
+
timedelta
(
hours
=
lag
*
168
+
168
)):
for
ens
in
range
(
0
,
self
.
forecast_nmembers
):
ens
=
str
(
ens
).
zfill
(
3
)
cdo
.
setunit
(
"'kg m-2 s-1' -expr,GPP_"
+
ens
+
"_F=CO2_GPP_F*parametermap -merge "
+
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'gpp_%s.nc'
%
dt
.
strftime
(
'%Y%m%d%H'
)),
input
=
os
.
path
.
join
(
dacycle
[
'restartmap.dir'
],
"parameters_lag"
+
str
(
lag
)
+
"."
+
ens
+
".nc"
),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"gpp_"
+
ens
+
"_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
cdo
.
setunit
(
"'kg m-2 s-1' -expr,RESP_"
+
ens
+
"_F=CO2_RESP_F*parametermap -merge "
+
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ra_%s.nc'
%
dt
.
strftime
(
'%Y%m%d%H'
)),
input
=
os
.
path
.
join
(
dacycle
[
'restartmap.dir'
],
"parameters_lag"
+
str
(
lag
)
+
"."
+
ens
+
".nc"
),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"ra_"
+
ens
+
"_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
cdo
.
merge
(
input
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"gpp_???_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"gpp_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
cdo
.
merge
(
input
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"ra_???_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)),
output
=
os
.
path
.
join
(
dacycle
[
'da.bio.input'
],
'ensemble'
,
"ra_%s.nc"
%
dt
.
strftime
(
'%Y%m%d%H'
)))
os
.
chdir
(
dacycle
[
'da.obsoperator.home'
])
# cycle_start = dacycle['abs.time.start']-dacycle['time.start']
# print(cycle_start)
# sys.exit()
#
os.system('python run_chain.py ctdas '+
dacycle['abs.time.start'].strftime('%Y-%m-%d')+' 0 504
-j meteo icbc emissions biofluxes int2lm post_int2lm cosmo')
os
.
system
(
'python run_chain.py ctdas '
+
absolute_start_time
+
' '
+
str
(
lag
*
168
)
+
' '
+
str
(
lag
*
168
+
168
)
+
'
-j meteo icbc emissions biofluxes int2lm post_int2lm cosmo'
)
os
.
chdir
(
dacycle
[
'dir.da_run'
])
args
=
[
(
dacycle
,
hstart
,
hstop
,
self
.
forecast_nmembers
)
for
dacycle
,
(
hstart
,
hstop
),
self
.
forecast_nmembers
in
zip
(
repeat
(
dacycle
),
[(
0
,
167
),(
168
,
335
),(
336
,
503
)],
[(
168
*
lag
,
168
*
lag
+
167
)],
# [(0,167),(168,335),(336,503)],
repeat
(
self
.
forecast_nmembers
))
]
#
with Pool(3) as pool:
#
pool.starmap(self.extract_model_data, args)
with
Pool
(
3
)
as
pool
:
pool
.
starmap
(
self
.
extract_model_data
,
args
)
for
i
in
range
(
0
,
self
.
forecast_nmembers
):
idx
=
str
(
i
).
zfill
(
3
)
...
...
@@ -167,15 +167,15 @@ class ObservationOperator(object):
logging
.
info
(
'ObservationOperator finished successfully, output file written (%s)'
%
self
.
simulated_file
)
def
run_forecast_model
(
self
,
dacycle
):
def
run_forecast_model
(
self
,
lag
,
dacycle
):
self
.
prepare_run
()
self
.
run
(
dacycle
)
self
.
run
(
lag
,
dacycle
)
def
extract_model_data
(
self
,
dacycle
,
hstart
,
hstop
,
ensnum
):
self
.
dacycle
=
dacycle
time_stamp
=
str
((
dacycle
[
'time.start'
]
+
timedelta
(
hours
=
hstart
)).
strftime
(
'%Y%m%d%H'
))
+
'_'
+
str
((
dacycle
[
'time.start'
]
+
timedelta
(
hours
=
hstop
)).
strftime
(
'%Y%m%d%H'
))
self
.
dacycle
=
dacycle
cosmo_start
=
dacycle
[
'time.start'
].
strftime
(
'%Y%m%d%H'
)
#+timedelta(hours=168)
cosmo_out
=
"/scratch/snx3000/parsenov/ctdas/"
+
cosmo_start
+
"_"
+
str
(
hstart
)
+
"_"
+
str
(
hstop
+
1
)
+
"/cosmo/output/"
cosmo_save
=
"/store/empa/em05/parsenov/cosmo_data/"
...
...
da/cosmo/pipeline.py
View file @
d44f0391
...
...
@@ -267,7 +267,7 @@ def prepare_state(dacycle, statevector):
current_sv
=
os
.
path
.
join
(
dacycle
[
'dir.restart'
],
'savestate_%s.nc'
%
dacycle
[
'time.start'
].
strftime
(
'%Y%m%d'
))
statevector
.
write_to_file
(
current_sv
,
'prior'
)
# write prior info
nlag
=
int
(
dacycle
[
'time.nlag'
])
for
l
in
range
(
0
,
nlag
-
1
):
# pavle added from here
for
l
in
range
(
0
,
nlag
):
# pavle added from here
statevector
.
write_members_to_file
(
l
,
dacycle
[
'restartmap.dir'
])
def
sample_state
(
dacycle
,
samples
,
statevector
,
obsoperator
):
...
...
@@ -334,7 +334,7 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
# Run the observation operator
obsoperator
.
run_forecast_model
(
dacycle
)
obsoperator
.
run_forecast_model
(
lag
,
dacycle
)
# Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting
...
...
template.rc
View file @
d44f0391
...
...
@@ -50,7 +50,8 @@ time.nlag : 2
! '/' will be replaced through the start_ctdas.sh script by a user-specified folder name. DO NOT REPLACE
dir.da_run : /scratch/snx3000/parsenov/projname
restartmap.dir : /scratch/snx3000/parsenov/projname/restart/maps
restartmap.dir : /scratch/snx3000/parsenov/projname/restart
!restartmap.dir : /scratch/snx3000/parsenov/projname/restart/maps
! The resources used to complete the data assimilation experiment. This depends on your computing platform.
! The number of cycles per job denotes how many cycles should be completed before starting a new process or job, this
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment