Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
#!/usr/bin/env python
# das.py
"""
Author : peters
Revision History:
File created on 29 Sep 2009.
"""
import rc
from tools_da import ValidateRC
from tools_da import needed_rc_da_items
header = '\n\n *************************************** '
footer = ' *************************************** \n '
validprocess=['jobstart','jobinput','sample','invert','propagate','resubmit','all']
def JobStart(opts,args):
""" Set up the job specific directory structure and create an expanded rc-file """
import rc
from da_initexit import StartRestartRecover
from tools_da import ParseTimes
from da_initexit import WriteRC
rc_da_shell = rc.read(args['rc'])
# Add some useful variables to the rc-file dictionary
rc_da_shell['log'] = args["logfile"]
rc_da_shell['dir.da_submit'] = os.getcwd()
rc_da_shell['da.crash.recover'] = '-r' in opts
rc_da_shell['verbose'] = '-v' in opts
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# Figure out what to do: is this is a fresh start, a continuation, or a recover from crash
dummy = StartRestartRecover(rc_da_shell)
# Calculate DA system startdate, enddate, and finaldate from rc-file items
dummy = ParseTimes(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return rc_da_shell
def JobInput(args):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
from tools_da import PrepareObs
from tools_da import PrepareEnsemble
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = PrepareEnsemble(rc_da_shell)
dummy = PrepareObs(rc_da_shell,'forecast')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Sample(args):
""" Sample the filter state for the inversion """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = ForwardRun(args,'forecast')
# Optionally, post-processing of the model ouptu can be added that deals for instance with
# sub-sampling of time series, vertical averaging, etc.
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def ForwardRun(args,runtype='forecast'):
""" Run the forward model from startdate to enddate """
from tools_da import SetInterval
from tools_da import RunForecastModel
from da_initexit import IOSaveData
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = SetInterval(rc_da_shell,runtype)
dummy = RunForecastModel(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Invert(args):
""" Perform the inverse calculation """
import tools_da
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = tools_da.Invert(rc_da_shell)
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def Propagate(args):
""" Propagate the filter state to the next step """
from da_initexit import WriteRC
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
dummy = ForwardRun(args,'advance')
dummy = WriteRC(rc_da_shell,args['jobrcfilename'])
return None
def SaveAndSubmit(args):
""" Save the model state and submit the next job """
from da_initexit import IOSaveData
from da_initexit import WriteRC
from da_initexit import WriteNewRCfile
from da_initexit import SubmitNextCycle
rc_da_shell = rc.read(args['jobrcfilename'])
dummy = ValidateRC(rc_da_shell,needed_rc_da_items)
dummy = IOSaveData(rc_da_shell,io_option='store',save_option='full')
dummy = WriteNewRCfile(rc_da_shell)
dummy = SubmitNextCycle(rc_da_shell)
return None
if __name__ == "__main__":
import sys
import os
import logging
import shutil
import subprocess
from tools_da import ParseOptions
from tools_da import StartLogger
# Append current working dir to path
sys.path.append(os.getcwd())
# Get name of logfile
opts, args = ParseOptions()
if not args.has_key("logfile"):
msg = "There is no logfile specified on the command line. Using logfile=logfile.log"
args['logfile'] = 'logfile.log'
logfile = args['logfile']
dummy = StartLogger(logfile=logfile)
if not args.has_key("rc"):
msg = "There is no rc-file specified on the command line. Please use rc=yourfile.rc" ; logging.error(msg)
raise IOError,msg
elif not os.path.exists(args['rc']):
msg = "The specified rc-file (%s) does not exist " % args['rc'] ; logging.error(msg)
raise IOError,msg
if not args.has_key("process"):
msg = "There is no execution process specified on the command line. Using default process=all" ; logging.error(msg)
args["process"] = 'all'
if not args["process"] in validprocess:
msg = "The specified execution process is not valid (%s). Please use one of %s"%(args['process'],validprocess) ; logging.error(msg)
raise IOError,msg
# Get name of the process requested
process=args['process']
msg = 'Process %s starting, entered python from master shell'%process ; logging.debug(msg)
if process == 'jobstart':
rcf = JobStart(opts,args)
if process == 'jobinput':
dummy = JobInput(args)
if process == 'sample':
dummy = ForwardRun(args,'forecast')
if process == 'invert':
dummy = Invert(args)
if process == 'propagate':
dummy = Propagate(args)
if process == 'resubmit':
dummy = SaveAndSubmit(args)
if process == 'all':
args['jobrcfilename'] = "jb.%s.rc"%(os.getpid(),)
msg = header+"starting JobStart"+footer ; logging.info(msg)
rcf = JobStart(opts,args)
msg = header+"starting JobInput"+footer ; logging.info(msg)
dummy = JobInput(args)
msg = header+"starting ForwardRun"+footer ; logging.info(msg)
dummy = ForwardRun(args,'forecast')
msg = header+"starting Invert"+footer ; logging.info(msg)
dummy = Invert(args)
msg = header+"starting Propagate"+footer ; logging.info(msg)
dummy = Propagate(args)
msg = header+"starting SaveAndSubmit"+footer ; logging.info(msg)
dummy = SaveAndSubmit(args)
msg = "Cycle finished...exiting" ; logging.info(msg)
# move log file to rundir/jobs
jobdir = os.path.join(rcf['dir.da_run'],"jobs")
joblogfile = os.path.join(jobdir,logfile)
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
# move rc file to rundir/jobs
jobrcfile = os.path.join(jobdir,args["jobrcfilename"] )
dummy = shutil.move(args["jobrcfilename"],jobrcfile )
msg = "....Moved %s to %s"%(args['jobrcfilename'],jobrcfile) ; logging.debug(msg)
# cat TM5 output and rc-file to the job file output
tm5jobfile = os.path.join(jobdir,"tm5.%s"%(args['logfile']) )
if os.path.exists(tm5jobfile):
msg = "....Concatenating %s to %s"%(tm5jobfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(tm5jobfile,'r').read())
dummy = f.close()
if os.path.exists(jobrcfile):
msg = "....Concatenating %s to %s"%(jobrcfile,joblogfile) ; logging.debug(msg)
f = open(joblogfile,'a')
dummy = f.write(open(jobrcfile,'r').read())
dummy = f.close()
msg = "The complete log file is now at: %s"%(joblogfile) ; logging.info(msg)
msg = 'Process %s done, returning from python to master shell'%process ; logging.debug(msg)
sys.exit(0)