Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
NearRealTimeCTDAS
CTDAS
Commits
f7a938f2
Commit
f7a938f2
authored
Feb 21, 2020
by
Woude, Auke van der
Browse files
Added biosphere, multiple countries and biosphere
parent
c0706f73
Changes
5
Show whitespace changes
Inline
Side-by-side
da/ffdas/emissionmodel.py
View file @
f7a938f2
...
...
@@ -23,7 +23,7 @@ import math
import
da.tools.rc
as
rc
from
da.tools.general
import
create_dirs
,
to_datetime
import
netCDF4
as
nc
identifier
=
'EmissionModel ensemble '
version
=
'1.0'
...
...
@@ -48,45 +48,49 @@ class EmisModel(object):
self
.
btime
=
int
(
dacycle
.
dasystem
[
'run.backtime'
])
self
.
obsfile
=
dacycle
.
dasystem
[
'obs.input.id'
]
self
.
nrspc
=
int
(
dacycle
.
dasystem
[
'obs.spec.nr'
])
self
.
species
=
dacycle
.
dasystem
[
'obs.spec.name'
].
split
(
','
)
self
.
nrcat
=
int
(
dacycle
.
dasystem
[
'obs.cat.nr'
])
self
.
nparams
=
int
(
dacycle
.
dasystem
[
'nparameters'
])
self
.
nmembers
=
int
(
dacycle
[
'da.optimizer.nmembers'
])
self
.
pparam
=
dacycle
.
dasystem
[
'emis.pparam'
]
self
.
paramfile
=
dacycle
.
dasystem
[
'emis.paramfile'
]
#self.paramfile2 = dacycle.dasystem['emis.paramfile2']
self
.
paramfile
=
dacycle
.
dasystem
[
'emis.paramfiles'
]
self
.
countries
=
[
country
.
strip
()
for
country
in
dacycle
.
dasystem
[
'countries'
].
split
(
';'
)]
areafile
=
dacycle
.
dasystem
[
'area.file'
]
self
.
area
=
nc
.
Dataset
(
areafile
)[
'Area'
][:]
def
get_emis
(
self
,
dacycle
,
p
sdo
):
"""set up emission information for pseudo-obs (
ps
do=1) and ensemble runs (
ps
do=0)"""
def
get_emis
(
self
,
dacycle
,
s
amples
,
do_pseu
do
):
"""set up emission information for pseudo-obs (
do_pseu
do=1) and ensemble runs (
do_pseu
do=0)"""
if
ps
do
==
1
:
if
do_pseu
do
==
1
:
priorparam
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
pparam
)
f
=
io
.
ct_read
(
priorparam
,
'read'
)
self
.
prm
=
f
.
get_variable
(
'true_values'
)[:
self
.
nparams
]
f
.
close
()
self
.
get_spatial
(
dacycle
,
n
=
999
,
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
paramfile
))
self
.
get_temporal
(
dacycle
,
n
=
999
,
ps
do
=
1
)
elif
ps
do
==
0
:
self
.
get_temporal
(
dacycle
,
n
=
999
,
do_pseu
do
=
1
)
elif
do_pseu
do
==
0
:
self
.
timestartkey
=
self
.
dacycle
[
'time.sample.start'
]
self
.
timefinishkey
=
self
.
dacycle
[
'time.sample.end'
]
for
j
in
range
(
self
.
nmembers
):
for
member
in
range
(
self
.
nmembers
):
#first remove old files, then create new emission files per ensemble member
if
self
.
startdate
==
self
.
timestartkey
:
file
=
os
.
path
.
join
(
dacycle
.
dasystem
[
'datadir'
],
'temporal_data_%03d.nc'
%
j
)
file
=
os
.
path
.
join
(
dacycle
.
dasystem
[
'datadir'
],
'temporal_data_%03d.nc'
%
member
)
try
:
os
.
remove
(
file
)
except
OSError
:
pass
prmfile
=
os
.
path
.
join
(
dacycle
[
'dir.input'
],
'parameters.%03d.nc'
%
j
)
prmfile
=
os
.
path
.
join
(
dacycle
[
'dir.input'
],
'parameters.%03d.nc'
%
member
)
f
=
io
.
ct_read
(
prmfile
,
'read'
)
self
.
prm
=
f
.
get_variable
(
'parametervalues'
)
f
.
close
()
self
.
get_spatial
(
dacycle
,
n
=
j
,
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
paramfile
))
self
.
get_temporal
(
dacycle
,
n
=
j
,
ps
do
=
0
)
self
.
get_spatial
(
dacycle
,
member
,
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
paramfile
))
self
.
get_temporal
(
dacycle
,
member
,
samples
,
do_pseu
do
=
0
)
def
get_totals
(
self
,
infile
=
None
):
def
get_totals
(
self
,
in
put
file
=
None
):
"""gather all required data and calculate total emissions per sector in kg/yr"""
self
.
inputfile
=
inputfile
yremis
=
np
.
zeros
((
len
(
self
.
countries
),
self
.
nrcat
,
self
.
nrspc
))
yremis
=
np
.
array
(
self
.
nrcat
*
[
self
.
nrspc
*
[
0.
]])
self
.
spatial_var
=
[]
self
.
spatial_inc
=
[]
self
.
temporal_var
=
[]
...
...
@@ -94,72 +98,71 @@ class EmisModel(object):
self
.
ops_sector
=
[]
### Read in parameter values for the emission functions and ratios to calculate total emissions
f
=
open
(
infile
,
'r'
)
for
i
,
country
in
enumerate
(
self
.
countries
):
# Get the file that is corresponding to the country
infile
=
inputfile
+
'_'
+
country
.
upper
()
+
'.csv'
f
=
open
(
infile
,
'r'
,
encoding
=
'utf-8-sig'
)
lines
=
f
.
readlines
()
f
.
close
()
ct
=
0
counter
=
0
for
line
in
lines
:
dum
=
line
.
split
(
","
)
if
dum
[
0
]
==
'#'
:
if
line
[
0
]
==
'#'
:
continue
else
:
id
=
int
(
dum
[
0
])
### If id == 0 this (sub)sector is treated only by WRF-STILT; if id takes another value the local sources are treated by OPS
if
id
!=
0
:
self
.
ops_sector
.
append
(
ct
)
inc
=
int
(
dum
[
2
])
### If inc == 999 this parameter is not in the state vector and will not be optimized; otherwise inc refers to the
### position of this parameter in the state vector
if
inc
!=
999
:
EC
=
float
(
dum
[
1
])
*
self
.
prm
[
inc
]
else
:
EC
=
float
(
dum
[
1
])
inc
=
int
(
dum
[
4
])
if
inc
!=
999
:
EF
=
float
(
dum
[
3
])
*
self
.
prm
[
inc
]
else
:
EF
=
float
(
dum
[
3
])
inc
=
int
(
dum
[
6
])
if
inc
!=
999
:
AD
=
float
(
dum
[
5
])
*
self
.
prm
[
inc
]
else
:
AD
=
float
(
dum
[
5
])
inc
=
int
(
dum
[
8
])
if
inc
!=
999
:
fr
=
float
(
dum
[
7
])
*
self
.
prm
[
inc
]
else
:
fr
=
float
(
dum
[
7
])
### emission = energy consumption per activity x emission factor x activity
(
name
,
model
,
spatial_distr
,
parameter_num0
,
temporal_distr
,
temporal_distr2
,
parameter_num1
,
\
energy_use
,
parameter_num2
,
fraction_of_total
,
parameter_num3
,
emission_factor
,
*
_
)
=
line
.
split
(
','
)
model
,
energy_use
,
parameter_num0
,
parameter_num1
,
fraction_of_total
,
parameter_num2
,
parameter_num3
,
emission_factor
=
\
int
(
model
),
float
(
energy_use
),
int
(
parameter_num0
),
int
(
parameter_num1
),
float
(
fraction_of_total
),
int
(
parameter_num2
),
int
(
parameter_num3
),
float
(
emission_factor
)
### If id == 0: use stilt. Else: Use Gaussian plume model
if
model
!=
0
:
self
.
ops_sector
.
append
(
counter
)
### If parameter_num1 == 999 this parameter is not in the state vector and will not be optimized;
### otherwise inc refers to the position of this parameter in the state vector
if
parameter_num2
!=
999
:
param_value
=
self
.
prm
[
parameter_num2
]
# optimised
else
:
param_value
=
1
# Not optimised
energy_use
=
energy_use
*
param_value
if
parameter_num3
!=
999
:
param_value
=
self
.
prm
[
parameter_num3
]
# optimised
else
:
param_value
=
1
# Not optimised
fraction_of_total
=
fraction_of_total
*
param_value
### emission = energy consumption emission factor x fraction
### fr can be used to divide sectoral emissions over several subsectors (e.g. to split road traffic into cars and HDV)
ems
=
EC
*
EF
*
AD
*
fr
emission
=
energy_use
*
fraction_of_total
*
emission_factor
### Now we calculated emissions of CO2. To get emissions of other trace gases we multiply with an emission ratio
for
s
in
range
(
self
.
nrspc
):
inc
=
int
(
dum
[
15
+
s
*
3
])
if
inc
!=
999
:
rat
=
float
(
dum
[
13
+
s
*
3
])
*
self
.
prm
[
inc
]
else
:
rat
=
float
(
dum
[
13
+
s
*
3
])
for
species
in
range
(
self
.
nrspc
):
ratio
,
uncertainty
,
in_statevector
=
line
.
split
(
','
)[
12
+
3
*
species
:
15
+
3
*
species
]
ratio
,
in_statevector
=
float
(
ratio
),
int
(
in_statevector
)
if
in_statevector
!=
999
:
multiplication_factor
=
self
.
prm
[
in_statevector
]
else
:
multiplication_factor
=
1
ratio
*=
multiplication_factor
### Some emission ratios have lognormal uncertainty distributions (label 'l')
if
dum
[
14
+
s
*
3
]
==
'n'
:
yremis
[
ct
,
s
]
=
ems
*
rat
elif
dum
[
14
+
s
*
3
]
==
'l'
:
yremis
[
ct
,
s
]
=
ems
*
np
.
exp
(
rat
)
ct
=
ct
+
1
if
uncertainty
==
'n'
:
yremis
[
i
,
counter
,
species
]
=
emission
*
ratio
elif
uncertainty
==
'l'
:
yremis
[
i
,
counter
,
species
]
=
emission
*
np
.
exp
(
ratio
)
counter
+=
1
if
i
==
0
:
self
.
spatial_var
.
append
(
spatial_distr
)
self
.
spatial_inc
.
append
(
parameter_num0
)
# Never optimised, but for backwards compabiilty
self
.
temporal_var
.
append
(
temporal_distr2
)
self
.
temporal_inc
.
append
(
parameter_num1
)
### Here we list the spatial and temporal variables that are part of the state vector for use in the get_spatial and get_temporal functions
self
.
spatial_var
.
append
(
dum
[
9
])
self
.
spatial_inc
.
append
(
int
(
dum
[
10
]))
self
.
temporal_var
.
append
(
dum
[
11
])
self
.
temporal_inc
.
append
(
int
(
dum
[
12
]))
### This, we only do once
logging
.
debug
(
"Successfully calculated total emissions"
)
yremis
=
np
.
swapaxes
(
yremis
,
0
,
1
)
# [cat, country, species]
self
.
yremis
=
yremis
return
yremis
def
get_spatial
(
self
,
dacycle
,
n
,
infile
=
None
):
"""read in proxy data used for spatial distribution of the gridded emissions, disaggregate yearly totals for the area"""
# Get the yearly emissions
yremis
=
self
.
get_totals
(
infile
)
# read in species information
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
obsfile
)
f
=
open
(
infile
,
'r'
)
...
...
@@ -168,114 +171,111 @@ class EmisModel(object):
M_mass
=
[]
spname
=
[]
for
line
in
lines
:
dum
=
line
.
split
(
","
)
if
dum
[
0
]
==
'#'
:
if
line
[
0
]
==
'#'
:
continue
else
:
dum
=
line
.
split
(
","
)
M_mass
.
append
(
float
(
dum
[
6
])
*
1e-9
)
#kg/micromole
spname
.
append
(
dum
[
5
])
#name of the species
sec_year
=
8760.
*
3600.
#seconds in a year
arcor
=
1e6
#km2 -> m2
conv
=
np
.
array
(
M_mass
)
*
sec_year
*
arcor
# to convert emissions in kg/km2/yr to micromole/m2/s
# Create a recalculation factor from kg/km2/yr to umol/m2/sec
sec_year
=
24
*
366
*
3600.
#seconds in a year (leapyear)
kgperkmperyr2umolperm2pers
=
np
.
array
(
M_mass
)[:,
None
,
None
]
*
sec_year
*
self
.
area
[
None
,
:,
:]
self
.
kgperkmperyr2umolperm2pers
=
kgperkmperyr2umolperm2pers
#read in proxy data for spatial disaggregation
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
proxyfile
)
prx
=
io
.
ct_read
(
infile
,
method
=
'read'
)
sp_distr
=
[]
for
c
in
range
(
self
.
nrcat
):
sp_distr
.
append
(
prx
.
get_variable
(
self
.
spatial_var
[
c
]))
sp_distr
=
np
.
array
(
sp_distr
)
prx
.
close
()
### create output file
proxy
=
io
.
ct_read
(
infile
,
method
=
'read'
)
proxy_category_names
=
proxy
[
'emis_category_names'
][:]
proxy_category_names
=
[
b
''
.
join
(
category_name
).
strip
().
decode
()
for
category_name
in
proxy_category_names
]
proxy_country_names
=
proxy
[
'country_names'
][:]
proxy_country_names
=
[
b
''
.
join
(
country_name
).
strip
().
decode
()
for
country_name
in
proxy_country_names
]
self
.
proxy
=
proxy
# Create the spatial distributions
spatial_distributions
=
np
.
zeros
((
self
.
nrcat
,
len
(
self
.
countries
),
self
.
area
.
shape
[
0
],
self
.
area
.
shape
[
1
]))
# Loop over all categories
for
i
,
category
in
enumerate
(
self
.
spatial_var
):
cat_index
=
proxy_category_names
.
index
(
category
)
do_optimise
=
self
.
spatial_inc
[
i
]
# Check if the variable is optimised
if
do_optimise
!=
999
:
multiplication_factor
=
self
.
prm
[
do_optimise
]
else
:
multiplication_factor
=
1
# Get the emission distribution for the category
category_emission_distr
=
proxy
[
'proxy_maps'
][
cat_index
,:,:,:]
*
multiplication_factor
for
j
,
country
in
enumerate
(
self
.
countries
):
# Get the emission distribution for the country and the category
country_index
=
proxy_country_names
.
index
(
country
)
category_distribution_country
=
category_emission_distr
[
country_index
,
:,
:]
# print(category_distribution_country.sum())
spatial_distributions
[
i
,
country_index
,
:,
:]
=
category_distribution_country
self
.
spatial_distributions
=
spatial_distributions
# Multiply spatial distributions with the yearly emissions in the country to get spatially distributed emissions
spatial_emissions
=
spatial_distributions
[:,
:,
None
,
:,
:]
*
yremis
[:,
:,
:,
None
,
None
]
# Sum over the countries to overlay them.
spatial_emissions
=
spatial_emissions
.
sum
(
axis
=
1
)
# Indices: category, species, lat, lon
spatial_emissions
=
np
.
swapaxes
(
spatial_emissions
,
0
,
1
)
# Indices: species, category, lat, lon
# Recalculate spatial emissions to umol/sec/m2
spatial_emissions
=
spatial_emissions
/
kgperkmperyr2umolperm2pers
[:,
None
,
:,
:]
## create output file
prior_file
=
os
.
path
.
join
(
self
.
emisdir
,
'prior_spatial_%03d.nc'
%
n
)
f
=
io
.
CT_CDF
(
prior_file
,
method
=
'create'
)
dimid
=
f
.
add_dim
(
'ncat'
,
self
.
nrcat
)
dimid2
=
f
.
add_dim
(
'ops'
,
3
)
diml
on
=
f
.
add_dim
(
'lon'
,
s
p_distr
.
shape
[
1
])
diml
at
=
f
.
add_dim
(
'lat'
,
s
p_distr
.
shape
[
2
])
dimid2
=
f
.
add_dim
(
'ops'
,
2
)
diml
at
=
f
.
add_dim
(
'lon'
,
s
elf
.
area
.
shape
[
0
])
diml
on
=
f
.
add_dim
(
'lat'
,
s
elf
.
area
.
shape
[
1
])
#loop over all tracers
for
s
in
range
(
self
.
nrspc
):
# determine which fraction of the emissions are local and are treated by OPS
datalistOPS
=
[]
opsfr
=
[
0.317
,
0.317
,
0.267
]
for
j
in
range
(
len
(
self
.
ops_sector
)):
OPSemis
=
yremis
[
self
.
ops_sector
[
j
],
s
]
*
opsfr
[
j
]
*
1E3
/
sec_year
datalistOPS
.
append
(
OPSemis
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"OPStotals_%s"
%
spname
[
s
]
savedict
[
'long_name'
]
=
"total OPS emissions"
savedict
[
'units'
]
=
"g/s"
savedict
[
'dims'
]
=
dimid2
savedict
[
'values'
]
=
datalistOPS
f
.
add_data
(
savedict
)
datalist
=
[]
ct
=
0
for
c
in
range
(
self
.
nrcat
):
inc
=
self
.
spatial_inc
[
c
]
if
inc
!=
999
:
### note that the spatial distribution has to gridded state vector, such that a scaling factor would affect the total
### emissions in this area
distr
=
sp_distr
[
c
]
*
self
.
prm
[
inc
]
else
:
distr
=
sp_distr
[
c
]
if
c
in
self
.
ops_sector
:
emis_spatial
=
(
yremis
[
c
,
s
]
*
(
1
-
opsfr
[
ct
]))
/
conv
[
s
]
*
distr
ct
=
ct
+
1
else
:
emis_spatial
=
yremis
[
c
,
s
]
/
conv
[
s
]
*
distr
datalist
.
append
(
emis_spatial
)
for
species
in
range
(
self
.
nrspc
):
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
spname
[
s
]
savedict
[
'name'
]
=
spname
[
s
pecies
]
savedict
[
'long_name'
]
=
"Spatially distributed emissions"
savedict
[
'units'
]
=
"micromole/m2/s"
savedict
[
'dims'
]
=
dimid
+
diml
on
+
diml
at
savedict
[
'values'
]
=
d
atal
ist
savedict
[
'dims'
]
=
dimid
+
diml
at
+
diml
on
savedict
[
'values'
]
=
sp
at
i
al
_emissions
[
species
,:,:,:]
f
.
add_data
(
savedict
)
f
.
close
()
logging
.
debug
(
"Successfully wrote data to prior spatial distribution file (%s)"
%
prior_file
)
def
get_temporal
(
self
,
dacycle
,
n
,
ps
do
):
def
get_temporal
(
self
,
dacycle
,
member
,
samples
,
do_pseu
do
):
"""read in time profiles used for temporal distribution of the emissions"""
### For pseudo-observation (psdo==1) or when no time profiles need to be optimized the profiles are simply read from the
### input file and copied to another file.
### Otherwise create a new file per ensemble member at t=0 and update the profiles for each time step
if
psdo
==
0
and
min
(
self
.
temporal_inc
)
<
999
:
ensfile
=
os
.
path
.
join
(
self
.
emisdir
,
'temporal_data_%03d.nc'
%
n
)
if
os
.
path
.
exists
(
ensfile
)
==
False
:
# First, get the station names from the smaples. For these stations, the time profiles will be written.
codes
=
samples
.
getvalues
(
'code'
)
self
.
codes
=
codes
stationnames
=
[]
for
code
in
codes
:
two_names
=
any
(
x
in
code
for
x
in
[
'UW'
,
'DW'
])
stationnames
.
append
(
'_'
.
join
(
code
.
split
(
'_'
)[
1
:
2
+
two_names
]))
stationnames
=
list
(
set
(
stationnames
))
# For pseudo-observation (do_pseudo==1) or when no time profiles need to be optimized the profiles are simply read from the
# input file and copied to another file. Otherwise create a new file per ensemble member at t=0 and update the profiles for each time step
if
do_pseudo
==
0
and
min
(
self
.
temporal_inc
)
<
999
:
# Check if the ensemble file exists. Otherwise create.
ensfile
=
os
.
path
.
join
(
self
.
emisdir
,
'temporal_data_%03d.nc'
%
member
)
if
not
os
.
path
.
exists
(
ensfile
):
dumfile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
tempfilep
)
shutil
.
copy2
(
dumfile
,
ensfile
)
t
pr
=
io
.
ct_read
(
ensfile
,
method
=
'read'
)
i
times
=
t
pr
.
get_variable
(
'Times'
)
times
=
array
([
dtm
.
datetime
(
int
(
''
.
join
(
d
[:
4
])),
int
(
''
.
join
(
d
[
5
:
7
])),
int
(
''
.
join
(
d
[
8
:
10
])),
int
(
''
.
join
(
d
[
11
:
13
])),
int
(
''
.
join
(
d
[
14
:
16
])))
for
d
in
i
times
])
s
ubselect
=
logical_and
(
times
>=
self
.
timestartkey
,
times
<=
self
.
timefinishkey
).
nonzero
()[
0
]
datlist
=
times
.
ta
ke
(
subselect
,
axis
=
0
)
#
##
The time profiles should always cover at least one full year
t
ime_profiles_ds
=
nc
.
Dataset
(
ensfile
)
times
=
t
ime_profiles_ds
[
'Times'
][:]
times
=
np
.
array
([
dtm
.
datetime
.
strptime
(
time
,
'%Y-%m-%d %H:%M:%S'
)
for
time
in
np
.
array
(
times
)
])
s
elf
.
times
=
times
subselect
=
logical_and
(
times
>=
self
.
timesta
rtkey
,
times
<
self
.
timefinishkey
).
nonzero
()[
0
]
date_selection
=
times
.
take
(
subselect
,
axis
=
0
)
# The time profiles should always cover at least one full year
start_date
=
dtm
.
datetime
(
self
.
timestartkey
.
year
,
1
,
1
,
0
,
0
)
#first time included
end_date
=
dtm
.
datetime
(
self
.
timestartkey
.
year
,
12
,
31
,
23
,
0
)
#last time included
dt
=
dtm
.
timedelta
(
0
,
3600
)
dum
=
start_date
times
=
[]
while
dum
<=
end_date
:
times
.
append
(
dum
)
dum
=
dum
+
dt
times
=
np
.
array
(
times
)
stidx
=
np
.
where
(
times
==
self
.
timestartkey
)[
0
][
0
]
stidx2
=
np
.
where
(
times
==
self
.
startdate
)[
0
][
0
]
edidx
=
np
.
where
(
times
==
self
.
timefinishkey
)[
0
][
0
]
dumsel
=
np
.
where
((
times
<
self
.
startdate
)
|
(
times
>
self
.
timefinishkey
))[
0
]
starttime_index
=
np
.
where
(
times
==
self
.
timestartkey
)[
0
][
0
]
startdate_index
=
np
.
where
(
times
==
self
.
startdate
)[
0
][
0
]
end_index
=
np
.
where
(
times
==
self
.
timefinishkey
)[
0
][
0
]
""" Time profiles should, for a full year, always have an average value of 1.0. Therefore, a new method has been developed
to optimize time profiles such that we comply with this and the time profiles do not affect the total emissions.
...
...
@@ -283,74 +283,69 @@ class EmisModel(object):
outside this period are scaled equally such that the time profile remains its average value of 1.0. Except previously updated
dates (from previous cycles) are maintained (they are already optimized!)."""
profiles
=
[]
for
c
in
range
(
self
.
nrcat
):
if
self
.
temporal_inc
[
c
]
!=
999
:
f_orig
=
tpr
.
get_variable
(
self
.
temporal_var
[
c
])
f_sel
=
tpr
.
get_variable
(
self
.
temporal_var
[
c
]).
take
(
subselect
,
axis
=
0
)
f_new
=
f_sel
*
self
.
prm
[
self
.
temporal_inc
[
c
]]
dumsum
=
np
.
array
(
f_orig
[
dumsel
]).
sum
()
for
i
in
range
(
len
(
f_new
)):
f_orig
[:
stidx2
]
=
f_orig
[:
stidx2
]
-
(
f_orig
[:
stidx2
]
/
dumsum
)
*
(
f_new
.
sum
()
-
f_sel
.
sum
())
f_orig
[
edidx
+
1
:]
=
f_orig
[
edidx
+
1
:]
-
(
f_orig
[
edidx
+
1
:]
/
dumsum
)
*
(
f_new
.
sum
()
-
f_sel
.
sum
())
f_orig
[
stidx
:
edidx
+
1
]
=
f_new
profiles
.
append
(
f_orig
)
tpr
.
close
()
f
=
io
.
CT_CDF
(
ensfile
,
method
=
'write'
)
ct
=
0
for
c
in
range
(
self
.
nrcat
):
if
self
.
temporal_inc
[
c
]
!=
999
:
f
.
variables
[
self
.
temporal_var
[
c
]][:]
=
profiles
[
ct
]
ct
=
ct
+
1
f
.
close
()
### Now read in the correct profiles, select the correct time period and write the profiles into one file per ensemble member
if
psdo
==
1
:
unselected_day
=
np
.
where
((
times
<
self
.
startdate
)
|
(
times
>
self
.
timefinishkey
))[
0
]
category_names
=
list
(
time_profiles_ds
[
'category_name'
][:])
station_names_ds
=
list
(
time_profiles_ds
[
'station_names'
][:])
profiles
=
np
.
zeros
(
time_profiles_ds
[
'time_profile'
][:].
shape
)
for
category
in
range
(
self
.
nrcat
):
if
self
.
temporal_inc
[
category
]
!=
999
:
cat_index
=
category_names
.
index
(
self
.
temporal_var
[
category
])
for
station
in
stationnames
:
station_index
=
station_names_ds
.
index
(
station
)
original_profile
=
time_profiles_ds
[
'time_profile'
][
station_index
,
cat_index
,
:]
selected_profile
=
time_profiles_ds
[
'time_profile'
][
station_index
,
cat_index
,
:].
take
(
subselect
)
new_profile
=
selected_profile
[:]
*
self
.
prm
[
self
.
temporal_inc
[
category
]]
daily_sum
=
np
.
array
(
original_profile
[
unselected_day
]).
sum
()
original_profile
[:
startdate_index
]
=
original_profile
[:
startdate_index
]
-
(
original_profile
[:
startdate_index
]
/
daily_sum
)
*
(
new_profile
.
sum
()
-
selected_profile
.
sum
())
original_profile
[
end_index
:]
=
original_profile
[
end_index
:]
-
(
original_profile
[
end_index
:]
/
daily_sum
)
*
(
new_profile
.
sum
()
-
selected_profile
.
sum
())
original_profile
[
starttime_index
:
end_index
]
=
new_profile
profiles
[
station_index
,
cat_index
,
:]
=
original_profile
time_profiles_ds
.
close
()
# Now, write the output
tofile
=
nc
.
Dataset
(
ensfile
,
'r+'
)
for
category
in
range
(
self
.
nrcat
):
cat_index
=
category_names
.
index
(
self
.
temporal_var
[
category
])
if
not
self
.
temporal_inc
[
category
]
==
999
:
for
station
in
stationnames
:
station_index
=
station_names_ds
.
index
(
station
)
tofile
[
'time_profile'
][
station_index
,
cat_index
,
:]
=
profiles
[
station_index
,
cat_index
,
:]
tofile
.
close
()
# Now read in the correct profiles, select the correct time period and write the profiles into one file per ensemble member
time_profiles_ds
=
nc
.
Dataset
(
ensfile
)
if
do_pseudo
==
1
:
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
tempfileo
)
elif
psdo
==
0
and
min
(
self
.
temporal_inc
)
==
999
:
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
tempfilep
)
elif
psdo
==
0
and
min
(
self
.
temporal_inc
)
<
999
:
infile
=
os
.
path
.
join
(
self
.
emisdir
,
'temporal_data_%03d.nc'
%
n
)
tpr
=
io
.
ct_read
(
infile
,
method
=
'read'
)
itimes
=
tpr
.
get_variable
(
'Times'
)
itimes
=
[
b
''
.
join
(
d
)
for
d
in
itimes
]
times
=
array
([
dtm
.
datetime
.
strptime
(
d
.
decode
(),
'%Y-%m-%d_%H:%M:%S'
)
for
d
in
itimes
])
if
psdo
==
1
:
startdum
=
dtm
.
datetime
(
self
.
startdate
.
year
,
self
.
startdate
.
month
,
self
.
startdate
.
day
-
1
,
1
,
0
)
subselect
=
logical_and
(
times
>=
startdum
,
times
<=
self
.
enddate
).
nonzero
()[
0
]
else
:
elif
do_pseudo
==
0
and
min
(
self
.
temporal_inc
)
==
999
:
dum
=
self
.
timestartkey
-
dtm
.
timedelta
(
0
,
self
.
btime
*
3600
)
if
dum
.
hour
!=
0
:
startdum
=
dtm
.
datetime
(
dum
.
year
,
dum
.
month
,
dum
.
day
,
1
,
0
)
else
:
startdum
=
dtm
.
datetime
(
dum
.
year
,
dum
.
month
,
dum
.
day
-
1
,
1
,
0
)
subselect
=
logical_and
(
times
>=
startdum
,
times
<=
self
.
timefinishkey
).
nonzero
()[
0
]
datlist
=
times
.
take
(
subselect
,
axis
=
0
)
profiles
=
[]
for
c
in
range
(
self
.
nrcat
):
f_orig
=
tpr
.
get_variable
(
self
.
temporal_var
[
c
]).
take
(
subselect
,
axis
=
0
)
profiles
.
append
(
f_orig
)
tpr
.
close
()
profiles
=
np
.
array
(
profiles
)
prior_file
=
os
.
path
.
join
(
self
.
emisdir
,
'prior_temporal_%03d.nc'
%
n
)
f
=
io
.
CT_CDF
(
prior_file
,
method
=
'create'
)
dimtime
=
f
.
add_dim
(
'Times'
,
len
(
datlist
))
else
:
# select all times
subselect
=
logical_and
(
times
>=
times
[
0
]
,
times
<=
times
[
-
1
]).
nonzero
()[
0
]
date_selection
=
times
.
take
(
subselect
,
axis
=
0
)
dum
=
[]
for
c
in
range
(
self
.
nrcat
):
if
self
.
temporal_var
[
c
]
in
dum
:
continue
else
:
for
station
in
stationnames
:
station_index
=
station_names_ds
.
index
(
station
)
prior_file
=
os
.
path
.
join
(
self
.
emisdir
,
'prior_temporal_{0}_{1:03}.nc'
.
format
(
station
,
member
))
f
=
io
.
CT_CDF
(
prior_file
,
method
=
'create'
)
dimtime
=
f
.
add_dim
(
'Times'
,
len
(
date_selection
))
cat_names_done
=
[]
for
category
in
range
(
self
.
nrcat
):
cat_name
=
self
.
temporal_var
[
category
]
cat_index
=
category_names
.
index
(
cat_name
)
if
not
cat_name
in
cat_names_done
:
profile
=
np
.
array
(
time_profiles_ds
[
'time_profile'
][
station_index
,
cat_index
,
:].
take
(
subselect
))
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
self
.
temporal_var
[
c
]
savedict
[
'name'
]
=
cat_name
savedict
[
'long_name'
]
=
"Temporal distribution"
savedict
[
'units'
]
=
""
savedict
[
'dims'
]
=
dimtime
savedict
[
'values'
]
=
profile
s
[
c
,:]
savedict
[
'values'
]
=
profile
f
.
add_data
(
savedict
)
dum
.
append
(
self
.
temporal_var
[
c
]
)
cat_names_done
.
append
(
cat_name
)
f
.
close
()
logging
.
debug
(
"Successfully wrote data to prior temporal distribution file (%s)"
%
prior_file
)
...
...
da/ffdas/obs.py
View file @
f7a938f2
...
...
@@ -14,12 +14,14 @@ import sys
import
logging
import
datetime
as
dtm
from
string
import
strip
from
numpy
import
array
,
logical_and
import
numpy
as
np
sys
.
path
.
append
(
os
.
getcwd
())
sys
.
path
.
append
(
'../../'
)
from
pylab
import
*
#from pylab import *
from
numpy
import
*
from
scipy
import
*
from
matplotlib.pylab
import
*
identifier
=
'Rotterdam CO2 mole fractions'
version
=
'0.0'
...
...
@@ -29,7 +31,7 @@ import da.tools.io4 as io
import
da.tools.rc
as
rc
################### Begin Class RdamObservations ###################
class
R
dam
Observations
(
Observations
):
class
R
INGO
Observations
(
Observations
):
""" an object that holds data + methods and attributes needed to manipulate mole fraction values """
def
setup
(
self
,
dacycle
):
...
...
@@ -62,28 +64,32 @@ class RdamObservations(Observations):
"""
infile
=
os
.
path
.
join
(
self
.
obspack_dir
,
self
.
obspack_id
)
logging
.
debug
(
'infile = {}'
.
format
(
infile
))
f
=
open
(
infile
,
'r'
)
lines
=
f
.
readlines
()
f
.
close
()
ncfilelist
=
[]
for
line
in
lines
:
if
line
.
startswith
(
'#'
):
continue
# header
dum
=
line
.
split
(
","
)
ncfile
=
[
dum
[
1
]]
if
not
ncfile
[
0
]
in
ncfilelist
:
ncfilelist
.
extend
(
ncfile
)
ncfilelist
=
[
'obsfiles/'
+
line
.
split
(
','
)[
1
]
for
line
in
lines
if
not
line
[
0
]
==
'#'
]
ncfilelist
=
ncfilelist
[:
int
(
dacycle
.
dasystem
[
'obs.input.nr'
])]
# ncfilelist = []
# for line in lines:
# if line.startswith('#'): continue # header
#
# dum = line.split(",")
# ncfile = [dum[1]]
#
# if not ncfile[0] in ncfilelist:
# ncfilelist.extend(ncfile)
for
ncfile
in
ncfilelist
:
infile
=
os
.
path
.
join
(
self
.
obspack_dir
,
ncfile
+
'.nc'
)
ncf
=
io
.
ct_read
(
infile
,
'read'
)
idates
=
ncf
.
get_variable
(
'Times'
)
idates
=
[
b
''
.
join
(
d
)
for
d
in
idates
]
dates
=
array
([
dtm
.
datetime
.
strptime
(
d
.
decode
()
,
'%Y-%m-%d_%H:%M:%S'
)
for
d
in
idates
])