Commit 63b166f0 authored by karolina

functions "move_restart_data" and "recover_run" removed, the latter to be rewritten

parent 3884afbb
......@@ -117,7 +117,7 @@ def save_weekly_avg_1x1_data(DaCycle, StateVector):
for n in range(nlag, 0, -1):
priordate = enddate - timedelta(dt.days * n)
savedir = DaCycle['dir.output'].replace(startdate.strftime('%Y%m%d'), priordate.strftime('%Y%m%d'))
filename = os.path.join(savedir, 'savestate.nc')
filename = os.path.join(savedir, 'savestate.nc') #LU I suppose this should use priordate (savestate gets the same stamp as dir.output)
if os.path.exists(filename):
StateVector.read_from_file(filename, qual=qual_short)
gridmean, gridensemble = StateVector.state_to_grid(lag=n)
......@@ -270,7 +270,7 @@ def save_weekly_avg_state_data(DaCycle, StateVector):
#
# if not, process this cycle. Start by getting flux input data from CTDAS
#
filename = os.path.join(DaCycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H'),))
filename = os.path.join(DaCycle['dir.output'], 'flux1x1_%s_%s.nc' % (startdate.strftime('%Y%m%d%H'), enddate.strftime('%Y%m%d%H')))
file = io.CT_Read(filename, 'read')
bio = np.array(file.get_variable(DaCycle.DaSystem['background.co2.bio.flux']))
......
......@@ -102,7 +102,7 @@ class Optimizer(object):
self.x[n * self.nparams:(n + 1) * self.nparams] = members[0].ParameterValues
self.X_prime[n * self.nparams:(n + 1) * self.nparams, :] = np.transpose(np.array([m.ParameterValues for m in members]))
if Samples != None: #LU equivalent to simply: if Samples
if Samples != None:
self.rejection_threshold = Samples.rejection_threshold
allreject.extend(Samples.getvalues('may_reject'))
......
......@@ -298,7 +298,7 @@ class StateVector(object):
# And now create a new time step of mean + members for n=nlag
date = DaCycle['time.start'] + timedelta(days=(self.nlag - 0.5) * int(DaCycle['time.cycle']))
cov = self.get_covariance(date, DaCycle)
self.make_new_ensemble(self.nlag, cov)
self.make_new_ensemble(self.nlag - 1, cov)
logging.info('The state vector has been propagated by one cycle')
......
......@@ -32,7 +32,8 @@ from da.analysis.expand_mixingratios import write_mixing_ratios
#################################################################################################
start_logger()
opts, args = validate_opts_args(parse_options())
opts, args = parse_options()
opts, args = validate_opts_args(opts, args)
#################################################################################################
# Create the Cycle Control object for this job
......
......@@ -288,7 +288,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.debug('rc-file has been validated successfully')
def modify_rc(self, NewValues):
def modify_rc(self, newvalues):
"""
Modify parts of the tm5 settings, for instance to give control of file locations to the DA shell
instead of to the tm5.rc script.
......@@ -297,7 +297,7 @@ class TM5ObservationOperator(ObservationOperator):
"""
for k, v in NewValues.iteritems():
for k, v in newvalues.iteritems():
if self.tm_settings.has_key(k):
# keep previous value
v_orig = self.tm_settings[k]
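The override pattern behind modify_rc can be illustrated with plain dictionaries. A minimal Python 2 sketch follows, with hypothetical key names; how keys that are not yet present in the settings are handled lies outside the excerpt shown here:
# Editor's sketch of the rc-override pattern used by modify_rc (plain dicts, hypothetical keys):
# keys already present in the TM5 settings are overwritten by the DA shell, keeping the old value for logging.
import logging
tm_settings = {'istart': '33', 'savedir': '/model/save'}   # stands in for self.tm_settings
newvalues = {'savedir': '/da_run/restart'}                 # stands in for the newvalues argument
for k, v in newvalues.iteritems():
    if tm_settings.has_key(k):
        v_orig = tm_settings[k]                            # keep previous value
        tm_settings[k] = v
        logging.debug('Replaced rc-item %s : %s -> %s' % (k, v_orig, v))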
......@@ -369,8 +369,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.error("Please compile the model with the specified rc-file and the regular TM5 scripts first")
raise IOError
#LU update the definition in the documentation, since it talks about save.hdf which no longer exists.
#LU here we copy the TM5 restarts, although we have in fact already copied them earlier
def get_initial_data(self):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model.
For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial
......@@ -382,28 +381,28 @@ class TM5ObservationOperator(ObservationOperator):
"""
logging.debug("Moving TM5 model restart data from the restart/current directory to the TM5 save dir")
logging.debug("Moving TM5 model restart data from the restart directory to the TM5 save dir")
# First get the restart data for TM5 from the current restart dir of the filter
sourcedir = self.DaCycle['dir.restart.current']
sourcedir = self.DaCycle['dir.restart']
targetdir = self.tm_settings[self.savedirkey]
self.outputdir = self.tm_settings[self.outputdirkey] #IV test
for f in os.listdir(sourcedir):
f = os.path.join(sourcedir, f)
if os.path.isdir(f): # skip dirs
logging.debug(" [skip] .... %s " % f)
fpath = os.path.join(sourcedir, f)
if os.path.isdir(fpath): # skip dirs
logging.debug(" [skip] .... %s " % fpath)
continue
#if not f.startswith('save_'):
if not f.startswith('TM5_restart'):
logging.debug(" [skip] .... %s " % f)
logging.debug(" [skip] .... %s " % fpath)
continue
# all okay, copy file
logging.debug(" [copy] .... %s " % f)
shutil.copy(f, f.replace(sourcedir, targetdir))
logging.debug(" [copy] .... %s " % fpath)
shutil.copy(fpath, fpath.replace(sourcedir, targetdir))
logging.debug("All restart data have been copied from the restart/current directory to the TM5 save dir")
def run_forecast_model(self):
......
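get_initial_data now copies plain files whose names start with 'TM5_restart' from the DA restart directory into the TM5 save directory. A self-contained sketch of that filter, with hypothetical directory names standing in for DaCycle['dir.restart'] and the TM5 savedir setting (both directories are assumed to exist):
# Editor's sketch of the restart-file copy filter (hypothetical directories).
import os
import shutil
import logging
sourcedir = '/da_run/restart'    # stands in for self.DaCycle['dir.restart']
targetdir = '/model/save'        # stands in for self.tm_settings[self.savedirkey]
for f in os.listdir(sourcedir):
    fpath = os.path.join(sourcedir, f)
    if os.path.isdir(fpath) or not f.startswith('TM5_restart'):
        logging.debug(" [skip] .... %s " % fpath)      # directories and non-restart files are skipped
        continue
    logging.debug(" [copy] .... %s " % fpath)
    shutil.copy(fpath, os.path.join(targetdir, f))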
......@@ -142,12 +142,14 @@ class CycleControl(dict):
self[k] = False
if 'date' in k :
self[k] = to_datetime(v)
if 'time.start' in k :
self[k] = to_datetime(v)
if 'time.end' in k :
self[k] = to_datetime(v)
if 'time.finish' in k :
self[k] = to_datetime(v)
if k in ['time.start', 'time.end', 'time.finish', 'da.restart.tstamp']:
self[k] = to_datetime(v) #LU and what about time.sample.start? it seems to always be read in fresh... so why is it in the file?
# if 'time.start' in k :
# self[k] = to_datetime(v)
# if 'time.end' in k :
# self[k] = to_datetime(v)
# if 'time.finish' in k :
# self[k] = to_datetime(v)
for key in needed_da_items:
if not self.has_key(key):
msg = 'Missing a required value in rc-file : %s' % key
......@@ -242,35 +244,25 @@ class CycleControl(dict):
self['time.end'] = enddate
def random_seed(self, action='read'):
"""
Get the randomseed and save it, or read the random seed and set it. The seed is currently stored
in a python :mod:`pickle` file, residing in the ``exec`` directory
"""
filename = os.path.join(self['dir.exec'], 'randomseed.pickle')
def write_random_seed(self):
filename = os.path.join(self['dir.restart'], 'randomseed_%s.pickle' % self['time.start'].strftime('%Y%m%d'))
f = open(filename, 'wb')
seed = np.random.get_state()
cPickle.dump(seed, f, -1)
f.close()
if action == 'write':
f = open(filename, 'wb')
seed = np.random.get_state()
cPickle.dump(seed, f, -1)
f.close()
logging.info("Saved the random seed generator values to file")
msg = "Saved the random seed generator values to file"
if action == 'read':
f = open(filename, 'rb')
seed = cPickle.load(f)
np.random.set_state(seed)
f.close()
msg = "Retrieved the random seed generator values from file"
def read_random_seed(self):
filename = os.path.join(self['dir.restart'], 'randomseed_%s.pickle' % self['da.restart.tstamp'].strftime('%Y%m%d'))
f = open(filename, 'rb')
seed = cPickle.load(f)
np.random.set_state(seed)
f.close()
logging.info(msg)
self.RestartFileList.append(filename)
logging.debug("Added the randomseed.pickle file to the RestartFileList")
logging.info("Retrieved the random seed generator values of last cycle from file")
def initialize(self):
......@@ -306,27 +298,16 @@ class CycleControl(dict):
* parse_times()
* WriteRc('jobfilename')
"""
#
# case 1: A recover from a previous crash, this is signaled by flag "-r"
#
if self['da.crash.recover']:
logging.info("Recovering simulation from data in: %s" % self['dir.da_run'])
self.setup_file_structure()
self.recover_run()
self.random_seed('read')
#
# case 2: A continuation, this is signaled by rc-item time.restart = True
#
elif self['time.restart']:
if self['time.restart']:
logging.info("Restarting filter from previous step")
self.setup_file_structure()
strippedname = os.path.split(self['jobrcfilename'])[-1]
self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
self.random_seed('read')
self.read_random_seed()
#
# case 3: A fresh start, this is signaled by rc-item time.restart = False
#
elif not self['time.restart']:
#elif not self['time.restart']:
else: #assume that it is a fresh start, change this condition to more specific if crash recover added
logging.info("First time step in filter sequence")
self.setup_file_structure()
......@@ -337,7 +318,7 @@ class CycleControl(dict):
self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
self.parse_times()
self.write_rc(self['jobrcfilename'])
self.write_rc(self['jobrcfilename']) #LU an rc-file with the additional folder names etc.; normally the da.rc saved with false and then converted into da_runtime. how much sense is there in this..... the file_structure is supposedly the same (and it certainly was in the previous one...) but is this of any use?
def setup_file_structure(self):
"""
......@@ -370,62 +351,24 @@ class CycleControl(dict):
self['dir.exec'] = os.path.join(self['dir.da_run'], 'exec')
self['dir.input'] = os.path.join(self['dir.da_run'], 'input')
self['dir.output'] = os.path.join(self['dir.da_run'], 'output', filtertime)
self['dir.diagnostics'] = os.path.join(self['dir.da_run'], 'diagnostics')
self['dir.analysis'] = os.path.join(self['dir.da_run'], 'analysis')
self['dir.jobs'] = os.path.join(self['dir.da_run'], 'jobs')
self['dir.restart'] = os.path.join(self['dir.da_run'], 'restart')
self['dir.restart.current'] = os.path.join(self['dir.restart'], 'current')
self['dir.restart.oneago'] = os.path.join(self['dir.restart'], 'one-ago')
#self['dir.restart.current'] = os.path.join(self['dir.restart'], 'current')
#self['dir.restart.oneago'] = os.path.join(self['dir.restart'], 'one-ago')
create_dirs(self['dir.da_run'])
create_dirs(os.path.join(self['dir.exec']))
create_dirs(os.path.join(self['dir.input']))
create_dirs(os.path.join(self['dir.output']))
create_dirs(os.path.join(self['dir.diagnostics']))
create_dirs(os.path.join(self['dir.analysis']))
create_dirs(os.path.join(self['dir.jobs']))
create_dirs(os.path.join(self['dir.restart']))
create_dirs(os.path.join(self['dir.restart.current']))
create_dirs(os.path.join(self['dir.restart.oneago']))
#create_dirs(os.path.join(self['dir.restart.current']))
#create_dirs(os.path.join(self['dir.restart.oneago']))
logging.info('Successfully created the file structure for the assimilation job')
#LU move restart data seems to be missing here
def recover_run(self):
"""
Prepare a recovery from a crashed run. This consists of:
- copying all data from the restart/one-ago folder (:meth:`~da.tools.initexit.CycleControl.move_restart_data`),
- replacing all ``rc-file`` items with those from the ``da_runtime.rc`` in the restart/current dir
- resetting the seed of the random number generator to the value it had before the crash (:meth:`~da.tools.initexit.CycleControl.random_seed`)
- replacing the output dir name, since it has the sample time in it...
"""
# Replace rc-items with those from the crashed run's last rc-file (now in restart.current dir)
file_rc_rec = os.path.join(self['dir.restart.current'], 'da_runtime.rc')
rc_rec = rc.read(file_rc_rec)
for k, v in rc_rec.iteritems():
self[k] = v
self.validate_rc()
logging.debug("Replaced rc-items.... ")
logging.debug("Next cycle start date is %s" % self['time.start'])
# Copy randomseed.pickle file to exec dir
source = os.path.join(self['dir.restart.current'], 'randomseed.pickle') #LU it seems to me that there is no need to give the file name in the destination here; if it sees that the target is a folder, it just copies into it.
dest = os.path.join(self['dir.exec'], 'randomseed.pickle')
shutil.copy(source, dest)
logging.debug("Replaced randomseed file with previous cycles' last values")
# Re-create the output dir for this time step, if needed
filtertime = self['time.start'].strftime('%Y%m%d')
self['dir.output'] = os.path.join(self['dir.da_run'], 'output', filtertime)
create_dirs(os.path.join(self['dir.output']))
def finalize(self):
"""
......@@ -439,9 +382,9 @@ class CycleControl(dict):
* Submit the next cycle
"""
self.random_seed('write')
self.write_new_rc_file()
self.move_restart_data(io_option='store') # Move restart data from current to one-ago
self.write_random_seed() #LU with timestamp -> this will be the current date, since we do not really have anywhere to take the next one from.
self.write_new_rc_file() #LU with timestamp -> this will be the next date, because it is da_runtime
self.collect_restart_data() # Collect restart data for the next cycle into the restart folder
self.collect_output() # Collect output data from this cycle into the output folder
self.submit_next_cycle()
......@@ -497,11 +440,11 @@ class CycleControl(dict):
"""
targetdir = os.path.join(self['dir.restart.current'])
targetdir = os.path.join(self['dir.restart'])
logging.info("Purging the current restart directory before collecting new data")
#logging.info("Purging the current restart directory before collecting new data")
create_dirs(targetdir, forceclean=True)
#create_dirs(targetdir, forceclean=True)
logging.info("Collecting the required restart data")
logging.debug(" to directory: %s " % targetdir)
......@@ -509,57 +452,13 @@ class CycleControl(dict):
for file in set(self.RestartFileList):
if os.path.isdir(file): # skip dirs
continue
if not os.path.exists(file): # skip dirs
if not os.path.exists(file):
logging.warning(" [not found] .... %s " % file)
else:
logging.debug(" [copy] .... %s " % file)
shutil.copy(file, file.replace(os.path.split(file)[0], targetdir))
def move_restart_data(self, io_option='restore'):
"""
Store or restore model state to/from a restart directory.
Two IO options are available:
(1) io_option = restore : Get data from restart.oneago directory
(2) io_option = store : Save data to restart.oneago directory
In case of a 'store' command the restart.oneago folder is re-created so that the contents are empty to begin with.
"""
if io_option not in ['store', 'restore']:
raise ValueError, 'Invalid option specified for io_option (%s)' % io_option
if io_option == 'store':
targetdir = self['dir.restart.oneago']
sourcedir = self['dir.restart.current']
elif io_option == 'restore':
sourcedir = self['dir.restart.oneago']
targetdir = self['dir.restart.current']
# If "store" is requested, recreate target dir, cleaning the contents
if io_option == 'store':
create_dirs(os.path.join(targetdir), forceclean=True)
logging.debug("Performing a %s of data" % io_option)
logging.debug(" from directory: %s " % sourcedir)
logging.debug(" to directory: %s " % targetdir)
for file in os.listdir(sourcedir):
file = os.path.join(sourcedir, file)
if not os.path.exists(file):
logging.debug("Cannot find requested file to move: %s " % file)
sys.exit(2)
if os.path.isdir(file): # skip dirs
logging.debug(" [skip] .... %s " % file)
continue
else:
logging.debug(" [copy] .... %s " % file)
shutil.copy(file, file.replace(sourcedir, targetdir))
#
def write_new_rc_file(self):
......@@ -572,26 +471,27 @@ class CycleControl(dict):
"""
# We make a copy of the current DaCycle object, and modify the start + end dates and restart value
newDaCycle = copy.deepcopy(self)
newDaCycle['da.restart.tstamp'] = self['time.start']
newDaCycle.advance_cycle_times()
newDaCycle['time.restart'] = True
# Create the name of the rc-file that will hold this new input, and write it
fname = os.path.join(self['dir.exec'], 'da_runtime.rc') # current exec dir holds next rc file
#fname = os.path.join(self['dir.exec'], 'da_runtime.rc') # current exec dir holds next rc file
fname = os.path.join(self['dir.restart'], 'da_runtime_%s.rc' % newDaCycle['time.start'].strftime('%Y%m%d'))#advanced time
rc.write(fname, newDaCycle)
logging.debug('Wrote new da_runtime.rc (%s) to exec dir' % fname)
# The rest is info needed for a system restart, so it modifies the current DaCycle object (self)
self['da.restart.fname'] = fname # needed for next job template
self.RestartFileList.append(fname) # current restart list holds next rc file name
logging.debug('Added da_runtime.rc to the RestartFileList for later collection')
#self.RestartFileList.append(fname) # current restart list holds next rc file name #LU not really needed since it is already written to the restart dir...
#logging.debug('Added da_runtime.rc to the RestartFileList for later collection')
def write_rc(self, fname):
......@@ -728,8 +628,4 @@ def validate_opts_args(opts, args):
if __name__ == "__main__":
sys.path.append('../../')
opts, args = parse_options()
print opts
print args
......@@ -42,11 +42,7 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
sys.path.append(os.getcwd())
logging.info(header + "Initializing current cycle" + footer)
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
# Create State Vector
StateVector.initialize(DaCycle)
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
# Read from other simulation and write priors, then read posteriors and propagate
......@@ -60,7 +56,8 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
# Write as prior fluxes to output dir
savefilename = os.path.join(DaCycle['dir.output'], 'savestate.nc') #LU this is the state vector after optimization.
#savefilename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
savefilename = os.path.join(DaCycle['dir.restart'], 'savestate_%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
StateVector.write_to_file(savefilename, 'prior')
# Now read optimized fluxes to propagate
......@@ -68,6 +65,7 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
if DaCycle['dir.forward.savestate'] != 'None': #LU !? would there really be something like None in there?
StateVector.read_from_legacy_file(filename) # by default will read "opt"(imized) variables, and then propagate
else:
StateVector.read_from_file(savefilename, 'prior')
......@@ -116,15 +114,18 @@ def prepare_state(DaCycle, StateVector):
# Read the StateVector data from file
saved_sv = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')
StateVector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate
#saved_sv = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')
saved_sv = os.path.join(DaCycle['dir.restart'], 'savestate_%s.nc' % DaCycle['da.restart.tstamp'].strftime('%Y%m%d'))
StateVector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate #LU easiest this way - if it is da.restart.tstamp
# Now propagate the ensemble by one cycle to prepare for the current cycle
StateVector.propagate(DaCycle)
# Finally, also write the StateVector to a file so that we can always access the a-priori information
current_sv = os.path.join(DaCycle['dir.output'], 'savestate.nc')
current_sv = os.path.join(DaCycle['dir.restart'], 'savestate_%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
StateVector.write_to_file(current_sv, 'prior') # write prior info
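The timestamped savestate files connect consecutive cycles: the file written under this cycle's time.start is the one the next cycle reads back under its da.restart.tstamp. An illustrative sketch with made-up dates:
# Editor's sketch of how the per-cycle savestate filenames line up (hypothetical values).
import os
from datetime import datetime
DaCycle = {'dir.restart': '/da_run/restart',
           'time.start': datetime(2013, 1, 8),         # start of the current cycle
           'da.restart.tstamp': datetime(2013, 1, 1)}  # start of the previous cycle
# read at the start of this cycle: written by the previous cycle under its own time.start
saved_sv = os.path.join(DaCycle['dir.restart'],
                        'savestate_%s.nc' % DaCycle['da.restart.tstamp'].strftime('%Y%m%d'))
# written during this cycle, to be read back by the next one
current_sv = os.path.join(DaCycle['dir.restart'],
                          'savestate_%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))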
def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
......@@ -154,7 +155,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
logging.debug("StateVector now carries %d samples" % StateVector.nobs)
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, advance=False):
""" Perform all actions needed to sample one cycle """
......@@ -214,13 +215,14 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
# (and at least) once # in the assimilation
#LU what is the point of this: that we always have a flask_output.nc. except it probably gets overwritten every cycle? oh well.. I think there was a discussion about this, and the conclusion was that it is for debugging. let it stay that way then.
if DaCycle['time.restart'] == False or lag == int(DaCycle['time.nlag']) - 1:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
if not advance:
if DaCycle['time.restart'] == False or lag == int(DaCycle['time.nlag']) - 1:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
else:
StateVector.ObsToAssimmilate += (None,)
else:
StateVector.ObsToAssimmilate += (None,)
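The new advance argument guards the bookkeeping above: samples are appended to StateVector.ObsToAssimmilate only during regular lag steps, and not during the post-optimization advance run (which calls sample_step with advance=True, as shown further below). A schematic sketch with stand-in objects rather than the CTDAS classes:
# Editor's sketch of the advance guard in sample_step (stand-in objects, not the CTDAS classes).
import copy
def collect_for_assimilation(StateVector, Samples, time_restart, lag, nlag, advance=False):
    if advance:
        return                 # advance run: nothing is added to the assimilation list
    if time_restart == False or lag == nlag - 1:
        StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
        StateVector.nobs += Samples.getlength()
    else:
        StateVector.ObsToAssimmilate += (None,)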
def invert(DaCycle, StateVector, Optimizer):
......@@ -245,7 +247,7 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.initialize(dims)
Optimizer.state_to_matrix(StateVector)
diagnostics_file = os.path.join(DaCycle['dir.diagnostics'], 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
diagnostics_file = os.path.join(DaCycle['dir.output'], 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
Optimizer.write_diagnostics(diagnostics_file, 'prior')
Optimizer.set_localization('None')
......@@ -257,7 +259,7 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.matrix_to_state(StateVector)
Optimizer.write_diagnostics(diagnostics_file, 'optimized')
DaCycle.OutputFileList.append(diagnostics_file)
def advance(DaCycle, Samples, StateVector, ObservationOperator):
......@@ -271,7 +273,7 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
ObservationOperator.get_initial_data()
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0)
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, True)
DaCycle.RestartFileList.extend(ObservationOperator.RestartFileList)
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
......@@ -280,7 +282,7 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
DaCycle.OutputFileList.append(DaCycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to DaCycle for collection ")
outfile = os.path.join(DaCycle['dir.output'], 'sampleinfo_%s.nc' % (DaCycle['time.sample.stamp']))
outfile = os.path.join(DaCycle['dir.output'], 'sampleinfo_%s.nc' % DaCycle['time.sample.stamp'])
Samples.write_obs_to_file(outfile)
......@@ -288,10 +290,10 @@ def save_and_submit(DaCycle, StateVector):
""" Save the model state and submit the next job """
logging.info(header + "starting save_and_submit" + footer)
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
filename = os.path.join(DaCycle['dir.restart'], 'savestate_%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
StateVector.write_to_file(filename, 'opt')
DaCycle.RestartFileList.append(filename)
DaCycle.OutputFileList.append(filename)
DaCycle.finalize()
......