Something went wrong on our end
Forked from
CTDAS / CTDAS
356 commits behind the upstream repository.
-
Peters, Wouter authoredPeters, Wouter authored
pipeline.py 15.57 KiB
#!/usr/bin/env python
# pipeline.py
"""
.. module:: pipeline
.. moduleauthor:: Wouter Peters
Revision History:
File created on 06 Sep 2010.
The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system.
"""
import logging
import os
import sys
import datetime
header = '\n\n *************************************** '
footer = ' *************************************** \n '
def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
logging.info(header + "Initializing current cycle" + footer)
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
prepare_state(DaCycle, StateVector)
sample_state(DaCycle, Samples, StateVector, ObsOperator)
invert(DaCycle, StateVector, Optimizer)
advance(DaCycle, Samples, StateVector, ObsOperator)
save_and_submit(DaCycle, StateVector)
logging.info("Cycle finished...exiting pipeline")
def ForwardPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
logging.info(header + "Initializing current cycle" + footer)
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
# Create State Vector
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
# Read from other simulation
filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
# Write as prior fluxes to output dir
savedir = DaCycle['dir.output']
filename = os.path.join(savedir, 'savestate.nc') #LU to jest state vector po zoptymalizowaniu.
StateVector.WriteToFile(filename)
StateVector.isOptimized = True
# Run forward with these parameters
advance(DaCycle, Samples, StateVector, ObsOperator)
save_and_submit(DaCycle, StateVector)
logging.info("Cycle finished...exiting pipeline")
####################################################################################################
def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
""" Set up the job specific directory structure and create an expanded rc-file """
DaSystem.Validate() #LU tylko sprawdza needed rc items in the file
DaCycle.DaSystem = DaSystem #LU przypisuje dacyclowi liste parametrow
DaCycle.DaPlatForm = DaPlatForm #LU przypisuje cyklowi platforme (tez liste parametrow)
DaCycle.Initialize() #LU nastepnie cykl zostaje inicjalizowany...bardzo logiczne
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc #LU cykl zostaje przypisany state vectorowi
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU cykl zostaje przypisany probkom
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU cykl zostaje przypisany obsoperatorowi
ObsOperator.Initialize() # Setup Observation Operator #LU a pote mobsoperator jest inicjalizowany
def prepare_state(DaCycle, StateVector):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
# We now have an empty StateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
# the previous StateVector values from a NetCDF file in the restart/current directory. If this is the first cycle, we need to populate the StateVector
# with new values for each week. After we have constructed the StateVector, it will be propagated by one cycle length so it is ready to be used
# in the current cycle
logging.info(header + "starting prepare_state" + footer)
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
#LU to jest zalezne od cyklu, i cykl pojedynczy moze miec time.restart lub moze nie miec.
if not DaCycle['time.restart']: #LU jesli jest to pierwszy cykl
# Fill each week from n=1 to n=nlag with a new ensemble
nlag = StateVector.nlag #LU dla kazdego od zera do dlugosc(cykl) wyznaczamy date oraz znajdujemy kowariancje i robimy nowa ensemble.
for n in range(0, nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle'])) #LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja.
cov = StateVector.get_covariance(date)
StateVector.make_new_ensemble(n + 1, cov)
else:
# Read the StateVector data from file
filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
# Now propagate the ensemble by one cycle to prepare for the current cycle
StateVector.propagate()
# Finally, also write the StateVector to a file so that we can always access the a-priori information
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now
def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
""" Sample the filter state for the inversion """
# Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step.
# This is especially important for:
# (i) The transport model restart data which holds the background mixing ratios. This is needed to run the model one cycle forward
# (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed
#status = DaCycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
#msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
logging.info(header + "starting sample_state" + footer)
nlag = int(DaCycle['time.nlag'])
logging.info("Sampling model will be run over %d cycles" % nlag) #LU w ramach modelu jest 3 cykle, czyli 3 przejscia. jednoczesnie w ramach kazdego cyklu idzie sie 3 tygodnie do przodu
ObservationOperator.get_initial_data()
for lag in range(nlag): #LU no to teraz robimy te przejsica
logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
############# Perform the actual sampling loop #####################
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag)
############# Finished the actual sampling loop #####################
logging.debug("StateVector now carries %d samples" % StateVector.nobs)
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvance=False):
""" Perform all actions needed to sample one cycle """
import copy
# First set up the information for time start and time end of this sample
DaCycle.set_sample_times(lag)
startdate = DaCycle['time.sample.start']
enddate = DaCycle['time.sample.end']
DaCycle['time.sample.window'] = lag
DaCycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
logging.info("New simulation interval set : ")
logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
logging.info(" file stamp: %s " % DaCycle['time.sample.stamp'])
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model
StateVector.write_members_to_file(lag + 1) #LU parameters.nc
Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego cyklu
Samples.add_observations()
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
Samples.add_model_data_mismatch()
filename = Samples.write_sample_info() #LU observations.nc
# Write filename to DaCycle, and to output collection list
DaCycle['ObsOperator.inputfile'] = filename
# Run the observation operator
run_forecast_model(DaCycle, ObservationOperator)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
#Samples.add_model_data_mismatch()
#msg = "Added Model Data Mismatch to all samples " ; logging.debug(msg)
# Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting
# Observation object for each sample loop. This data fill be written to file in the output folder for each sample cycle.
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
filename = os.path.join(ObservationOperator.outputdir, 'flask_output.%s.nc' % DaCycle['time.sample.stamp'])
Samples.add_simulations(filename)
# Now write a small file that holds for each observation a number of values that we need in postprocessing
#filename = Samples.write_sample_info()
# Now add the observations that need to be assimilated to the StateVector.
# Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
# This is to make sure that the first step of the system uses all observations available, while the subsequent
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
else:
StateVector.ObsToAssimmilate += (None,)
def invert(DaCycle, StateVector, Optimizer):
""" Perform the inverse calculation """
logging.info(msg = header + "starting invert" + footer)
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
StateVector.nobs)
Optimizer.Initialize(dims)
Optimizer.state_to_matrix(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
Optimizer.set_localization('None')
if not DaCycle.DaSystem.has_key('opt.algorithm'):
logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
logging.info("...using serial algorithm as default...")
Optimizer.serial_minimum_least_squares()
elif DaCycle.DaSystem['opt.algorithm'] == 'serial':
logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
Optimizer.serial_minimum_least_squares()
elif DaCycle.DaSystem['opt.algorithm'] == 'bulk':
logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
Optimizer.bulk_minimum_least_squares()
Optimizer.matrix_to_state(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='optimized')
StateVector.isOptimized = True
def advance(DaCycle, Samples, StateVector, ObservationOperator):
""" Advance the filter state to the next step """
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
ObservationOperator.get_initial_data()
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, True) #LU w srodku zmienia zawartosc obiektu samples
#LU ale z drugiej strony dodaje tez pozniej samples, to tak jakby chcial na nowo dodac ...
#LU a to dlatego ze samples jest inicjalizowane na nowo.
#LU skoro w srodku sample step jest inicjalizowanie samples przez przypisanie datalist=[], to czy ten obiekt samples ma jkaies podobiekty ktore kaza mu byc przekazywanym?
DaCycle.RestartFileList.extend(ObservationOperator.RestartFileList)
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
logging.debug("Appended ObsOperator restart and output file lists to DaCycle for collection ")
DaCycle.OutputFileList.append(DaCycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to DaCycle for collection ")
#LU tego nie ma w oryginalnym kodzie
Samples.write_obs_to_file("newstyle")
def save_and_submit(DaCycle, StateVector):
""" Save the model state and submit the next job """
logging.info(header + "starting save_and_submit" + footer)
savedir = DaCycle['dir.output']
filename = os.path.join(savedir, 'savestate.nc') #LU to jest state vector po zoptymalizowaniu.
StateVector.WriteToFile(filename)
DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now
DaCycle.Finalize()
def run_forecast_model(DaCycle, ObsOperator):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a
setting in your da.rc file. After that choice, the module written specifically for a particular
application takes over. There are a few required variables and methods in such a module:
version [variable, string] : defines the version number of the module, e.g. "1.0"
identifier [variable, string] : identifies the model used as a string variable, e.g. "TM5"
PrepareExe (method) : A method that preps the model simulation. It can for instance create an input
parameter file for the model (tm5.rc), or execute some scripts needed before the
actual execution starts (get_meteo). After this step, a call to the model
executable should start a succesfull simulation
run (method) : Start the executable. How this is done depends on your model and might involve
submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
"""
ObsOperator.prepare_run()
ObsOperator.validate_input()
ObsOperator.run()
ObsOperator.save_data()