Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
NearRealTimeCTDAS
CTDAS
Commits
f6935b36
Commit
f6935b36
authored
Jun 07, 2013
by
karolina
Browse files
further cleanup of names: object names to lowercase
parent
bea92ced
Changes
23
Expand all
Hide whitespace changes
Inline
Side-by-side
gridded/da/analysis/expand_fluxes.py
View file @
f6935b36
This diff is collapsed.
Click to expand it.
gridded/da/analysis/expand_mixingratios.py
View file @
f6935b36
...
...
@@ -23,7 +23,7 @@ File created on 11 May 2012.
"""
def
write_mixing_ratios
(
DaC
ycle
):
def
write_mixing_ratios
(
dac
ycle
):
"""
Write Sample information to NetCDF files. These files are organized by site and
...
...
@@ -40,23 +40,23 @@ def write_mixing_ratios(DaCycle):
"""
dirname
=
create_dirs
(
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'data_molefractions'
))
dirname
=
create_dirs
(
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'data_molefractions'
))
#
# Some help variables
#
dectime0
=
date2num
(
datetime
(
2000
,
1
,
1
))
dt
=
DaC
ycle
[
'cyclelength'
]
startdate
=
DaC
ycle
[
'time.start'
]
enddate
=
DaC
ycle
[
'time.end'
]
dt
=
dac
ycle
[
'cyclelength'
]
startdate
=
dac
ycle
[
'time.start'
]
enddate
=
dac
ycle
[
'time.end'
]
logging
.
debug
(
"DA Cycle start date is %s"
%
startdate
.
strftime
(
'%Y-%m-%d %H:%M'
))
logging
.
debug
(
"DA Cycle end date is %s"
%
enddate
.
strftime
(
'%Y-%m-%d %H:%M'
))
DaC
ycle
[
'time.sample.stamp'
]
=
"%s_%s"
%
(
startdate
.
strftime
(
"%Y%m%d%H"
),
enddate
.
strftime
(
"%Y%m%d%H"
),)
dac
ycle
[
'time.sample.stamp'
]
=
"%s_%s"
%
(
startdate
.
strftime
(
"%Y%m%d%H"
),
enddate
.
strftime
(
"%Y%m%d%H"
),)
# Step (1): Get the posterior sample output data file for this cycle
infile
=
os
.
path
.
join
(
DaC
ycle
[
'dir.output'
],
'sampleinfo_%s__newstyle.nc'
%
DaC
ycle
[
'time.sample.stamp'
])
infile
=
os
.
path
.
join
(
dac
ycle
[
'dir.output'
],
'sampleinfo_%s__newstyle.nc'
%
dac
ycle
[
'time.sample.stamp'
])
ncf_in
=
io
.
CT_Read
(
infile
,
'read'
)
...
...
@@ -72,7 +72,7 @@ def write_mixing_ratios(DaCycle):
# Step (2): Get the prior sample output data file for this cycle
infile
=
os
.
path
.
join
(
DaC
ycle
[
'dir.output'
],
'optimizer.%s.nc'
%
startdate
.
strftime
(
'%Y%m%d'
))
infile
=
os
.
path
.
join
(
dac
ycle
[
'dir.output'
],
'optimizer.%s.nc'
%
startdate
.
strftime
(
'%Y%m%d'
))
if
os
.
path
.
exists
(
infile
):
optimized_present
=
True
...
...
@@ -88,13 +88,13 @@ def write_mixing_ratios(DaCycle):
fc_simulated
=
ncf_fc_in
.
get_variable
(
'modelsamplesmean_prior'
)
fc_simulated_ens
=
ncf_fc_in
.
get_variable
(
'modelsamplesdeviations_prior'
)
fc_flag
=
ncf_fc_in
.
get_variable
(
'flag'
)
if
not
DaC
ycle
.
DaS
ystem
.
has_key
(
'opt.algorithm'
):
if
not
dac
ycle
.
das
ystem
.
has_key
(
'opt.algorithm'
):
fc_r
=
ncf_fc_in
.
get_variable
(
'modeldatamismatchvariance'
)
fc_hphtr
=
ncf_fc_in
.
get_variable
(
'totalmolefractionvariance'
)
elif
DaC
ycle
.
DaS
ystem
[
'opt.algorithm'
]
==
'serial'
:
elif
dac
ycle
.
das
ystem
[
'opt.algorithm'
]
==
'serial'
:
fc_r
=
ncf_fc_in
.
get_variable
(
'modeldatamismatchvariance'
)
fc_hphtr
=
ncf_fc_in
.
get_variable
(
'totalmolefractionvariance'
)
elif
DaC
ycle
.
DaS
ystem
[
'opt.algorithm'
]
==
'bulk'
:
elif
dac
ycle
.
das
ystem
[
'opt.algorithm'
]
==
'bulk'
:
fc_r
=
ncf_fc_in
.
get_variable
(
'modeldatamismatchvariance'
).
diagonal
()
fc_hphtr
=
ncf_fc_in
.
get_variable
(
'totalmolefractionvariance'
).
diagonal
()
filesitecode
=
ncf_fc_in
.
get_variable
(
'sitecode'
)
...
...
@@ -136,11 +136,11 @@ def write_mixing_ratios(DaCycle):
ncf_out
.
History
=
'
\n
Original observation file modified by user %s on %s
\n
'
%
(
os
.
environ
[
'USER'
],
datetime
.
today
().
strftime
(
'%F'
),)
ncf_out
.
CTDAS_info
=
'Simulated values added from a CTDAS run by %s on %s
\n
'
%
(
os
.
environ
[
'USER'
],
datetime
.
today
().
strftime
(
'%F'
),)
\
+
'
\n
CTDAS was run on platform %s'
%
(
os
.
environ
[
'HOST'
],)
\
+
'
\n
CTDAS job directory was %s'
%
(
DaC
ycle
[
'dir.da_run'
],)
\
+
'
\n
CTDAS Da System was %s'
%
(
DaC
ycle
[
'da.system'
],)
\
+
'
\n
CTDAS Da ObsOperator was %s'
%
(
DaC
ycle
[
'da.obsoperator'
],)
ncf_out
.
CTDAS_startdate
=
DaC
ycle
[
'time.start'
].
strftime
(
'%F'
)
ncf_out
.
CTDAS_enddate
=
DaC
ycle
[
'time.finish'
].
strftime
(
"%F"
)
+
'
\n
CTDAS job directory was %s'
%
(
dac
ycle
[
'dir.da_run'
],)
\
+
'
\n
CTDAS Da System was %s'
%
(
dac
ycle
[
'da.system'
],)
\
+
'
\n
CTDAS Da ObsOperator was %s'
%
(
dac
ycle
[
'da.obsoperator'
],)
ncf_out
.
CTDAS_startdate
=
dac
ycle
[
'time.start'
].
strftime
(
'%F'
)
ncf_out
.
CTDAS_enddate
=
dac
ycle
[
'time.finish'
].
strftime
(
"%F"
)
ncf_out
.
original_file
=
orig_file
...
...
@@ -323,20 +323,20 @@ if __name__ == "__main__":
logging
.
root
.
setLevel
(
logging
.
DEBUG
)
DaC
ycle
=
CycleControl
(
args
=
{
'rc'
:
'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'
})
DaC
ycle
.
initialize
()
DaC
ycle
.
parse_times
()
dac
ycle
=
CycleControl
(
args
=
{
'rc'
:
'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'
})
dac
ycle
.
initialize
()
dac
ycle
.
parse_times
()
DaS
ystem
=
CO2DaSystem
(
'../rc/carbontracker_ct09_opf.rc'
)
DaS
ystem
.
initialize
()
das
ystem
=
CO2DaSystem
(
'../rc/carbontracker_ct09_opf.rc'
)
das
ystem
.
initialize
()
DaC
ycle
.
DaS
ystem
=
DaS
ystem
dac
ycle
.
das
ystem
=
das
ystem
while
DaC
ycle
[
'time.start'
]
<
DaC
ycle
[
'time.finish'
]:
while
dac
ycle
[
'time.start'
]
<
dac
ycle
[
'time.finish'
]:
outfile
=
write_mixing_ratios
(
DaC
ycle
)
outfile
=
write_mixing_ratios
(
dac
ycle
)
DaC
ycle
.
advance_cycle_times
()
dac
ycle
.
advance_cycle_times
()
sys
.
exit
(
0
)
gridded/da/analysis/siteseries.py
View file @
f6935b36
...
...
@@ -22,7 +22,6 @@ import numpy as np
import
datetime
as
dt
import
da.tools.io4
as
io
import
logging
import
copy
from
da.analysis.summarize_obs
import
nice_lon
,
nice_lat
,
nice_alt
import
Image
import
urllib2
...
...
@@ -48,7 +47,7 @@ fontsize = 9
figures that were created are stamped and saved properly
"""
def
site_timeseries
(
DaC
ycle
):
def
site_timeseries
(
dac
ycle
):
"""***************************************************************************************
Call example:
...
...
@@ -59,13 +58,13 @@ def site_timeseries(DaCycle):
#
# Create directories if needed
#
mrdir
=
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'timeseries_molefractions'
)
mrdir
=
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'timeseries_molefractions'
)
if
not
os
.
path
.
exists
(
mrdir
):
create_dirs
(
mrdir
)
#
# Make a dictionary of available sites and the NetCDF file names associated with them
#
sitelist
=
os
.
listdir
(
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'data_molefractions'
))
sitelist
=
os
.
listdir
(
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'data_molefractions'
))
sitelist
=
[
f
for
f
in
sitelist
if
f
.
endswith
(
'.nc'
)]
#
# Loop over site and sitefiles
...
...
@@ -76,7 +75,7 @@ def site_timeseries(DaCycle):
#
# Create filename and extract site codes for the sites that had day/night data separated
#
filename
=
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'data_molefractions'
,
sitefile
)
filename
=
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'data_molefractions'
,
sitefile
)
#
# Create a figure instance to hold plot
#
...
...
@@ -752,17 +751,17 @@ if __name__ == '__main__': # started as script
logging
.
root
.
setLevel
(
logging
.
DEBUG
)
DaC
ycle
=
CycleControl
(
args
=
{
'rc'
:
'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'
})
DaC
ycle
.
initialize
()
DaC
ycle
.
parse_times
()
dac
ycle
=
CycleControl
(
args
=
{
'rc'
:
'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'
})
dac
ycle
.
initialize
()
dac
ycle
.
parse_times
()
DaS
ystem
=
CO2DaSystem
(
'../rc/carbontracker_ct09_opf.rc'
)
DaS
ystem
.
initialize
()
das
ystem
=
CO2DaSystem
(
'../rc/carbontracker_ct09_opf.rc'
)
das
ystem
.
initialize
()
DaC
ycle
.
DaS
ystem
=
DaS
ystem
dac
ycle
.
das
ystem
=
das
ystem
q
=
site_timeseries
(
DaC
ycle
)
#q=site_residuals(
DaC
ycle)
q
=
site_timeseries
(
dac
ycle
)
#q=site_residuals(
dac
ycle)
#q=site_statistics(rundat)
#q=site_histograms(rundat)
...
...
gridded/da/analysis/summarize_obs.py
View file @
f6935b36
...
...
@@ -44,7 +44,7 @@ def nice_alt(cls):
return
string
.
strip
(
'%10.1f masl'
%
round
(
cls
,
-
1
))
def
summarize_obs
(
DaC
ycle
,
printfmt
=
'html'
):
def
summarize_obs
(
dac
ycle
,
printfmt
=
'html'
):
"""***************************************************************************************
Call example:
...
...
@@ -52,22 +52,22 @@ def summarize_obs(DaCycle, printfmt='html'):
Option printfmt : [tex,scr,html] print summary table in latex, terminal, or html format
Other options are all those needed to create a
DaC
ycle object
Other options are all those needed to create a
dac
ycle object
OR:
call directly from a python script as:
q=summarize_obs(
DaC
ycle,printfmt='html')
q=summarize_obs(
dac
ycle,printfmt='html')
***************************************************************************************"""
sumdir
=
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'summary'
)
sumdir
=
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'summary'
)
if
not
os
.
path
.
exists
(
sumdir
):
logging
.
info
(
"Creating new directory "
+
sumdir
)
os
.
makedirs
(
sumdir
)
mrdir
=
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'data_molefractions'
)
mrdir
=
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'data_molefractions'
)
if
not
os
.
path
.
exists
(
mrdir
):
logging
.
error
(
"Input directory does not exist (%s), exiting... "
%
mrdir
)
return
None
...
...
@@ -185,7 +185,7 @@ def summarize_obs(DaCycle, printfmt='html'):
logging
.
info
(
"File written with summary: %s"
%
saveas
)
def
summarize_stats
(
DaC
ycle
):
def
summarize_stats
(
dac
ycle
):
"""
Summarize the statistics of the observations for this cycle
This includes X2 statistics, RMSD, and others for both forecast and
...
...
@@ -193,16 +193,16 @@ def summarize_stats(DaCycle):
"""
sumdir
=
os
.
path
.
join
(
DaC
ycle
[
'dir.analysis'
],
'summary'
)
sumdir
=
os
.
path
.
join
(
dac
ycle
[
'dir.analysis'
],
'summary'
)
if
not
os
.
path
.
exists
(
sumdir
):
logging
.
info
(
"Creating new directory "
+
sumdir
)
os
.
makedirs
(
sumdir
)
# get forecast data from optimizer.ddddd.nc
startdate
=
DaC
ycle
[
'time.start'
]
DaC
ycle
[
'time.sample.stamp'
]
=
"%s"
%
(
startdate
.
strftime
(
"%Y%m%d"
),)
infile
=
os
.
path
.
join
(
DaC
ycle
[
'dir.output'
],
'optimizer.%s.nc'
%
DaC
ycle
[
'time.sample.stamp'
])
startdate
=
dac
ycle
[
'time.start'
]
dac
ycle
[
'time.sample.stamp'
]
=
"%s"
%
(
startdate
.
strftime
(
"%Y%m%d"
),)
infile
=
os
.
path
.
join
(
dac
ycle
[
'dir.output'
],
'optimizer.%s.nc'
%
dac
ycle
[
'time.sample.stamp'
])
if
not
os
.
path
.
exists
(
infile
):
logging
.
error
(
"File not found: %s"
%
infile
)
...
...
@@ -234,7 +234,7 @@ def summarize_stats(DaCycle):
x2
=
np
.
ma
.
masked_where
(
HPHTR
==
0.0
,
x2
)
# calculate X2 per site
saveas
=
os
.
path
.
join
(
sumdir
,
'x2_table_%s.html'
%
DaC
ycle
[
'time.sample.stamp'
])
saveas
=
os
.
path
.
join
(
sumdir
,
'x2_table_%s.html'
%
dac
ycle
[
'time.sample.stamp'
])
logging
.
info
(
"Writing HTML tables for this cycle (%s)"
%
saveas
)
f
=
open
(
saveas
,
'w'
)
txt
=
"<meta http-equiv='content-type' content='text/html;charset=utf-8' />
\n
"
...
...
@@ -281,7 +281,7 @@ def summarize_stats(DaCycle):
# Now summarize for each site across time steps
if
not
DaC
ycle
[
'time.start'
]
>=
dt
.
datetime
(
2008
,
12
,
29
):
if
not
dac
ycle
[
'time.start'
]
>=
dt
.
datetime
(
2008
,
12
,
29
):
return
logging
.
info
(
"Writing HTML tables for each site"
)
...
...
@@ -338,20 +338,20 @@ if __name__ == '__main__': # started as script
logging
.
root
.
setLevel
(
logging
.
DEBUG
)
DaC
ycle
=
CycleControl
(
args
=
{
'rc'
:
'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'
})
DaC
ycle
.
initialize
()
DaC
ycle
.
parse_times
()
dac
ycle
=
CycleControl
(
args
=
{
'rc'
:
'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'
})
dac
ycle
.
initialize
()
dac
ycle
.
parse_times
()
DaS
ystem
=
CO2DaSystem
(
'../rc/carbontracker_ct09_opf.rc'
)
DaS
ystem
.
initialize
()
das
ystem
=
CO2DaSystem
(
'../rc/carbontracker_ct09_opf.rc'
)
das
ystem
.
initialize
()
DaC
ycle
.
DaS
ystem
=
DaS
ystem
dac
ycle
.
das
ystem
=
das
ystem
q
=
summarize_obs
(
DaC
ycle
)
q
=
summarize_obs
(
dac
ycle
)
while
DaC
ycle
[
'time.start'
]
<
DaC
ycle
[
'time.finish'
]:
q
=
summarize_stats
(
DaC
ycle
)
DaC
ycle
.
advance_cycle_times
()
while
dac
ycle
[
'time.start'
]
<
dac
ycle
[
'time.finish'
]:
q
=
summarize_stats
(
dac
ycle
)
dac
ycle
.
advance_cycle_times
()
sys
.
exit
(
0
)
...
...
gridded/da/baseclasses/dasystem.py
View file @
f6935b36
...
...
@@ -52,13 +52,12 @@ class DaSystem(dict):
self
.
ID
=
'CarbonTracker CO2'
# the identifier gives the platform name
self
.
load_rc
(
rcfilename
)
logging
.
debug
(
"Data Assimilation System initialized: %s"
%
self
.
I
dentifier
)
logging
.
debug
(
"Data Assimilation System initialized: %s"
%
self
.
I
D
)
def
load_rc
(
self
,
rcfilename
):
"""
This method loads a DA System Info rc-file with settings for this simulation
"""
for
k
,
v
in
rc
.
read
(
rcfilename
).
iteritems
():
self
[
k
]
=
v
...
...
gridded/da/baseclasses/obs.py
View file @
f6935b36
...
...
@@ -53,7 +53,7 @@ class Observations(object):
self
.
version
=
version
self
.
datalist
=
[]
# initialize with an empty list of obs
# The following code allows the object to be initialized with a
DaC
ycle object already present. Otherwise, it can
# The following code allows the object to be initialized with a
dac
ycle object already present. Otherwise, it can
# be added at a later moment.
logging
.
info
(
'Observations object initialized: %s'
%
self
.
ID
)
...
...
gridded/da/baseclasses/observationoperator.py
View file @
f6935b36
...
...
@@ -28,25 +28,25 @@ class ObservationOperator(object):
"""
def
__init__
(
self
,
rcfilename
,
DaC
ycle
=
None
):
def
__init__
(
self
,
rcfilename
,
dac
ycle
=
None
):
""" The instance of an ObservationOperator is application dependent """
self
.
I
dentifier
=
identifier
self
.
V
ersion
=
version
self
.
R
estart
F
ile
L
ist
=
[]
self
.
I
D
=
identifier
self
.
v
ersion
=
version
self
.
r
estart
_f
ile
l
ist
=
[]
self
.
outputdir
=
None
# Needed for opening the samples.nc files created
self
.
load_rc
(
rcfilename
)
# load the specified rc-file
self
.
validate_rc
()
# validate the contents
logging
.
info
(
'Observation Operator object initialized: %s'
%
self
.
I
dentifier
)
logging
.
info
(
'Observation Operator object initialized: %s'
%
self
.
I
D
)
# The following code allows the object to be initialized with a
DaC
ycle object already present. Otherwise, it can
# The following code allows the object to be initialized with a
dac
ycle object already present. Otherwise, it can
# be added at a later moment.
if
DaC
ycle
!=
None
:
self
.
DaC
ycle
=
DaC
ycle
if
dac
ycle
!=
None
:
self
.
dac
ycle
=
dac
ycle
else
:
self
.
DaC
ycle
=
{}
self
.
dac
ycle
=
{}
def
get_initial_data
(
self
):
...
...
gridded/da/baseclasses/optimizer.py
View file @
f6935b36
...
...
@@ -13,7 +13,6 @@ File created on 28 Jul 2010.
import
logging
import
numpy
as
np
import
numpy.linalg
as
la
import
da.tools.io4
as
io
identifier
=
'Optimizer baseclass'
...
...
@@ -82,7 +81,7 @@ class Optimizer(object):
# Kalman Gain matrix
self
.
KG
=
np
.
zeros
((
self
.
nlag
*
self
.
nparams
,
self
.
nobs
,),
float
)
def
state_to_matrix
(
self
,
S
tate
V
ector
):
def
state_to_matrix
(
self
,
s
tate
v
ector
):
allsites
=
[]
# collect all obs for n=1,..,nlag
allobs
=
[]
# collect all obs for n=1,..,nlag
allmdm
=
[]
# collect all mdm for n=1,..,nlag
...
...
@@ -94,10 +93,10 @@ class Optimizer(object):
allsimulated
=
None
# collect all members model samples for n=1,..,nlag
for
n
in
range
(
self
.
nlag
):
samples
=
S
tate
V
ector
.
O
bs
ToA
ssimmilate
[
n
]
members
=
S
tate
V
ector
.
E
nsemble
M
embers
[
n
]
self
.
x
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
]
=
members
[
0
].
P
aram
eterV
alues
self
.
X_prime
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
,
:]
=
np
.
transpose
(
np
.
array
([
m
.
P
aram
eterV
alues
for
m
in
members
]))
samples
=
s
tate
v
ector
.
o
bs
_to_a
ssimmilate
[
n
]
members
=
s
tate
v
ector
.
e
nsemble
_m
embers
[
n
]
self
.
x
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
]
=
members
[
0
].
p
aram
_v
alues
self
.
X_prime
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
,
:]
=
np
.
transpose
(
np
.
array
([
m
.
p
aram
_v
alues
for
m
in
members
]))
if
samples
!=
None
:
self
.
rejection_threshold
=
samples
.
rejection_threshold
...
...
@@ -139,11 +138,11 @@ class Optimizer(object):
for
i
,
mdm
in
enumerate
(
allmdm
):
self
.
R
[
i
,
i
]
=
mdm
**
2
def
matrix_to_state
(
self
,
S
tate
V
ector
):
def
matrix_to_state
(
self
,
s
tate
v
ector
):
for
n
in
range
(
self
.
nlag
):
members
=
S
tate
V
ector
.
E
nsemble
M
embers
[
n
]
members
=
s
tate
v
ector
.
e
nsemble
_m
embers
[
n
]
for
m
,
mem
in
enumerate
(
members
):
members
[
m
].
P
aram
eterV
alues
[:]
=
self
.
X_prime
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
,
m
]
+
self
.
x
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
]
members
[
m
].
p
aram
_v
alues
[:]
=
self
.
X_prime
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
,
m
]
+
self
.
x
[
n
*
self
.
nparams
:(
n
+
1
)
*
self
.
nparams
]
logging
.
debug
(
'Returning optimized data to the StateVector, setting "StateVector.isOptimized = True" '
)
...
...
gridded/da/baseclasses/platform.py
View file @
f6935b36
...
...
@@ -138,7 +138,8 @@ class Platform(object):
def
job_stat
(
self
,
jobid
):
""" This method gets the status of a running job """
output
=
subprocess
.
Popen
([
'qstat'
,
jobid
],
stdout
=
subprocess
.
PIPE
).
communicate
()[
0
]
;
logging
.
info
(
output
)
output
=
subprocess
.
Popen
([
'qstat'
,
jobid
],
stdout
=
subprocess
.
PIPE
).
communicate
()[
0
]
logging
.
info
(
output
)
return
output
...
...
gridded/da/baseclasses/statevector.py
View file @
f6935b36
...
...
@@ -56,12 +56,12 @@ class EnsembleMember(object):
An EnsembleMember object is initialized with only a number, and holds two attributes as containter for later
data:
*
P
aram
eterV
alues, will hold the actual values of the parameters for this data
*
p
aram
_v
alues, will hold the actual values of the parameters for this data
* ModelSample, will hold an :class:`~da.baseclasses.obs.Observation` object and the model samples resulting from this members' data
"""
self
.
membernumber
=
membernumber
# the member number
self
.
P
aram
eterV
alues
=
None
# Parameter values of this member
self
.
p
aram
_v
alues
=
None
# Parameter values of this member
################### End Class EnsembleMember ###################
...
...
@@ -118,12 +118,12 @@ class StateVector(object):
self
.
ID
=
identifier
self
.
version
=
version
# The following code allows the object to be initialized with a
DaC
ycle object already present. Otherwise, it can
# The following code allows the object to be initialized with a
dac
ycle object already present. Otherwise, it can
# be added at a later moment.
logging
.
info
(
'Statevector object initialized: %s'
%
self
.
ID
)
def
initialize
(
self
,
DaC
ycle
):
def
initialize
(
self
,
dac
ycle
):
"""
initialize the object by specifying the dimensions.
There are two major requirements for each statvector that you want to build:
...
...
@@ -134,33 +134,33 @@ class StateVector(object):
An example is given below.
"""
self
.
nlag
=
int
(
DaC
ycle
[
'time.nlag'
])
self
.
nmembers
=
int
(
DaC
ycle
[
'da.optimizer.nmembers'
])
self
.
nparams
=
int
(
DaC
ycle
.
DaS
ystem
[
'nparameters'
])
self
.
nlag
=
int
(
dac
ycle
[
'time.nlag'
])
self
.
nmembers
=
int
(
dac
ycle
[
'da.optimizer.nmembers'
])
self
.
nparams
=
int
(
dac
ycle
.
das
ystem
[
'nparameters'
])
self
.
nobs
=
0
self
.
O
bs
ToA
ssimmilate
=
()
# empty containter to hold observations to assimilate later on
self
.
o
bs
_to_a
ssimmilate
=
()
# empty containter to hold observations to assimilate later on
# These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist
# of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
self
.
E
nsemble
M
embers
=
range
(
self
.
nlag
)
self
.
e
nsemble
_m
embers
=
range
(
self
.
nlag
)
for
n
in
range
(
self
.
nlag
):
self
.
E
nsemble
M
embers
[
n
]
=
[]
self
.
e
nsemble
_m
embers
[
n
]
=
[]
# This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember
# that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid.
mapfile
=
os
.
path
.
join
(
DaC
ycle
.
DaS
ystem
[
'regionsfile'
])
mapfile
=
os
.
path
.
join
(
dac
ycle
.
das
ystem
[
'regionsfile'
])
ncf
=
io
.
CT_Read
(
mapfile
,
'read'
)
self
.
gridmap
=
ncf
.
get_variable
(
'regions'
)
self
.
tcmap
=
ncf
.
get_variable
(
'transcom_regions'
)
ncf
.
close
()
logging
.
debug
(
"A TransCom map on 1x1 degree was read from file %s"
%
DaC
ycle
.
DaS
ystem
[
'regionsfile'
])
logging
.
debug
(
"A parameter map on 1x1 degree was read from file %s"
%
DaC
ycle
.
DaS
ystem
[
'regionsfile'
])
logging
.
debug
(
"A TransCom map on 1x1 degree was read from file %s"
%
dac
ycle
.
das
ystem
[
'regionsfile'
])
logging
.
debug
(
"A parameter map on 1x1 degree was read from file %s"
%
dac
ycle
.
das
ystem
[
'regionsfile'
])
# Create a dictionary for state <-> gridded map conversions
...
...
@@ -255,29 +255,29 @@ class StateVector(object):
# If this is not the start of the filter, average previous two optimized steps into the mix
if
lag
==
self
.
nlag
-
1
and
self
.
nlag
>=
3
:
newmean
+=
self
.
E
nsemble
M
embers
[
lag
-
1
][
0
].
P
aram
eterV
alues
+
\
self
.
E
nsemble
M
embers
[
lag
-
2
][
0
].
P
aram
eterV
alues
newmean
+=
self
.
e
nsemble
_m
embers
[
lag
-
1
][
0
].
p
aram
_v
alues
+
\
self
.
e
nsemble
_m
embers
[
lag
-
2
][
0
].
p
aram
_v
alues
newmean
=
newmean
/
3.0
# Create the first ensemble member with a deviation of 0.0 and add to list
newmember
=
EnsembleMember
(
0
)
newmember
.
P
aram
eterV
alues
=
newmean
.
flatten
()
# no deviations
self
.
E
nsemble
M
embers
[
lag
].
append
(
newmember
)
newmember
.
p
aram
_v
alues
=
newmean
.
flatten
()
# no deviations
self
.
e
nsemble
_m
embers
[
lag
].
append
(
newmember
)
# Create members 1:nmembers and add to
E
nsemble
M
embers list
# Create members 1:nmembers and add to
e
nsemble
_m
embers list
for
member
in
range
(
1
,
self
.
nmembers
):
rands
=
np
.
random
.
randn
(
self
.
nparams
)
newmember
=
EnsembleMember
(
member
)
newmember
.
P
aram
eterV
alues
=
np
.
dot
(
C
,
rands
)
+
newmean
self
.
E
nsemble
M
embers
[
lag
].
append
(
newmember
)
newmember
.
p
aram
_v
alues
=
np
.
dot
(
C
,
rands
)
+
newmean
self
.
e
nsemble
_m
embers
[
lag
].
append
(
newmember
)
logging
.
debug
(
'%d new ensemble members were added to the state vector # %d'
%
(
self
.
nmembers
,
(
lag
+
1
)))
def
propagate
(
self
,
DaC
ycle
):
def
propagate
(
self
,
dac
ycle
):
"""
:rtype: None
...
...
@@ -292,12 +292,12 @@ class StateVector(object):
# Remove State Vector n=1 by simply "popping" it from the list and appending a new empty list at the front. This empty list will
# hold the new ensemble for the new cycle
self
.
E
nsemble
M
embers
.
pop
(
0
)
self
.
E
nsemble
M
embers
.
append
([])
self
.
e
nsemble
_m
embers
.
pop
(
0
)
self
.
e
nsemble
_m
embers
.
append
([])
# And now create a new time step of mean + members for n=nlag
date
=
DaC
ycle
[
'time.start'
]
+
timedelta
(
days
=
(
self
.
nlag
-
0.5
)
*
int
(
DaC
ycle
[
'time.cycle'
]))
cov
=
self
.
get_covariance
(
date
,
DaC
ycle
)
date
=
dac
ycle
[
'time.start'
]
+
timedelta
(
days
=
(
self
.
nlag
-
0.5
)
*
int
(
dac
ycle
[
'time.cycle'
]))
cov
=
self
.
get_covariance
(
date
,
dac
ycle
)
self
.
make_new_ensemble
(
self
.
nlag
-
1
,
cov
)
logging
.
info
(
'The state vector has been propagated by one cycle'
)
...
...
@@ -335,19 +335,19 @@ class StateVector(object):
dimlag
=
f
.
add_lag_dim
(
self
.
nlag
,
unlimited
=
True
)
for
n
in
range
(
self
.
nlag
):
members
=
self
.
E
nsemble
M
embers
[
n
]
M
ean
S
tate
=
members
[
0
].
P
aram
eterV
alues
members
=
self
.
e
nsemble
_m
embers
[
n
]
m
ean
_s
tate
=
members
[
0
].
p
aram
_v
alues
savedict
=
f
.
standard_var
(
varname
=
'meanstate_%s'
%
qual
)
savedict
[
'dims'
]
=
dimlag
+
dimparams
savedict
[
'values'
]
=
M
ean
S
tate
savedict
[
'values'
]
=
m
ean
_s
tate
savedict
[
'count'
]
=
n
savedict
[
'comment'
]
=
'this represents the mean of the ensemble'
f
.
add_data
(
savedict
)
members
=
self
.
E
nsemble
M
embers
[
n
]
devs
=
np
.
asarray
([
m
.
P
aram
eterV
alues
.
flatten
()
for
m
in
members
])
data
=
devs
-
np
.
asarray
(
M
ean
S
tate
)
members
=
self
.
e
nsemble
_m
embers
[
n
]
devs
=
np
.
asarray
([
m
.
p
aram
_v
alues
.
flatten
()
for
m
in
members
])
data
=
devs
-
np
.
asarray
(
m
ean
_s
tate
)
savedict
=
f
.
standard_var
(
varname
=
'ensemblestate_%s'
%
qual
)
savedict
[
'dims'
]
=
dimlag
+
dimmembers
+
dimparams
...
...
@@ -381,18 +381,18 @@ class StateVector(object):
#import da.tools.io as io
f
=
io
.
CT_Read
(
filename
,
'read'
)
meanstate
=
f
.
get_variable
(
'statevectormean_'
+
qual
)
E
ns
embleM
embers
=
f
.
get_variable
(
'statevectorensemble_'
+
qual
)
e
ns
m
embers
=
f
.
get_variable
(
'statevectorensemble_'
+
qual
)
f
.
close
()
for
n
in
range
(
self
.
nlag
):
if
not
self
.
E
nsemble
M
embers
[
n
]
==
[]:
self
.
E
nsemble
M
embers
[
n
]
=
[]
if
not
self
.
e
nsemble
_m
embers
[
n
]
==
[]:
self
.
e
nsemble
_m
embers
[
n
]
=
[]
logging
.
warning
(
'Existing ensemble for lag=%d was removed to make place for newly read data'
%
(
n
+
1
))
for
m
in
range
(
self
.
nmembers
):
newmember
=
EnsembleMember
(
m
)
newmember
.
P
aram
eterV
alues
=
E
ns
embleM
embers
[
n
,
m
,
:].
flatten
()
+
meanstate
[
n
]
# add the mean to the deviations to hold the full parameter values
self
.
E
nsemble
M
embers
[
n
].
append
(
newmember
)
newmember
.
p
aram
_v
alues
=
e
ns
m
embers
[
n
,
m
,
:].
flatten
()
+
meanstate
[
n
]
# add the mean to the deviations to hold the full parameter values
self
.
e
nsemble
_m
embers
[
n
].
append
(
newmember
)
logging
.
info
(
'Successfully read the State Vector from file (%s) '
%
filename
)
...
...
@@ -403,7 +403,7 @@ class StateVector(object):
Write ensemble member information to a NetCD