Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CTDAS
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Container registry
Model registry
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
CTDAS
CTDAS
Commits
c62b1397
Commit
c62b1397
authored
13 years ago
by
Peters, Wouter
Browse files
Options
Downloads
Patches
Plain Diff
changes to the structure of sampling, and how Obs are accummulated in the system
parent
da2d36c3
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
da/tools/pipeline.py
+46
-35
46 additions, 35 deletions
da/tools/pipeline.py
with
46 additions
and
35 deletions
da/tools/pipeline.py
+
46
−
35
View file @
c62b1397
...
...
@@ -144,7 +144,29 @@ def SampleState(DaCycle,Samples,StateVector, ObservationOperator):
msg
=
header
+
"
.....Ensemble Kalman Filter at lag %d
"
%
(
int
(
lag
)
+
1
,)
;
logging
.
info
(
msg
)
status
=
SampleOneCycle
(
DaCycle
,
Samples
,
StateVector
,
ObservationOperator
,
lag
)
# If we are going to sample the lag = 0 time step, place the needed files to run the transport model in the right folder first
if
lag
==
0
:
status
=
ObservationOperator
.
GetInitialData
()
############# Perform the actual sampling loop #####################
status
=
SampleOneCycle
(
DaCycle
,
Samples
,
StateVector
,
ObservationOperator
,
lag
)
############# Finished the actual sampling loop #####################
msg
=
"
StateVector now carries %d samples
"
%
StateVector
.
nobs
;
logging
.
debug
(
msg
)
# Add the observation operator restart+output files to the general Output+RestartFileList, only after 'Advance'
# Same logic for observation files
if
lag
==
0
:
DaCycle
.
RestartFileList
.
extend
(
ObservationOperator
.
RestartFileList
)
DaCycle
.
OutputFileList
.
extend
(
ObservationOperator
.
OutputFileList
)
msg
=
"
Appended ObsOperator restart and output file lists to DaCycle for collection
"
;
logging
.
debug
(
msg
)
DaCycle
.
OutputFileList
.
extend
([
DaCycle
[
'
ObsOperator.inputfile
'
]])
msg
=
"
Appended Observation filename to DaCycle for collection
"
;
logging
.
debug
(
msg
)
# Optionally, post-processing of the model output can be added that deals for instance with
...
...
@@ -159,10 +181,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
# First set up the information for time start and time end of this sample
if
lag
==
0
:
status
=
ObservationOperator
.
GetInitialData
()
status
=
DaCycle
.
SetSampleTimes
(
lag
)
status
=
DaCycle
.
SetSampleTimes
(
lag
)
startdate
=
DaCycle
[
'
time.sample.start
'
]
enddate
=
DaCycle
[
'
time.sample.end
'
]
...
...
@@ -196,52 +215,40 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
status
=
RunForecastModel
(
DaCycle
,
ObservationOperator
)
# Add the observation operator restart+output files to the general Output+RestartFileList, only after 'Advance'
# Same logic for observation files
if
lag
==
0
:
DaCycle
.
RestartFileList
.
extend
(
ObservationOperator
.
RestartFileList
)
DaCycle
.
OutputFileList
.
extend
(
ObservationOperator
.
OutputFileList
)
msg
=
"
Appended ObsOperator restart and output file lists to DaCycle for collection
"
;
logging
.
debug
(
msg
)
DaCycle
.
OutputFileList
.
extend
([
DaCycle
[
'
ObsOperator.inputfile
'
]])
msg
=
"
Appended Observation filename to DaCycle for collection
"
;
logging
.
debug
(
msg
)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
status
=
Samples
.
AddModelDataMismatch
()
msg
=
"
Added Model Data Mismatch to all samples
"
;
logging
.
debug
(
msg
)
# Read forecast model samples that were written to NetCDF files by each member, also add the obs to the statevector
# Note that obs will only be added to the statevector is either this is the first step (restart=False), or lag==nlag
if
lag
==
int
(
DaCycle
[
'
time.nlag
'
])
-
1
or
DaCycle
[
'
time.restart
'
]
==
False
:
# Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting
# Observation object for each sample loop. This data fill be written to file in the output folder for each sample cycle.
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
filename
=
os
.
path
.
join
(
ObservationOperator
.
outputdir
,
'
flask_output.%s.nc
'
%
DaCycle
[
'
time.sample.stamp
'
])
status
=
Samples
.
AddSimulations
(
filename
)
filename
=
os
.
path
.
join
(
ObservationOperator
.
outputdir
,
'
flask_output.%s.nc
'
%
DaCycle
[
'
time.sample.stamp
'
])
status
=
Samples
.
AddSimulations
(
filename
)
# Give each member a model sample by first copying all samples, and then selecting the data for Member #n
# Now write a small file that holds for each observation a number of values that we need in postprocessing
members
=
StateVector
.
EnsembleMembers
[
lag
]
for
n
,
Member
in
enumerate
(
members
):
Member
.
ModelSample
=
copy
.
deepcopy
(
Samples
)
filename
=
Samples
.
WriteSampleInfo
()
# Now subselect the data needed for member #n
# Now add the observations that need to be assimilated to the StateVector.
# Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
# This is to make sure that the first step of the system uses all observations available, while the subsequent
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
for
Sim
in
Member
.
ModelSample
.
Data
:
Sim
.
simulated
=
Sim
.
simulated
[
n
]
if
lag
==
int
(
DaCycle
[
'
time.nlag
'
])
-
1
or
DaCycle
[
'
time.restart
'
]
==
False
:
StateVector
.
ObsToAssimmilate
+=
(
copy
.
deepcopy
(
Samples
),)
StateVector
.
nobs
+=
Samples
.
getlength
()
StateVector
.
nobs
+=
Samples
.
getlength
()
msg
=
"
Added samples from the observation operator to
each member of
the StateVector
"
;
logging
.
debug
(
msg
)
msg
=
"
Added samples from the observation operator to
the assimilted obs list in
the StateVector
"
;
logging
.
debug
(
msg
)
msg
=
"
StateVector now carries %d samples
"
%
StateVector
.
nobs
;
logging
.
debug
(
msg
)
return
None
...
...
@@ -292,6 +299,10 @@ def Advance( DaCycle, Samples, StateVector, ObservationOperator):
SampleOneCycle
(
DaCycle
,
Samples
,
StateVector
,
ObservationOperator
,
0
)
# Write info from optimized flux cycle to a file
filename
=
Samples
.
WriteObsToFile
()
return
None
def
SaveAndSubmit
(
DaCycle
,
StateVector
):
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment