Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
NearRealTimeCTDAS
CTDAS
Commits
a9485dd0
Commit
a9485dd0
authored
Feb 26, 2020
by
Woude, Auke van der
Browse files
Propegate samples to run obsoperator only for obs to assimilate
parent
1de4ae31
Changes
4
Hide whitespace changes
Inline
Side-by-side
da/ffdas/obs.py
View file @
a9485dd0
...
...
@@ -327,7 +327,7 @@ class RINGOObservations(Observations):
obs
.
flag
=
99
# default is do-not-use , until explicitly set by script
exclude_hourly
=
False
# default is that hourly values are not included
identifier
=
obs
.
code
identifier
=
obs
.
code
.
split
(
'/'
)[
-
1
]
# species, site, method, lab, datasetnr = identifier.split('_')
if
identifier
in
site_info
:
if
identifier
in
site_hourly
:
...
...
da/ffdas/observationoperator.py
View file @
a9485dd0
...
...
@@ -235,23 +235,28 @@ class STILTObservationOperator(ObservationOperator):
self
.
dacycle
[
'time.finish'
].
month
,
self
.
dacycle
[
'time.finish'
].
day
,
self
.
dacycle
[
'time.finish'
].
hour
,
0
)
dt
=
dtm
.
timedelta
(
0
,
3600
)
dumdate
=
self
.
timestartkey
self
.
datelist
=
[]
while
dumdate
<
(
self
.
timefinishkey
):
# self.timefinishkey : dt
self
.
datelist
.
append
(
dumdate
)
dumdate
=
dumdate
+
dt
# dt=dtm.timedelta(0,3600)
# dumdate=self.timestartkey
# self.datelist=[]
# while dumdate<(self.timefinishkey): # self.timefinishkey : dt
# self.datelist.append(dumdate)
# dumdate=dumdate+dt
self
.
get_c14_time_profile
()
def
get_c14_time_profile
():
def
get_samples
(
self
,
samples
):
self
.
samples
=
samples
.
datalist
datelist
=
[
sample
.
xdate
for
sample
in
self
.
samples
]
self
.
datelist
=
sorted
(
set
(
datelist
))
logging
.
info
(
'Added samples to the observation operator'
)
def
get_c14_time_profile
(
self
):
"""Function that loads the nuclear power temporal variation"""
# Get the time profiles of the nuclear plants
temp_distribution_file
=
self
.
model_settings
[
'nuclear_timeprofiledir'
]
workbook
=
xlrd
.
open_workbook
(
temp_distribution_file
)
worksheet
=
workbook
.
sheet_by_name
(
'Sheet1'
)
time_profile_nuc
=
np
.
array
(
worksheet
.
col_values
(
19
)
time_profile_nuc
=
np
.
array
(
worksheet
.
col_values
(
19
)
)
self
.
nuc_time_profile
=
time_profile_nuc
### !!!! Only cache if made dependent on datepoint/datetime !!!!
...
...
@@ -282,7 +287,7 @@ class STILTObservationOperator(ObservationOperator):
return
slice
(
time_index
-
int
(
self
.
model_settings
[
'num_backtimes'
]),
time_index
)
@
cached
def
get_foot
(
self
,
i_loc
,
datepoint
):
def
get_foot
(
self
,
site
,
datepoint
):
"""Function that gets the footprint for the current time and site.
Returns a 3D np.array with dims (time, lat, lon)
Input:
...
...
@@ -292,7 +297,6 @@ class STILTObservationOperator(ObservationOperator):
np.array (3d): Footprint with as indices time, lat, lon
"""
site
=
self
.
sitenames
[
i_loc
].
upper
()
path
=
self
.
model_settings
[
'footdir'
]
+
'/'
+
site
.
upper
()
+
'24'
fname
=
path
+
'/footprint_{0}_{1}x{2:02d}x{3:02d}x{4:02d}*.nc'
.
format
(
site
.
upper
(),
datepoint
.
year
,
...
...
@@ -306,7 +310,7 @@ class STILTObservationOperator(ObservationOperator):
return
np
.
flipud
(
footprint
)
@
cached
def
get_background
(
self
,
i_species
,
i_loc
,
datepoint
):
def
get_background
(
self
,
i_species
,
site
,
datepoint
):
"""Function that finds the center of mass of the first footprint and the time corresponding to it.
and finds the concentration in the center of mass. This is used as the background.
Input:
...
...
@@ -319,7 +323,7 @@ class STILTObservationOperator(ObservationOperator):
from
scipy
import
ndimage
# First, get the footprint and find the first time of influence
foot
=
self
.
get_foot
(
i_loc
,
datepoint
)
foot
=
self
.
get_foot
(
site
,
datepoint
)
start_influence
=
-
1
# In backtimes
total_influence
=
0
while
total_influence
<
0.0000001
:
...
...
@@ -356,7 +360,7 @@ class STILTObservationOperator(ObservationOperator):
return
backgrounds
[
i_species
]
@
cached
def
get_c14_concentration
(
self
,
i_loc
,
datepoint
,
gpp
,
ter
,
ff_flux
,
background
):
def
get_c14_concentration
(
self
,
site
,
datepoint
,
gpp
,
ter
,
ff_flux
,
background
):
"""Function that gets the c14 concentration based on the emissions by the biosphere,
fossil fuels and nuclear power. The constants are defined at the top of this script.
Input:
...
...
@@ -370,8 +374,7 @@ class STILTObservationOperator(ObservationOperator):
float: The C14 in ppm
float: Delta(14C) """
# First, get the footprint
site
=
self
.
sitenames
[
i_loc
]
foot
=
self
.
get_foot
(
i_loc
,
datepoint
)
foot
=
self
.
get_foot
(
site
,
datepoint
)
# Get the time indices
indices
=
self
.
get_time_indices
(
datepoint
)
...
...
@@ -428,7 +431,7 @@ class STILTObservationOperator(ObservationOperator):
return
fluxmap
@
cached
def
get_biosphere_concentration
(
self
,
i_loc
,
datepoint
,
total
=
False
):
def
get_biosphere_concentration
(
self
,
site
,
datepoint
,
total
=
False
):
"""Function that calculates the atmospheric increase due to the exchange of fluxes over the footprint
Input:
i_loc: int: index of the location for which the concentration should be calculated
...
...
@@ -441,8 +444,7 @@ class STILTObservationOperator(ObservationOperator):
tuple of 2 floats: GPP and TER in umol/s """
# First, get the footprint
site
=
self
.
sitenames
[
i_loc
]
foot
=
self
.
get_foot
(
i_loc
,
datepoint
)
foot
=
self
.
get_foot
(
site
,
datepoint
)
# Get the indices
indices
=
self
.
get_time_indices
(
datepoint
)
# Get the tracer fluxes
...
...
@@ -456,7 +458,7 @@ class STILTObservationOperator(ObservationOperator):
if
total
:
return
gpp_increase
+
ter_increase
else
:
return
gpp_increase
,
ter_increase
def
get_temporal_profiles
(
self
,
datepoint
,
i_
station
,
i_member
):
def
get_temporal_profiles
(
self
,
datepoint
,
station
_name
,
i_member
):
""" Function that reads in the temporal profiles for the current timestep
Input:
datepoint: datepoint: datetime for which the temporal profile should be found
...
...
@@ -466,7 +468,6 @@ class STILTObservationOperator(ObservationOperator):
np.array (2): the time profiles for all categories. Indices: time, category"""
#read temporal profiles for the times within the footprint
time_indices
=
self
.
get_time_indices
(
datepoint
)
station_name
=
self
.
sitenames
[
i_station
]
temporal_prior
=
os
.
path
.
join
(
self
.
model_settings
[
'datadir'
],
'prior_temporal_{0}_{1:03}.nc'
.
format
(
station_name
,
i_member
))
temporal_prior
=
io
.
ct_read
(
temporal_prior
,
'read'
)
temp_profiles
=
[]
...
...
@@ -494,7 +495,7 @@ class STILTObservationOperator(ObservationOperator):
return
emis
@
cached
def
run_STILT
(
self
,
datepoint
,
i_member
,
i_loc
,
i_species
,
do_pseudo
=
0
):
def
run_STILT
(
self
,
datepoint
,
i_member
,
site
,
i_species
,
do_pseudo
=
0
):
"""This function reads in STILT footprints and returns hourly concentration data
Input:
datepoint: datepoint: datetime: the datepoint for which the concentrations should be calculated
...
...
@@ -505,20 +506,13 @@ class STILTObservationOperator(ObservationOperator):
Returns:
float: the concentration increase at the respective location due to the emissions from the respective species"""
# get the date:
temp_profiles
=
self
.
get_temporal_profiles
(
datepoint
,
i_loc
,
i_member
)
temp_profiles
=
self
.
get_temporal_profiles
(
datepoint
,
site
,
i_member
)
spatial_emissions
=
self
.
get_spatial_emissions
(
i_member
,
i_species
)
#find correct background concentrationa
footprint
=
self
.
get_foot
(
i_loc
,
datepoint
)
footprint
=
self
.
get_foot
(
site
,
datepoint
)
# multiply footprint with emissions for each hour in the back trajectory. Indices: Time, Category, lat, lon
foot_flux
=
(
footprint
[:,
None
,
:,
:]
*
spatial_emissions
[
None
,
:,
:,
:]
*
temp_profiles
[:,
:,
None
,
None
]).
sum
()
# b = 0
# for cat in range(self.nrcat):
# foot_flux2 = (footprint[:, :, :] * spatial_emissions[None, cat, :, :] * temp_profiles[:, cat, None, None])
# a = foot_flux2.sum() * float(self.recalc_factors[i_species])
# print(cat, a)
# b+=a
# print('Total = ', b)
concentration_increase
=
float
(
self
.
recalc_factors
[
i_species
])
*
foot_flux
return
concentration_increase
...
...
@@ -540,20 +534,22 @@ class STILTObservationOperator(ObservationOperator):
totser
=
np
.
array
(
int
(
self
.
dacycle
[
'da.optimizer.nmembers'
])
*
[(
self
.
nrloc
*
self
.
nrspc
*
len
(
self
.
datelist
))
*
[
0.
]])
for
i_member
in
range
(
int
(
self
.
dacycle
[
'da.optimizer.nmembers'
])):
logging
.
debug
(
'Calculating concentration for member {}'
.
format
(
i_member
))
conc_STILT
=
[]
for
i_loc
in
range
(
self
.
nrloc
)
:
logging
.
debug
(
'Calculating concentration for member {}, loc {}'
.
format
(
i_member
,
self
.
sitenames
[
i_loc
])
)
for
i_species
in
range
(
self
.
nrspc
)
:
logging
.
debug
(
'... and species {}'
.
format
(
self
.
spname
[
i_species
]
))
for
i_
datepoint
,
datepoint
in
enumerate
(
self
.
datelist
):
logging
.
debug
(
'working on time {}'
.
format
(
datepoint
))
background_concentration
=
self
.
get_background_orig
(
i_species
,
datepoint
)
conc_increase_stilt
=
self
.
run_STILT
(
datepoint
,
i_member
,
i_loc
,
i_species
)
if
self
.
spname
[
i_species
].
upper
()
==
'CO2'
:
gpp
,
ter
=
self
.
get_biosphere_concentration
(
i_loc
,
datepoint
)
#
logging.debug('C14 (delta) = {}'.format(self.get_c14
_concentration(
i_loc
, datepoint
, gpp, ter, conc_increase_stilt, background_concentration))
)
for
sample
in
self
.
samples
:
if
sample
.
flag
==
99
:
conc_STILT
.
append
(
np
.
nan
)
else
:
species
=
sample
.
species
;
i_species
=
self
.
spname
.
index
(
species
.
upper
(
))
datepoint
=
sample
.
xdate
site
=
sample
.
code
.
split
(
'/'
)[
-
1
].
split
(
'_'
)[
1
]
logging
.
debug
(
'working on time {}'
.
format
(
datepoint
)
)
background_concentration
=
self
.
get_background_orig
(
i_species
,
datepoint
)
conc_increase_stilt
=
self
.
run_STILT
(
datepoint
,
i_member
,
site
,
i_species
)
if
species
.
upper
()
==
'CO2'
:
gpp
,
ter
=
self
.
get_biosphere
_concentration
(
site
,
datepoint
)
logging
.
debug
(
'{0:.3} for FF and {1:.3} for bio for {2}'
.
format
(
conc_increase_stilt
,
gpp
+
ter
,
self
.
spname
[
i_species
]))
conc_STILT
.
append
(
conc_increase_stilt
+
gpp
+
ter
+
background_concentration
)
else
:
gpp
,
ter
=
0
,
0
conc_STILT
.
append
(
conc_increase_stilt
+
gpp
+
ter
+
background_concentration
)
totser
[
i_member
]
=
np
.
array
(
conc_STILT
)
logging
.
info
(
'Concentration calculated for member {}'
.
format
(
i_member
))
self
.
mod
=
np
.
array
(
totser
).
T
...
...
da/ffdas/pipeline.py
View file @
a9485dd0
...
...
@@ -312,6 +312,9 @@ def sample_step(dacycle, samples, statevector, obsoperator, emismodel, lag, adva
samples
.
setup
(
dacycle
)
samples
.
add_observations
(
dacycle
)
obsoperator
.
get_samples
(
samples
)
# This is also the point at which the samples could be added to the observationoperator.
# This could help in only using measurements from e.g. 12-18 hour.
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
...
...
da/ffdas/statevector.py
View file @
a9485dd0
...
...
@@ -31,7 +31,6 @@ import numpy as np
from
da.baseclasses.statevector
import
StateVector
,
EnsembleMember
from
da.tools.general
import
create_dirs
,
to_datetime
import
datetime
as
dtm
import
da.tools.io4
as
io
identifier
=
'CarbonTracker Statevector '
...
...
@@ -73,10 +72,11 @@ class CO2StateVector(StateVector):
# These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist
# of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
self
.
ensemble_members
=
range
(
self
.
nlag
)
for
n
in
range
(
self
.
nlag
):
self
.
ensemble_members
[
n
]
=
[]
self
.
ensemble_members
=
[[]
for
n
in
range
(
self
.
nlag
)]
# self.ensemble_members = list(range(self.nlag))
#
# for n in range(self.nlag):
# self.ensemble_members[n] = []
# This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember
...
...
@@ -155,7 +155,7 @@ class CO2StateVector(StateVector):
logging
.
debug
(
'Successfully wrote data from ensemble member %d to file (%s) '
%
(
mem
.
membernumber
,
filename
))
def
make_new_ensemble
(
self
,
dacycle
,
lag
,
covariancematrix
=
None
):
def
make_new_ensemble
(
self
,
lag
,
covariancematrix
=
None
):
"""
:param lag: an integer indicating the time step in the lag order
:param covariancematrix: a matrix to draw random values from
...
...
@@ -169,13 +169,14 @@ class CO2StateVector(StateVector):
used to draw ensemblemembers from. If this argument is not passed it will ne substituted with an
identity matrix of the same dimensions.
"""
self
.
seed
=
int
(
dacycle
.
dasystem
[
'random.seed'
])
"""
dacycle
=
self
.
dacycle
self
.
seed
=
int
(
self
.
dacycle
.
dasystem
[
'random.seed'
])
if
self
.
seed
!=
0
:
np
.
random
.
seed
(
self
.
seed
)
sds
=
np
.
random
.
randint
(
1
,
10000
,
2
0
)
sds
=
np
.
random
.
randint
(
1
,
10000
,
100
0
)
else
:
sds
=
np
.
random
.
randint
(
1
,
10000
,
2
0
)
sds
=
np
.
random
.
randint
(
1
,
10000
,
100
0
)
sid
=
(
dacycle
[
'time.start'
]
-
dacycle
[
'time.fxstart'
]).
days
np
.
random
.
seed
(
sds
[
sid
])
...
...
@@ -205,8 +206,18 @@ class CO2StateVector(StateVector):
devs
=
f
.
get_variable
(
'statevectordeviations_optimized'
)[:]
f
.
close
()
covariancematrix
=
(
np
.
dot
(
devs
,
devs
.
T
)
/
(
devs
.
shape
[
1
]
-
1
))
import
pickle
with
open
(
'covmatrix.pkl'
,
'wb'
)
as
tofile
:
pickle
.
dump
(
covariancematrix
,
tofile
)
# Check dimensions of covariance matrix list, must add up to nparams
covariancematrix
=
np
.
array
(
covariancematrix
)
self
.
covariancematrix
=
covariancematrix
# self.devs = devs
# diag_covmatrix = np.zeros_like(covariancematrix)
# diag_covmatrix[np.diag_indices_from(covariancematrix)] = np.diagonal(covariancematrix)
# covariancematrix = diag_covmatrix
# logging.warning('Using only the diagonal values of the covariancematrix')
dims
=
covariancematrix
.
shape
[
0
]
if
dims
!=
self
.
nparams
:
...
...
@@ -219,7 +230,9 @@ class CO2StateVector(StateVector):
_
,
s
,
_
=
np
.
linalg
.
svd
(
covariancematrix
)
except
:
s
=
np
.
linalg
.
svd
(
covariancematrix
,
full_matrices
=
1
,
compute_uv
=
0
)
#Cartesius fix
dof
=
np
.
sum
(
s
)
**
2
/
sum
(
s
**
2
)
C
=
np
.
linalg
.
cholesky
(
covariancematrix
)
logging
.
debug
(
'Cholesky decomposition has succeeded '
)
...
...
@@ -230,7 +243,6 @@ class CO2StateVector(StateVector):
newmean
=
np
.
ones
(
self
.
nparams
,
float
)
*
prmval
# standard value for a new time step is 1.0
# If this is not the start of the filter, average previous two optimized steps into the mix
if
lag
==
self
.
nlag
-
1
and
self
.
nlag
>=
3
:
newmean
+=
self
.
ensemble_members
[
lag
-
1
][
0
].
param_values
+
\
self
.
ensemble_members
[
lag
-
2
][
0
].
param_values
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment