Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CTDAS
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
NearRealTimeCTDAS
CTDAS
Commits
bff3ed87
Commit
bff3ed87
authored
13 years ago
by
Peters, Wouter
Browse files
Options
Downloads
Patches
Plain Diff
now creates output files for concentrations
parent
73965107
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
da/analysis/expand_mixingratios.py
+131
-737
131 additions, 737 deletions
da/analysis/expand_mixingratios.py
with
131 additions
and
737 deletions
da/analysis/expand_mixingratios.py
+
131
−
737
View file @
bff3ed87
...
...
@@ -10,427 +10,36 @@ import numpy as np
from
pylab
import
date2num
,
num2date
"""
Author: Wouter Peters (Wouter.Peters@
noaa.gov
)
Author: Wouter Peters (Wouter.Peters@
wur.nl
)
Revision History:
File created on
2
1
Ocotber
20
08
.
File created on
1
1
May
20
12
.
"""
def
proceed_dialog
(
txt
,
yes
=
[
'
y
'
,
'
yes
'
],
all
=
[
'
a
'
,
'
all
'
,
'
yes-to-all
'
]):
"""
function to ask whether to proceed or not
"""
response
=
raw_input
(
txt
)
if
response
.
lower
()
in
yes
:
return
1
if
response
.
lower
()
in
all
:
return
2
return
0
def
SaveWeeklyAvg1x1Data
(
DaCycle
,
StateVector
):
"""
Function creates a NetCDF file with output on 1x1 degree grid. It uses the flux data written by the
:class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and
variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`.
:param DaCycle: a :class:`~da.tools.initexit.CycleControl` object
:param StateVector: a :class:`~da.baseclasses.statevector.StateVector`
:rtype: None
"""
import
da.tools.io4
as
io
import
logging
#
dirname
=
'
data_flux1x1_weekly
'
#
dirname
=
CreateDirs
(
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
dirname
))
#
# Some help variables
#
dectime0
=
date2num
(
datetime
(
2000
,
1
,
1
))
dt
=
DaCycle
[
'
cyclelength
'
]
startdate
=
DaCycle
[
'
time.start
'
]
enddate
=
DaCycle
[
'
time.end
'
]
nlag
=
StateVector
.
nlag
msg
=
"
DA Cycle start date is %s
"
%
startdate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
msg
=
"
DA Cycle end date is %s
"
%
enddate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
#
# Create or open NetCDF output file
#
saveas
=
os
.
path
.
join
(
dirname
,
'
flux_1x1.nc
'
)
ncf
=
io
.
CT_CDF
(
saveas
,
'
write
'
)
#
# Create dimensions and lat/lon grid
#
dimgrid
=
ncf
.
AddLatLonDim
()
dimdate
=
ncf
.
AddDateDim
()
#
# set title and tell GMT that we are using "pixel registration"
#
setattr
(
ncf
,
'
Title
'
,
'
CarbonTracker fluxes
'
)
setattr
(
ncf
,
'
node_offset
'
,
1
)
#
# skip dataset if already in file
#
ncfdate
=
date2num
(
startdate
)
-
dectime0
+
dt
.
days
/
2.0
skip
=
ncf
.
has_date
(
ncfdate
)
if
skip
:
msg
=
'
Skipping writing of data for date %s : already present in file %s
'
%
(
startdate
.
strftime
(
'
%Y-%m-%d
'
),
saveas
)
;
logging
.
warning
(
msg
)
else
:
#
# if not, process this cycle. Start by getting flux input data from CTDAS
#
filename
=
os
.
path
.
join
(
DaCycle
[
'
dir.output
'
],
'
flux1x1_%s_%s.nc
'
%
(
startdate
.
strftime
(
'
%Y%m%d%H
'
),
enddate
.
strftime
(
'
%Y%m%d%H
'
),)
)
file
=
io
.
CT_Read
(
filename
,
'
read
'
)
bio
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.bio.flux
'
]))
ocean
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.ocean.flux
'
]))
fire
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.fires.flux
'
]))
fossil
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.fossil.flux
'
]))
#mapped_parameters = np.array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1']))
file
.
close
()
next
=
ncf
.
inq_unlimlen
()[
0
]
# Start adding datasets from here on, both prior and posterior datasets for bio and ocn
for
prior
in
[
True
,
False
]:
#
# Now fill the StateVector with the prior values for this time step. Note that the prior value for this time step
# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date..
#
if
prior
:
qual_short
=
'
prior
'
for
n
in
range
(
nlag
,
0
,
-
1
):
priordate
=
enddate
-
timedelta
(
dt
.
days
*
n
)
savedir
=
DaCycle
[
'
dir.output
'
].
replace
(
startdate
.
strftime
(
'
%Y%m%d
'
),
priordate
.
strftime
(
'
%Y%m%d
'
)
)
filename
=
os
.
path
.
join
(
savedir
,
'
savestate.nc
'
)
if
os
.
path
.
exists
(
filename
):
dummy
=
StateVector
.
ReadFromFile
(
filename
,
qual
=
qual_short
)
gridmean
,
gridvariance
=
StateVector
.
StateToGrid
(
lag
=
n
)
# Replace the mean statevector by all ones (assumed priors)
gridmean
=
StateVector
.
VectorToGrid
(
vectordata
=
np
.
ones
(
StateVector
.
nparams
,))
msg
=
'
Read prior dataset from file %s, sds %d:
'
%
(
filename
,
n
)
;
logging
.
debug
(
msg
)
break
else
:
qual_short
=
'
opt
'
savedir
=
DaCycle
[
'
dir.output
'
]
filename
=
os
.
path
.
join
(
savedir
,
'
savestate.nc
'
)
dummy
=
StateVector
.
ReadFromFile
(
filename
,
qual
=
qual_short
)
gridmean
,
gridvariance
=
StateVector
.
StateToGrid
(
lag
=
nlag
)
msg
=
'
Read posterior dataset from file %s, sds %d:
'
%
(
filename
,
nlag
)
;
logging
.
debug
(
msg
)
#
# if prior, do not multiply fluxes with parameters, otherwise do
#
biomapped
=
bio
*
gridmean
oceanmapped
=
ocean
*
gridmean
biovarmapped
=
bio
*
gridvariance
oceanvarmapped
=
ocean
*
gridvariance
#
#
# For each dataset, get the standard definitions from the module mysettings, add values, dimensions, and unlimited count, then write
#
savedict
=
ncf
.
StandardVar
(
varname
=
'
bio_flux_
'
+
qual_short
)
savedict
[
'
values
'
]
=
biomapped
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimgrid
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
#
savedict
=
ncf
.
StandardVar
(
varname
=
'
ocn_flux_
'
+
qual_short
)
savedict
[
'
values
'
]
=
oceanmapped
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimgrid
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
savedict
=
ncf
.
StandardVar
(
varname
=
'
bio_flux_%s_cov
'
%
qual_short
)
savedict
[
'
values
'
]
=
biovarmapped
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimgrid
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
#
savedict
=
ncf
.
StandardVar
(
varname
=
'
ocn_flux_%s_cov
'
%
qual_short
)
savedict
[
'
values
'
]
=
oceanvarmapped
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimgrid
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
# End prior/posterior block
savedict
=
ncf
.
StandardVar
(
varname
=
'
fire_flux_imp
'
)
savedict
[
'
values
'
]
=
fire
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimgrid
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
#
savedict
=
ncf
.
StandardVar
(
varname
=
'
fossil_flux_imp
'
)
savedict
[
'
values
'
]
=
fossil
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimgrid
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
#
savedict
=
ncf
.
StandardVar
(
varname
=
'
date
'
)
savedict
[
'
values
'
]
=
date2num
(
startdate
)
-
dectime0
+
dt
.
days
/
2.0
savedict
[
'
dims
'
]
=
dimdate
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
sys
.
stdout
.
write
(
'
.
'
)
sys
.
stdout
.
flush
()
#
# Done, close the new NetCDF file
#
ncf
.
close
()
#
# Return the full name of the NetCDF file so it can be processed by the next routine
#
msg
=
"
Gridded weekly average fluxes now written
"
;
logging
.
info
(
msg
)
return
saveas
def
SaveWeeklyAvgStateData
(
DaCycle
,
StateVector
):
"""
Function creates a NetCDF file with output for all parameters. It uses the flux data written by the
:class:`~da.baseclasses.obsoperator.ObsOperator.py`, and multiplies these with the mapped parameters and
variance (not covariance!) from the :class:`~da.baseclasses.statevector.StateVector`.
:param DaCycle: a :class:`~da.tools.initexit.CycleControl` object
:param StateVector: a :class:`~da.baseclasses.statevector.StateVector`
:rtype: None
"""
import
da.tools.io4
as
io
import
logging
from
da.analysis.tools_regions
import
globarea
def
WriteMixingRatios
(
DaCycle
):
"""
#
dirname
=
'
data_state_weekly
'
#
dirname
=
CreateDirs
(
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
dirname
))
#
# Some help variables
#
dectime0
=
date2num
(
datetime
(
2000
,
1
,
1
))
dt
=
DaCycle
[
'
cyclelength
'
]
startdate
=
DaCycle
[
'
time.start
'
]
enddate
=
DaCycle
[
'
time.end
'
]
nlag
=
StateVector
.
nlag
area
=
globarea
()
vectorarea
=
StateVector
.
GridToVector
(
griddata
=
area
,
method
=
'
sum
'
)
msg
=
"
DA Cycle start date is %s
"
%
startdate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
msg
=
"
DA Cycle end date is %s
"
%
enddate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
#
# Create or open NetCDF output file
#
saveas
=
os
.
path
.
join
(
dirname
,
'
statefluxes.nc
'
)
ncf
=
io
.
CT_CDF
(
saveas
,
'
write
'
)
#
# Create dimensions and lat/lon grid
#
dimregs
=
ncf
.
AddDim
(
'
nparameters
'
,
StateVector
.
nparams
)
dimmembers
=
ncf
.
AddDim
(
'
nmembers
'
,
StateVector
.
nmembers
)
dimdate
=
ncf
.
AddDateDim
()
#
# set title and tell GMT that we are using "pixel registration"
#
setattr
(
ncf
,
'
Title
'
,
'
CarbonTracker fluxes
'
)
setattr
(
ncf
,
'
node_offset
'
,
1
)
#
# skip dataset if already in file
#
ncfdate
=
date2num
(
startdate
)
-
dectime0
+
dt
.
days
/
2.0
skip
=
ncf
.
has_date
(
ncfdate
)
if
skip
:
msg
=
'
Skipping writing of data for date %s : already present in file %s
'
%
(
startdate
.
strftime
(
'
%Y-%m-%d
'
),
saveas
)
;
logging
.
warning
(
msg
)
else
:
next
=
ncf
.
inq_unlimlen
()[
0
]
#
# if not, process this cycle. Start by getting flux input data from CTDAS
#
filename
=
os
.
path
.
join
(
DaCycle
[
'
dir.output
'
],
'
flux1x1_%s_%s.nc
'
%
(
startdate
.
strftime
(
'
%Y%m%d%H
'
),
enddate
.
strftime
(
'
%Y%m%d%H
'
),)
)
file
=
io
.
CT_Read
(
filename
,
'
read
'
)
bio
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.bio.flux
'
]))
ocean
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.ocean.flux
'
]))
fire
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.fires.flux
'
]))
fossil
=
np
.
array
(
file
.
GetVariable
(
DaCycle
.
DaSystem
[
'
background.co2.fossil.flux
'
]))
#mapped_parameters = np.array(file.GetVariable(DaCycle.DaSystem['final.param.mean.1x1']))
file
.
close
()
next
=
ncf
.
inq_unlimlen
()[
0
]
vectorbio
=
StateVector
.
GridToVector
(
griddata
=
bio
*
area
,
method
=
'
sum
'
)
vectorocn
=
StateVector
.
GridToVector
(
griddata
=
ocean
*
area
,
method
=
'
sum
'
)
vectorfire
=
StateVector
.
GridToVector
(
griddata
=
fire
*
area
,
method
=
'
sum
'
)
vectorfossil
=
StateVector
.
GridToVector
(
griddata
=
fossil
*
area
,
method
=
'
sum
'
)
# Start adding datasets from here on, both prior and posterior datasets for bio and ocn
for
prior
in
[
True
,
False
]:
#
# Now fill the StateVector with the prior values for this time step. Note that the prior value for this time step
# occurred nlag time steps ago, so we make a shift in the output directory, but only if we are more than nlag cycle away from the start date..
#
if
prior
:
qual_short
=
'
prior
'
for
n
in
range
(
nlag
,
0
,
-
1
):
priordate
=
enddate
-
timedelta
(
dt
.
days
*
n
)
savedir
=
DaCycle
[
'
dir.output
'
].
replace
(
startdate
.
strftime
(
'
%Y%m%d
'
),
priordate
.
strftime
(
'
%Y%m%d
'
)
)
filename
=
os
.
path
.
join
(
savedir
,
'
savestate.nc
'
)
if
os
.
path
.
exists
(
filename
):
dummy
=
StateVector
.
ReadFromFile
(
filename
,
qual
=
qual_short
)
# Replace the mean statevector by all ones (assumed priors)
statemean
=
np
.
ones
((
StateVector
.
nparams
,))
choicelag
=
n
msg
=
'
Read prior dataset from file %s, sds %d:
'
%
(
filename
,
n
)
;
logging
.
debug
(
msg
)
break
else
:
qual_short
=
'
opt
'
savedir
=
DaCycle
[
'
dir.output
'
]
filename
=
os
.
path
.
join
(
savedir
,
'
savestate.nc
'
)
dummy
=
StateVector
.
ReadFromFile
(
filename
)
choicelag
=
nlag
statemean
=
StateVector
.
EnsembleMembers
[
choicelag
-
1
][
0
].
ParameterValues
msg
=
'
Read posterior dataset from file %s, sds %d:
'
%
(
filename
,
nlag
)
;
logging
.
debug
(
msg
)
#
# if prior, do not multiply fluxes with parameters, otherwise do
#
data
=
statemean
*
vectorbio
# units of mole region-1 s-1
savedict
=
ncf
.
StandardVar
(
varname
=
'
bio_flux_%s
'
%
qual_short
)
savedict
[
'
values
'
]
=
data
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
members
=
StateVector
.
EnsembleMembers
[
choicelag
-
1
]
deviations
=
np
.
array
([
mem
.
ParameterValues
*
data
for
mem
in
members
])
-
data
savedict
=
ncf
.
StandardVar
(
varname
=
'
bio_flux_%s_ensemble
'
%
qual_short
)
savedict
[
'
values
'
]
=
deviations
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimmembers
+
dimregs
savedict
[
'
comment
'
]
=
"
This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance
"
savedict
[
'
units
'
]
=
"
mol region-1 s-1
"
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
savedict
=
ncf
.
StandardVar
(
'
unknown
'
)
savedict
[
'
name
'
]
=
'
bio_flux_%s_std
'
%
qual_short
savedict
[
'
long_name
'
]
=
'
Biosphere flux standard deviation, %s
'
%
qual_short
savedict
[
'
values
'
]
=
deviations
.
std
(
axis
=
0
)
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
comment
'
]
=
"
This is the standard deviation on each parameter
"
savedict
[
'
units
'
]
=
"
mol region-1 s-1
"
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
data
=
statemean
*
vectorocn
# units of mole region-1 s-1
savedict
=
ncf
.
StandardVar
(
varname
=
'
ocn_flux_%s
'
%
qual_short
)
savedict
[
'
values
'
]
=
data
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
deviations
=
np
.
array
([
mem
.
ParameterValues
*
data
for
mem
in
members
])
-
data
savedict
=
ncf
.
StandardVar
(
varname
=
'
ocn_flux_%s_ensemble
'
%
qual_short
)
savedict
[
'
values
'
]
=
deviations
.
tolist
()
savedict
[
'
dims
'
]
=
dimdate
+
dimmembers
+
dimregs
savedict
[
'
comment
'
]
=
"
This is the matrix square root, use (M x M^T)/(nmembers-1) to make covariance
"
savedict
[
'
units
'
]
=
"
mol region-1 s-1
"
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
savedict
=
ncf
.
StandardVar
(
'
unknown
'
)
savedict
[
'
name
'
]
=
'
ocn_flux_%s_std
'
%
qual_short
savedict
[
'
long_name
'
]
=
'
Ocean flux standard deviation, %s
'
%
qual_short
savedict
[
'
values
'
]
=
deviations
.
std
(
axis
=
0
)
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
comment
'
]
=
"
This is the standard deviation on each parameter
"
savedict
[
'
units
'
]
=
"
mol region-1 s-1
"
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
data
=
vectorfire
savedict
=
ncf
.
StandardVar
(
varname
=
'
fire_flux_imp
'
)
savedict
[
'
values
'
]
=
data
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
data
=
vectorfossil
savedict
=
ncf
.
StandardVar
(
varname
=
'
fossil_flux_imp
'
)
savedict
[
'
values
'
]
=
data
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
savedict
=
ncf
.
StandardVar
(
varname
=
'
date
'
)
savedict
[
'
values
'
]
=
ncfdate
savedict
[
'
dims
'
]
=
dimdate
savedict
[
'
count
'
]
=
next
ncf
.
AddData
(
savedict
)
sys
.
stdout
.
write
(
'
.
'
)
sys
.
stdout
.
flush
()
#
# Done, close the new NetCDF file
#
ncf
.
close
()
#
# Return the full name of the NetCDF file so it can be processed by the next routine
#
msg
=
"
Vector weekly average fluxes now written
"
;
logging
.
info
(
msg
)
Write Sample information to NetCDF files. These files are organized by site and
have an unlimited time axis to which data is appended each cycle.
return
saveas
The needed information is obtained from the sampleinfo.nc files and the original input data files from ObsPack.
The steps are:
def
SaveWeeklyAvgTCData
(
DaCycle
,
StateVector
):
"""
Function creates a NetCDF file with output on TransCom regions. It uses the flux input from the
function `SaveWeeklyAvg1x1Data` to create fluxes of length `nparameters`, which are then projected
onto TC regions using the internal methods from :class:`~da.baseclasses.statevector.StateVector`.
:param DaCycle: a :class:`~da.tools.initexit.CycleControl` object
:param StateVector: a :class:`~da.baseclasses.statevector.StateVector`
:rtype: None
This function only read the prior fluxes from the flux_1x1.nc files created before, because we want to convolve
these with the parameters in the StateVector. This creates posterior fluxes, and the posterior covariance for the complete
StateVector in units of mol/box/s which we then turn into TC fluxes and covariances.
(1) Create a directory to hold timeseries output files
(2) Read the sampleinfo.nc file for this cycle and get a list of original files they were obtained from
(3) For each file, copy the original data file from ObsPack (if not yet present)
(4) Open the copied file, find the index of each observation, fill in the simulated data
"""
from
da.analysis.tools_regions
import
globarea
from
da.analysis.tools_transcom
import
transcommask
import
da.tools.io4
as
io
from
string
import
join
import
shutil
import
logging
#
dirname
=
'
data_tc_weekly
'
dirname
=
'
data_molefractions
'
#
dirname
=
CreateDirs
(
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
dirname
))
#
...
...
@@ -440,396 +49,181 @@ def SaveWeeklyAvgTCData(DaCycle, StateVector):
dt
=
DaCycle
[
'
cyclelength
'
]
startdate
=
DaCycle
[
'
time.start
'
]
enddate
=
DaCycle
[
'
time.end
'
]
ncfdate
=
date2num
(
startdate
)
-
dectime0
+
dt
.
days
/
2.0
msg
=
"
DA Cycle start date is %s
"
%
startdate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
msg
=
"
DA Cycle end date is %s
"
%
enddate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
# Write/Create NetCDF output file
#
saveas
=
os
.
path
.
join
(
dirname
,
'
tcfluxes.nc
'
)
ncf
=
io
.
CT_CDF
(
saveas
,
'
write
'
)
dimdate
=
ncf
.
AddDateDim
()
dimidateformat
=
ncf
.
AddDateDimFormat
()
dimregs
=
ncf
.
AddRegionDim
(
type
=
'
tc
'
)
#
# set title and tell GMT that we are using "pixel registration"
#
setattr
(
ncf
,
'
Title
'
,
'
CarbonTracker TransCom fluxes
'
)
setattr
(
ncf
,
'
node_offset
'
,
1
)
#
skip
=
ncf
.
has_date
(
ncfdate
)
if
skip
:
msg
=
'
Skipping writing of data for date %s : already present in file %s
'
%
(
startdate
.
strftime
(
'
%Y-%m-%d
'
),
saveas
)
;
logging
.
warning
(
msg
)
else
:
DaCycle
[
'
time.sample.stamp
'
]
=
"
%s_%s
"
%
(
startdate
.
strftime
(
"
%Y%m%d%H
"
),
enddate
.
strftime
(
"
%Y%m%d%H
"
),)
# Get input data
# Step (1): Get the sample output data file for this cycle
area
=
globarea
(
)
infile
=
os
.
path
.
join
(
DaCycle
[
'
dir.output
'
],
'
sampleinfo_%s.nc
'
%
DaCycle
[
'
time.sample.stamp
'
]
)
infile
=
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
'
data_state_weekly
'
,
'
statefluxes.nc
'
)
if
not
os
.
path
.
exists
(
infile
):
msg
=
"
Needed input file (%s) does not exist yet, please create file first, returning...
"
%
infile
;
logging
.
error
(
msg
)
return
None
ncf_in
=
io
.
CT_Read
(
infile
,
'
read
'
)
ncf_in
=
io
.
CT_Read
(
infile
,
'
read
'
)
obs_num
=
ncf_in
.
GetVariable
(
'
obs_num
'
)
obs_val
=
ncf_in
.
GetVariable
(
'
observed
'
)
mdm
=
ncf_in
.
GetVariable
(
'
modeldatamismatch
'
)
simulated
=
ncf_in
.
GetVariable
(
'
modelsamples
'
)
infilename
=
ncf_in
.
GetVariable
(
'
inputfilename
'
)
# Transform data one by one
infiles
=
[
join
(
s
.
compressed
(),
''
)
for
s
in
infilename
]
# Get the date variable, and find index corresponding to the DaCycle date
ncf_in
.
close
()
try
:
dates
=
ncf_in
.
variables
[
'
date
'
][:]
except
KeyError
:
msg
=
"
The variable date cannot be found in the requested input file (%s)
"
%
infile
;
logging
.
error
(
msg
)
msg
=
"
Please make sure you create gridded fluxes before making TC fluxes
"
;
logging
.
error
(
msg
)
raise
KeyError
for
orig_file
in
set
(
infiles
):
try
:
index
=
dates
.
tolist
().
index
(
ncfdate
)
except
ValueError
:
msg
=
"
The requested cycle date is not yet available in file %s
"
%
infile
;
logging
.
error
(
msg
)
msg
=
"
Please make sure you create state based fluxes before making TC fluxes
"
;
logging
.
error
(
msg
)
raise
ValueError
if
not
os
.
path
.
exists
(
orig_file
):
msg
=
"
The original input file (%s) could not be found, continuing to next file...
"
%
orig_file
;
logging
.
error
(
msg
)
continue
# First add the date for this cycle to the file, this grows the unlimited dimension
savedict
=
ncf
.
StandardVar
(
varname
=
'
date
'
)
savedict
[
'
values
'
]
=
ncfdate
savedict
[
'
dims
'
]
=
dimdate
savedict
[
'
count
'
]
=
index
dummy
=
ncf
.
AddData
(
savedict
)
copy_file
=
os
.
path
.
join
(
dirname
,
os
.
path
.
split
(
orig_file
)[
-
1
])
if
not
os
.
path
.
exists
(
copy_file
):
shutil
.
copy
(
orig_file
,
copy_file
)
msg
=
"
Copied a new original file (%s) to the analysis directory
"
%
orig_file
;
logging
.
debug
(
msg
)
# Now convert other variables that were inside the flux_1x1 file
ncf_out
=
io
.
CT_CDF
(
copy_file
,
'
write
'
)
vardict
=
ncf_in
.
variables
for
vname
,
vprop
in
vardict
.
iteritems
():
# Modify the attributes of the file to reflect added data from CTDAS properly
data
=
ncf_in
.
GetVariable
(
vname
)[
index
]
ncf_out
.
Caution
=
'
===================================================================================
'
ncf_out
.
History
+=
'
\n
Original observation file modified by user %s on %s
\n
'
%
(
os
.
environ
[
'
USER
'
],
datetime
.
today
().
strftime
(
'
%F
'
),)
ncf_out
.
CTDAS_info
=
'
Simulated values added from a CTDAS run by %s on %s
\n
'
%
(
os
.
environ
[
'
USER
'
],
datetime
.
today
().
strftime
(
'
%F
'
),)
\
+
'
\n
CTDAS was run on platform %s
'
%
(
os
.
environ
[
'
HOST
'
],)
\
+
'
\n
CTDAS job directory was %s
'
%
(
DaCycle
[
'
dir.da_run
'
],)
\
+
'
\n
CTDAS Da System was %s
'
%
(
DaCycle
[
'
da.system
'
],)
\
+
'
\n
CTDAS Da ObsOperator was %s
'
%
(
DaCycle
[
'
da.obsoperator
'
],)
ncf_out
.
CTDAS_startdate
=
DaCycle
[
'
time.start
'
].
strftime
(
'
%F
'
)
ncf_out
.
CTDAS_enddate
=
DaCycle
[
'
time.finish
'
].
strftime
(
"
%F
"
)
ncf_out
.
original_file
=
orig_file
if
vname
==
'
latitude
'
:
continue
elif
vname
==
'
longitude
'
:
continue
elif
vname
==
'
date
'
:
continue
elif
vname
==
'
idate
'
:
continue
elif
'
std
'
in
vname
:
continue
elif
'
ensemble
'
in
vname
:
tcdata
=
[]
for
member
in
data
:
tcdata
.
append
(
StateVector
.
VectorToTC
(
vectordata
=
member
)
)
tcdata
=
np
.
array
(
tcdata
)
cov
=
tcdata
.
transpose
().
dot
(
tcdata
)
/
(
StateVector
.
nmembers
-
1
)
tcdata
=
cov
# get nobs dimension
if
ncf_out
.
dimensions
.
has_key
(
'
id
'
):
dimidob
=
ncf_out
.
dimensions
[
'
id
'
]
dimid
=
(
'
id
'
,)
elif
ncf_out
.
dimensions
.
has_key
(
'
obs
'
):
dimidob
=
ncf_out
.
dimensions
[
'
obs
'
]
dimid
=
(
'
obs
'
,)
savedict
=
ncf
.
StandardVar
(
varname
=
vname
.
replace
(
'
ensemble
'
,
'
cov
'
)
)
savedict
[
'
units
'
]
=
'
[mol/region/s]**2
'
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
+
dimregs
if
dimidob
.
isunlimited
:
nobs
=
ncf_out
.
inq_unlimlen
()
else
:
nobs
=
len
(
dimid
)
# add nmembers dimension
dimmembersob
=
ncf_out
.
createDimension
(
'
nmembers
'
,
size
=
simulated
.
shape
[
1
])
dimmembers
=
(
'
nmembers
'
,)
nmembers
=
len
(
dimmembers
)
# Create empty arrays
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'
name
'
]
=
"
modeldatamismatch
"
savedict
[
'
long_name
'
]
=
"
modeldatamismatch
"
savedict
[
'
units
'
]
=
"
[mol mol-1]^2
"
savedict
[
'
dims
'
]
=
dimid
savedict
[
'
comment
'
]
=
'
Variance of mole fractions resulting from model-data mismatch
'
status
=
ncf_out
.
AddVariable
(
savedict
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'
name
'
]
=
"
modelsamplesmean
"
savedict
[
'
long_name
'
]
=
"
mean modelsamples
"
savedict
[
'
units
'
]
=
"
mol mol-1
"
savedict
[
'
dims
'
]
=
dimid
savedict
[
'
comment
'
]
=
'
simulated mixing ratios based on optimized state vector
'
status
=
ncf_out
.
AddVariable
(
savedict
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'
name
'
]
=
"
modelsamplesstandarddeviation
"
savedict
[
'
long_name
'
]
=
"
standard deviaton of modelsamples over all ensemble members
"
savedict
[
'
units
'
]
=
"
mol mol-1
"
savedict
[
'
dims
'
]
=
dimid
savedict
[
'
comment
'
]
=
'
std dev of simulated mixing ratios based on optimized state vector
'
status
=
ncf_out
.
AddVariable
(
savedict
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'
name
'
]
=
"
modelsamplesensemble
"
savedict
[
'
long_name
'
]
=
"
modelsamples over all ensemble members
"
savedict
[
'
units
'
]
=
"
mol mol-1
"
savedict
[
'
dims
'
]
=
dimid
+
dimmembers
savedict
[
'
comment
'
]
=
'
ensemble of simulated mixing ratios based on optimized state vector
'
status
=
ncf_out
.
AddVariable
(
savedict
)
tcdata
=
StateVector
.
VectorToTC
(
vectordata
=
data
)
# vector to TC
savedict
=
ncf
.
StandardVar
(
varname
=
vname
)
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
units
'
]
=
'
mol/region/s
'
savedict
[
'
count
'
]
=
index
savedict
[
'
values
'
]
=
tcdata
ncf
.
AddData
(
savedict
)
ncf_in
.
close
()
ncf
.
close
()
msg
=
"
TransCom weekly average fluxes now written
"
;
logging
.
info
(
msg
)
return
saveas
def
SaveWeeklyAvgExtTCData
(
DaCycle
):
"""
Function SaveTCDataExt saves surface flux data to NetCDF files for extended TransCom regions
*** Inputs ***
rundat : a RunInfo object
*** Outputs ***
NetCDF file containing n-hourly global surface fluxes per TransCom region
*** Example ***
./expand_savestate project=enkf_release sd=20000101 ed=20010101
"""
from
da.analysis.tools_transcom
import
ExtendedTCRegions
import
da.tools.io4
as
io
#
dirname
=
'
data_tc_weekly
'
#
dirname
=
CreateDirs
(
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
dirname
))
#
# Some help variables
#
dectime0
=
date2num
(
datetime
(
2000
,
1
,
1
))
dt
=
DaCycle
[
'
cyclelength
'
]
startdate
=
DaCycle
[
'
time.start
'
]
enddate
=
DaCycle
[
'
time.end
'
]
ncfdate
=
date2num
(
startdate
)
-
dectime0
+
dt
.
days
/
2.0
msg
=
"
DA Cycle start date is %s
"
%
startdate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
msg
=
"
DA Cycle end date is %s
"
%
enddate
.
strftime
(
'
%Y-%m-%d %H:%M
'
)
;
logging
.
debug
(
msg
)
# Write/Create NetCDF output file
#
saveas
=
os
.
path
.
join
(
dirname
,
'
tc_extfluxes.nc
'
)
ncf
=
io
.
CT_CDF
(
saveas
,
'
write
'
)
dimdate
=
ncf
.
AddDateDim
()
dimidateformat
=
ncf
.
AddDateDimFormat
()
dimregs
=
ncf
.
AddRegionDim
(
type
=
'
tc_ext
'
)
#
# set title and tell GMT that we are using "pixel registration"
#
setattr
(
ncf
,
'
Title
'
,
'
CarbonTracker TransCom fluxes
'
)
setattr
(
ncf
,
'
node_offset
'
,
1
)
#
skip
=
ncf
.
has_date
(
ncfdate
)
if
skip
:
msg
=
'
Skipping writing of data for date %s : already present in file %s
'
%
(
startdate
.
strftime
(
'
%Y-%m-%d
'
),
saveas
)
;
logging
.
warning
(
msg
)
else
:
infile
=
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
'
data_tc_weekly
'
,
'
tcfluxes.nc
'
)
if
not
os
.
path
.
exists
(
infile
):
msg
=
"
Needed input file (%s) does not exist yet, please create file first, returning...
"
%
infile
;
logging
.
error
(
msg
)
return
None
ncf_in
=
io
.
CT_Read
(
infile
,
'
read
'
)
# Transform data one by one
# Get the date variable, and find index corresponding to the DaCycle date
try
:
dates
=
ncf_in
.
variables
[
'
date
'
][:]
except
KeyError
:
msg
=
"
The variable date cannot be found in the requested input file (%s)
"
%
infile
;
logging
.
error
(
msg
)
msg
=
"
Please make sure you create gridded fluxes before making extended TC fluxes
"
;
logging
.
error
(
msg
)
raise
KeyError
try
:
index
=
dates
.
tolist
().
index
(
ncfdate
)
except
ValueError
:
msg
=
"
The requested cycle date is not yet available in file %s
"
%
infile
;
logging
.
error
(
msg
)
msg
=
"
Please make sure you create state based fluxes before making extended TC fluxes
"
;
logging
.
error
(
msg
)
raise
ValueError
# First add the date for this cycle to the file, this grows the unlimited dimension
savedict
=
ncf
.
StandardVar
(
varname
=
'
date
'
)
savedict
[
'
values
'
]
=
ncfdate
savedict
[
'
dims
'
]
=
dimdate
savedict
[
'
count
'
]
=
index
dummy
=
ncf
.
AddData
(
savedict
)
# Now convert other variables that were inside the tcfluxes.nc file
vardict
=
ncf_in
.
variables
for
vname
,
vprop
in
vardict
.
iteritems
():
data
=
ncf_in
.
GetVariable
(
vname
)[
index
]
if
vname
==
'
latitude
'
:
continue
elif
vname
==
'
longitude
'
:
continue
elif
vname
==
'
date
'
:
continue
elif
vname
==
'
idate
'
:
continue
elif
'
cov
'
in
vname
:
tcdata
=
ExtendedTCRegions
(
data
,
cov
=
True
)
savedict
=
ncf
.
StandardVar
(
varname
=
vname
)
savedict
[
'
units
'
]
=
'
[mol/region/s]**2
'
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
+
dimregs
else
:
tcdata
=
ExtendedTCRegions
(
data
,
cov
=
False
)
savedict
=
ncf
.
StandardVar
(
varname
=
vname
)
savedict
[
'
dims
'
]
=
dimdate
+
dimregs
savedict
[
'
units
'
]
=
'
mol/region/s
'
savedict
[
'
count
'
]
=
index
savedict
[
'
values
'
]
=
tcdata
ncf
.
AddData
(
savedict
)
ncf_in
.
close
()
ncf
.
close
()
msg
=
"
TransCom weekly average extended fluxes now written
"
;
logging
.
info
(
msg
)
return
saveas
else
:
msg
=
"
Modifying existing file (%s) in the analysis directory
"
%
copy_file
;
logging
.
debug
(
msg
)
def
SaveTimeAvgData
(
DaCycle
,
infile
,
avg
=
'
monthly
'
):
"""
Function saves time mean surface flux data to NetCDF files
*** Inputs ***
rundat : a RunInfo object
ncf_out
=
io
.
CT_CDF
(
copy_file
,
'
write
'
)
*** Outputs ***
daily NetCDF file containing 1-hourly global surface fluxes at 1x1 degree
*** Example ***
./expand_savestate project=enkf_release sd=20000101 ed=20010101
"""
# Get existing file obs_nums to determine match to local obs_nums
import
da.analysis.tools_time
as
timetools
import
da.tools.io4
as
io
if
ncf_out
.
variables
.
has_key
(
'
id
'
):
file_obs_nums
=
ncf_out
.
GetVariable
(
'
id
'
)
elif
ncf_out
.
variables
.
has_key
(
'
obspack_num
'
):
file_obs_nums
=
ncf_out
.
GetVariable
(
'
obspack_num
'
)
if
'
weekly
'
in
infile
:
intime
=
'
weekly
'
if
'
monthly
'
in
infile
:
intime
=
'
monthly
'
if
'
yearly
'
in
infile
:
intime
=
'
yearly
'
# Get all obs_nums related to this file, determine their indices in the local arrays
dirname
,
filename
=
os
.
path
.
split
(
infile
)
outdir
=
CreateDirs
(
os
.
path
.
join
(
DaCycle
[
'
dir.analysis
'
],
dirname
.
replace
(
intime
,
avg
)))
selected_obs_nums
=
[
num
for
infile
,
num
in
zip
(
infiles
,
obs_num
)
if
infile
==
orig_file
]
dectime0
=
date2num
(
datetime
(
2000
,
1
,
1
))
# For each index, get the data and add to the file in the proper file index location
# Create NetCDF output file
#
saveas
=
os
.
path
.
join
(
outdir
,
filename
)
ncf
=
io
.
CT_CDF
(
saveas
,
'
create
'
)
dimdate
=
ncf
.
AddDateDim
()
#
# Open input file specified from the command line
#
if
not
os
.
path
.
exists
(
infile
):
print
"
Needed input file (%s) not found. Please create this first:
"
%
infile
print
"
returning...
"
return
None
else
:
pass
for
num
in
selected_obs_nums
:
file
=
io
.
CT_Read
(
infile
,
'
read
'
)
datasets
=
file
.
variables
.
keys
()
date
=
file
.
GetVariable
(
'
date
'
)
globatts
=
file
.
ncattrs
()
model_index
=
obs_num
.
tolist
().
index
(
num
)
file_index
=
file_obs_nums
.
tolist
().
index
(
num
)
for
att
in
globatts
:
attval
=
file
.
getncattr
(
att
)
if
not
att
in
ncf
.
ncattrs
():
ncf
.
setncattr
(
att
,
attval
)
var
=
ncf_out
.
variables
[
'
modeldatamismatch
'
]
var
[
file_index
]
=
mdm
[
model_index
]
var
=
ncf_out
.
variables
[
'
modelsamplesmean
'
]
var
[
file_index
]
=
simulated
[
model_index
,
0
]
time
=
[
datetime
(
2000
,
1
,
1
)
+
timedelta
(
days
=
d
)
for
d
in
date
]
var
=
ncf_out
.
variables
[
'
modelsamplesstandarddeviation
'
]
var
[
file_index
]
=
simulated
[
model_index
,
1
:].
std
()
# loop over datasets in infile, skip idate and date as we will make new time axis for the averaged data
var
=
ncf_out
.
variables
[
'
modelsamplesensemble
'
]
var
[
file_index
]
=
simulated
[
model_index
,:]
for
sds
in
[
'
date
'
]
+
datasets
:
# close the file
# get original data
status
=
ncf_out
.
close
()
data
=
file
.
GetVariable
(
sds
)
varatts
=
file
.
variables
[
sds
].
ncattrs
()
vardims
=
file
.
variables
[
sds
].
dimensions
#
# Depending on dims of input dataset, create dims for output dataset. Note that we add the new dimdate now.
#
return
None
for
d
in
vardims
:
if
'
date
'
in
d
:
continue
if
d
in
ncf
.
dimensions
.
keys
():
pass
else
:
dim
=
ncf
.
createDimension
(
d
,
size
=
len
(
file
.
dimensions
[
d
]))
savedict
=
ncf
.
StandardVar
(
sds
)
savedict
[
'
name
'
]
=
sds
savedict
[
'
dims
'
]
=
vardims
savedict
[
'
units
'
]
=
file
.
variables
[
sds
].
units
savedict
[
'
long_name
'
]
=
file
.
variables
[
sds
].
long_name
savedict
[
'
comment
'
]
=
file
.
variables
[
sds
].
comment
savedict
[
'
standard_name
'
]
=
file
.
variables
[
sds
].
standard_name
savedict
[
'
count
'
]
=
0
if
not
'
date
'
in
vardims
:
savedict
[
'
values
'
]
=
data
ncf
.
AddData
(
savedict
)
else
:
if
avg
==
'
monthly
'
:
time_avg
,
data_avg
=
timetools
.
monthly_avg
(
time
,
data
)
elif
avg
==
'
seasonal
'
:
time_avg
,
data_avg
=
timetools
.
season_avg
(
time
,
data
)
elif
avg
==
'
yearly
'
:
time_avg
,
data_avg
=
timetools
.
yearly_avg
(
time
,
data
)
elif
avg
==
'
longterm
'
:
time_avg
,
data_avg
=
timetools
.
longterm_avg
(
time
,
data
)
time_avg
=
[
time_avg
]
data_avg
=
[
data_avg
]
else
:
raise
ValueError
,
'
Averaging (%s) does not exist
'
%
avg
count
=-
1
for
dd
,
data
in
zip
(
time_avg
,
data_avg
):
count
=
count
+
1
if
sds
==
'
date
'
:
savedict
[
'
values
'
]
=
date2num
(
dd
)
-
dectime0
else
:
savedict
[
'
values
'
]
=
data
savedict
[
'
count
'
]
=
count
ncf
.
AddData
(
savedict
,
silent
=
True
)
sys
.
stdout
.
write
(
'
.
'
)
sys
.
stdout
.
write
(
'
\n
'
)
sys
.
stdout
.
flush
()
# end NetCDF file access
file
.
close
()
ncf
.
close
()
msg
=
"
------------------- Finished time averaging---------------------------------
"
;
logging
.
info
(
msg
)
return
saveas
if
__name__
==
"
__main__
"
:
import
logging
from
da.tools.initexit
import
CycleControl
from
da.ct.dasystem
import
CtDaSystem
from
da.ctgridded.statevector
import
CtGriddedStateVector
from
da.ct.dasystem
import
CtDaSystem
sys
.
path
.
append
(
'
../../
'
)
logging
.
root
.
setLevel
(
logging
.
DEBUG
)
DaCycle
=
CycleControl
(
args
=
{
'
rc
'
:
'
../../da
griddedjet
.rc
'
})
DaCycle
=
CycleControl
(
args
=
{
'
rc
'
:
'
../../da.rc
'
})
DaCycle
.
Initialize
()
DaCycle
.
ParseTimes
()
DaSystem
=
CtDaSystem
(
'
../rc/carbontracker
gridded
.rc
'
)
DaSystem
=
CtDaSystem
(
'
../rc/carbontracker.rc
'
)
DaSystem
.
Initialize
()
DaCycle
.
DaSystem
=
DaSystem
StateVector
=
CtGriddedStateVector
(
DaCycle
)
dummy
=
StateVector
.
Initialize
()
while
DaCycle
[
'
time.start
'
]
<
DaCycle
[
'
time.finish
'
]:
savedas_1x1
=
SaveWeeklyAvg1x1Data
(
DaCycle
,
StateVector
)
savedas_state
=
SaveWeeklyAvgStateData
(
DaCycle
,
StateVector
)
savedas_tc
=
SaveWeeklyAvgTCData
(
DaCycle
,
StateVector
)
savedas_tcext
=
SaveWeeklyAvgExtTCData
(
DaCycle
)
outfile
=
WriteMixingRatios
(
DaCycle
)
DaCycle
.
AdvanceCycleTimes
()
StateVector
=
None
# free memory
for
avg
in
[
'
monthly
'
,
'
yearly
'
,
'
longterm
'
]:
savedas_1x1
=
SaveTimeAvgData
(
DaCycle
,
savedas_1x1
,
avg
)
savedas_state
=
SaveTimeAvgData
(
DaCycle
,
savedas_state
,
avg
)
savedas_tc
=
SaveTimeAvgData
(
DaCycle
,
savedas_tc
,
avg
)
savedas_tcext
=
SaveTimeAvgData
(
DaCycle
,
savedas_tcext
,
avg
)
sys
.
exit
(
0
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment