diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index b9a949bde7e0c4c2b834b80daa2363eab36e099f..490130b6fa71b89244d19c888796dc53abb139ef 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -44,35 +44,72 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera logging.info(header + "Initializing current cycle" + footer) start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) - # Read from other simulation and write priors, then read posteriors and propagate + if dacycle.has_key('forward.savestate.dir'): + fwddir = dacycle['forward.savestate.dir'] + else: + logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters") + fwddir = False + + if dacycle.has_key('forward.savestate.legacy'): + legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"]) + else: + legacy = False + logging.debug("No forward.savestate.legacy key found in rc-file") - if dacycle['dir.forward.savestate'] != 'None': - filename = os.path.join(dacycle['dir.forward.savestate'], dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') - statevector.read_from_legacy_file(filename, 'prior') - else: + if not fwddir: + # Simply make a prior statevector using the normal method + + prepare_state(dacycle, statevector)#LU tutaj zamiast tego raczej to stworzenie nowej kowariancji i ensembli bo pozostale rzeczy sa na gorze i na doel. - + else: + # Read prior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if not legacy: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) + statevector.read_from_file(filename, 'prior') + else: + filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc') + statevector.read_from_legacy_file(filename, 'prior') + - # Write as prior fluxes to output dir + # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector + # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current + # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit. + # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into + # the current format through the "write_to_file" method. - savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) + savefilename = os.path.join(dacycle['dir.restart.current'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) statevector.write_to_file(savefilename, 'prior') - # Now read optimized fluxes to propagate + # Now read optimized fluxes which we will actually use to propagate through the system - if dacycle['dir.forward.savestate'] != 'None': #LU !? byloby tam cos takiego jka None? - statevector.read_from_legacy_file(filename) # by default will read "opt"(imized) variables, and then propagate + if not fwddir: + + # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector + logging.info("Running forward with prior savestate from: %s"%filename) + else: - - statevector.read_from_file(savefilename, 'prior') + # Read posterior information from another simulation into the statevector. + # This loads the results from another assimilation experiment into the current statevector + + if not legacy: + statevector.read_from_file(filename, 'opt') + else: + statevector.read_from_legacy_file(filename, 'opt') + + logging.info("Running forward with optimized savestate from: %s"%filename) - # Run forward with these parameters + # Finally, we run forward with these parameters advance(dacycle, samples, statevector, obsoperator) - - save_and_submit(dacycle, statevector) + + # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list. + # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis. + + save_and_submit(dacycle, statevector) logging.info("Cycle finished...exiting pipeline") #################################################################################################### @@ -249,7 +286,7 @@ def invert(dacycle, statevector, optimizer): diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) optimizer.write_diagnostics(diagnostics_file, 'prior') - optimizer.set_localization('None') + optimizer.set_localization('None') #WP This should go to the rc-file !!! if optimizer.algorithm == 'Serial': optimizer.serial_minimum_least_squares()