Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Tsurata, Aki
CTDAS
Commits
fd7af47e
Commit
fd7af47e
authored
Mar 23, 2011
by
Peters, Wouter
Browse files
changed logic of start-up steps, more moved to main script
parent
29152626
Changes
3
Hide whitespace changes
Inline
Side-by-side
da/tools/initexit.py
View file @
fd7af47e
...
...
@@ -104,6 +104,7 @@ class CycleControl(dict):
self
[
'verbose'
]
=
'-v'
in
opts
self
.
DaSystem
=
None
# to be filled later
self
.
RestartFileList
=
[]
# List of files needed for restart, to be extended later
self
.
OutputFileList
=
[]
# List of files needed for output, to be extended later
def
__str__
(
self
):
"""
...
...
@@ -434,11 +435,40 @@ class CycleControl(dict):
dummy
=
self
.
WriteNewRCfile
()
dummy
=
self
.
MoveRestartData
(
io_option
=
'store'
)
# Move restart data from current to one-ago
dummy
=
self
.
CollectRestartData
()
# Collect restart data for next cycle into a clean restart/current folder
dummy
=
self
.
CollectOutput
()
# Collect restart data for next cycle into a clean restart/current folder
dummy
=
self
.
SubmitNextCycle
()
def
CollectOutput
(
self
):
""" Collect files that are vpart of the requested output for this cycle. This function allows users to add files
to a list, and then the system will copy these to the current cycle's output directory.
The list of files included is read from the
attribute "OutputFileList" which is a simple list of files that can be appended by other objects/methods that
require output data to be saved.
"""
from
da.tools.general
import
CreateDirs
targetdir
=
os
.
path
.
join
(
self
[
'dir.output'
])
CreateDirs
(
os
.
path
.
join
(
targetdir
)
)
msg
=
"Collecting the required output data"
;
logging
.
info
(
msg
)
msg
=
" to directory: %s "
%
targetdir
;
logging
.
debug
(
msg
)
for
file
in
set
(
self
.
OutputFileList
):
if
os
.
path
.
isdir
(
file
):
# skip dirs
continue
msg
=
" [copy] .... %s "
%
file
;
logging
.
debug
(
msg
)
dummy
=
shutil
.
copy
(
file
,
file
.
replace
(
os
.
path
.
split
(
file
)[
0
],
targetdir
)
)
def
CollectRestartData
(
self
):
""" Collect files needed for the restart of this cycle in case of a crash, or for the continuation of the next cycle.
All files need
d
ed are written to the restart/current directory. The list of files included is read from the
All files needed are written to the restart/current directory. The list of files included is read from the
attribute "RestartFileList" which is a simple list of files that can be appended by other objects/methods that
require restart data to be saved.
...
...
@@ -461,8 +491,6 @@ class CycleControl(dict):
"""
from
da.tools.general
import
CreateDirs
self
.
RestartFileList
.
append
(
os
.
path
.
join
(
self
[
'dir.output'
],
'savestate.nc'
)
)
targetdir
=
os
.
path
.
join
(
self
[
'dir.restart.current'
])
msg
=
"Purging the current restart directory before collecting new data"
;
logging
.
info
(
msg
)
...
...
@@ -472,6 +500,8 @@ class CycleControl(dict):
msg
=
"Collecting the required restart data"
;
logging
.
info
(
msg
)
msg
=
" to directory: %s "
%
targetdir
;
logging
.
debug
(
msg
)
print
self
.
RestartFileList
for
file
in
set
(
self
.
RestartFileList
):
if
os
.
path
.
isdir
(
file
):
# skip dirs
...
...
da/tools/pipeline.py
View file @
fd7af47e
...
...
@@ -21,7 +21,7 @@ import da.tools.rc as rc
header
=
'
\n\n
*************************************** '
footer
=
' ***************************************
\n
'
def
Main
(
PlatForm
,
DaSystem
,
Samples
,
StateVector
,
ObsOperator
,
Optimizer
):
def
Main
(
DaCycle
,
PlatForm
,
DaSystem
,
Samples
,
StateVector
,
ObsOperator
,
Optimizer
):
""" The main point of entry for the pipeline """
# Append current working dir to path
...
...
@@ -30,30 +30,15 @@ def Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer):
# Import methods and classes contained in this package
from
da.tools.initexit
import
ValidateOptsArgs
from
da.tools.initexit
import
ParseOptions
from
da.tools.general
import
StoreData
,
RestoreData
msg
=
header
+
"Initializing current cycle"
+
footer
;
logging
.
info
(
msg
)
# Parse options from the command line
opts
,
args
=
ParseOptions
()
# Validate Options and arguments passed
opts
,
args
=
ValidateOptsArgs
(
opts
,
args
)
# Create the Cycle Control object for this job
DaCycle
=
JobStart
(
opts
,
args
,
DaSystem
,
PlatForm
)
msg
=
header
+
"Initializing Da Cycle"
+
footer
;
logging
.
info
(
msg
)
dummy
=
DaCycle
.
Initialize
()
dummy
=
JobStart
(
DaCycle
,
DaSystem
,
PlatForm
,
StateVector
)
msg
=
header
+
"starting JobInput"
+
footer
;
logging
.
info
(
msg
)
StateVector
=
JobInput
(
DaCycle
,
StateVector
)
dummy
=
JobInput
(
DaCycle
,
StateVector
)
msg
=
header
+
"starting SampleState"
+
footer
;
logging
.
info
(
msg
)
...
...
@@ -80,38 +65,34 @@ def Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer):
####################################################################################################
def
JobStart
(
opts
,
args
,
DaSystem
,
DaPlatForm
):
def
JobStart
(
DaCycle
,
DaSystem
,
DaPlatForm
,
StateVector
):
""" Set up the job specific directory structure and create an expanded rc-file """
from
da.tools.initexit
import
CycleControl
# First create a CycleControl object that handles all the details of the da cycle
DaCycle
=
CycleControl
(
opts
,
args
)
dummy
=
DaSystem
.
Initialize
(
)
# Next fill the DaSystem object that contains all the details related to the DA system
dummy
=
DaSystem
.
Validate
()
dummy
=
DaSystem
.
Validate
()
DaCycle
.
DaSystem
=
DaSystem
dummy
=
Da
System
.
Initialize
()
DaCycle
.
DaPlatForm
=
Da
PlatForm
DaCycle
.
DaSystem
=
DaSystem
dummy
=
DaCycle
.
Initialize
()
DaCycle
.
DaPlatForm
=
DaPlatForm
dims
=
(
int
(
DaCycle
[
'time.nlag'
]),
int
(
DaCycle
[
'forecast.nmembers'
]),
int
(
DaCycle
.
DaSystem
[
'nparameters'
]),
)
return
DaCycle
dummy
=
StateVector
.
Initialize
(
dims
)
StateVector
.
DaCycle
=
DaCycle
def
JobInput
(
DaCycle
,
StateVector
):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
dims
=
(
int
(
DaCycle
[
'time.nlag'
]),
int
(
DaCycle
[
'forecast.nmembers'
]),
int
(
DaCycle
.
DaSystem
[
'nparameters'
]),
)
return
None
dummy
=
StateVector
.
Initialize
(
dims
)
nlag
=
dims
[
0
]
def
JobInput
(
DaCycle
,
StateVector
):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
# We now have an empty StateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
# the previous StateVector values from a NetCDF file in the restart/current directory. If this is the first cycle, we need to populate the StateVector
...
...
@@ -122,6 +103,7 @@ def JobInput(DaCycle, StateVector):
# Fill each week from n=1 to n=nlag with a new ensemble
nlag
=
StateVector
.
nlag
for
n
in
range
(
0
,
nlag
):
date
=
DaCycle
[
'time.start'
]
+
datetime
.
timedelta
(
days
=
(
n
+
0.5
)
*
int
(
DaCycle
[
'time.cycle'
]))
cov
=
StateVector
.
GetCovariance
(
DaCycle
.
DaSystem
,
date
)
...
...
@@ -141,7 +123,7 @@ def JobInput(DaCycle, StateVector):
dummy
=
StateVector
.
Propagate
(
DaCycle
)
return
StateVector
return
None
def
SampleState
(
DaCycle
,
Samples
,
StateVector
,
ObservationOperator
):
""" Sample the filter state for the inversion """
...
...
@@ -217,10 +199,11 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag):
dummy
=
RunForecastModel
(
DaCycle
,
ObservationOperator
)
# Add the
TM5 restar
t files to the general RestartFileList, only after 'Advance'
# Add the
observation operator restart+outpu
t files to the general
Output+
RestartFileList, only after 'Advance'
if
lag
==
0
:
DaCycle
.
RestartFileList
.
extend
(
ObservationOperator
.
RestartFileList
)
DaCycle
.
OutputFileList
.
extend
(
ObservationOperator
.
OutputFileList
)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
...
...
@@ -322,6 +305,9 @@ def SaveAndSubmit( DaCycle, StateVector):
filename
=
os
.
path
.
join
(
savedir
,
'savestate.nc'
)
dummy
=
StateVector
.
WriteToFile
(
filename
)
DaCycle
.
RestartFileList
.
append
(
os
.
path
.
join
(
DaCycle
[
'dir.output'
],
'savestate.nc'
)
)
dummy
=
DaCycle
.
Finalize
()
return
None
...
...
das.py
View file @
fd7af47e
...
...
@@ -10,14 +10,38 @@
#$ -j y
#$ -r n
#################################################################################################
# First order of business is always to make all other python modules accessible through the path
#################################################################################################
import
sys
import
os
dummy
=
sys
.
path
.
append
(
os
.
getcwd
())
#################################################################################################
# Next, import the tools needed to initialize a data assimilation cycle, as well as the pipeline (Main())
#################################################################################################
from
da.tools.initexit
import
StartLogger
from
da.tools.initexit
import
ValidateOptsArgs
from
da.tools.initexit
import
ParseOptions
from
da.tools.pipeline
import
Main
dummy
=
StartLogger
()
#################################################################################################
# Parse and validate the command line options
#################################################################################################
dummy
=
StartLogger
()
opts
,
args
=
ParseOptions
()
opts
,
args
=
ValidateOptsArgs
(
opts
,
args
)
#################################################################################################
# Create the Cycle Control object for this job
#################################################################################################
from
da.tools.initexit
import
CycleControl
DaCycle
=
CycleControl
(
opts
,
args
)
###########################################################################################
### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
...
...
@@ -31,8 +55,8 @@ from da.tm5.observationoperator import TM5ObservationOperator
from
da.ct.optimizer
import
CtOptimizer
PlatForm
=
MaunaloaPlatForm
()
DaSystem
=
CtDaSystem
(
'carbontracker.rc'
)
ObsOperator
=
TM5ObservationOperator
(
'/Users/peters/Modeling/TM5/tm5-ctdas.rc'
)
DaSystem
=
CtDaSystem
(
'
da/rc/
carbontracker.rc'
)
ObsOperator
=
TM5ObservationOperator
(
'/Users/peters/Modeling/TM5/tm5-ctdas
-inv-ei-6x4
.rc'
)
Samples
=
CtObservations
()
StateVector
=
CtStateVector
()
Optimizer
=
CtOptimizer
()
...
...
@@ -46,7 +70,10 @@ print "\n *******************************************************************
print
" *************************************** Entering Pipeline ******************************************"
print
" ********************************************************************************************************
\n
"
Main
(
PlatForm
,
DaSystem
,
Samples
,
StateVector
,
ObsOperator
,
Optimizer
)
Main
(
DaCycle
,
PlatForm
,
DaSystem
,
Samples
,
StateVector
,
ObsOperator
,
Optimizer
)
##########################################################################################
################### All done
##########################################################################################
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment