Commit 0dd522b7 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

Minor changes to flow of sample_step and sample_state. Caught a possible...

Minor changes to flow of sample_step and sample_state. Caught a possible problem with the get_initialdata for the ObsOperator, which also needs to happen for an advance step.

Implemented the first outlines of a ForwardPipeline that simply runs TM5 with optimized parameters from another run. To be completed...

parent 298dfacf
......@@ -36,6 +36,36 @@ def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector,
save_and_submit(DaCycle, StateVector)
logging.info("Cycle finished...exiting pipeline")
def ForwardPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
logging.info(header + "Initializing current cycle" + footer)
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
# Create State Vector
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
# Read from other simulation
filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
# Write as prior fluxes to output dir
savedir = DaCycle['dir.output']
filename = os.path.join(savedir, 'savestate.nc') #LU to jest state vector po zoptymalizowaniu.
StateVector.WriteToFile(filename)
StateVector.isOptimized = True
# Run forward with these parameters
advance(DaCycle, Samples, StateVector, ObsOperator)
save_and_submit(DaCycle, StateVector)
logging.info("Cycle finished...exiting pipeline")
####################################################################################################
def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
......@@ -109,14 +139,11 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
nlag = int(DaCycle['time.nlag'])
logging.info("Sampling model will be run over %d cycles" % nlag) #LU w ramach modelu jest 3 cykle, czyli 3 przejscia. jednoczesnie w ramach kazdego cyklu idzie sie 3 tygodnie do przodu
ObservationOperator.get_initial_data()
for lag in range(nlag): #LU no to teraz robimy te przejsica
logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
# If we are going to sample the lag = 0 time step, place the needed files to run the transport model in the right folder first
if lag == 0: #LU w pierwszym tygodniu danego cyklu bierzemy initial data dla tm5, tzn jakies pewnie ustawienia.
ObservationOperator.get_initial_data()
############# Perform the actual sampling loop #####################
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag)
......@@ -125,21 +152,6 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
logging.debug("StateVector now carries %d samples" % StateVector.nobs)
# Add the observation operator restart+output files to the general Output+RestartFileList, only after 'advance'
# Same logic for observation files
if lag == 0:
DaCycle.RestartFileList.extend(ObservationOperator.RestartFileList)
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
logging.debug("Appended ObsOperator restart and output file lists to DaCycle for collection ")
DaCycle.OutputFileList.append(DaCycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to DaCycle for collection ")
# Optionally, post-processing of the model output can be added that deals for instance with
# sub-sampling of time series, vertical averaging, etc.
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvance=False):
""" Perform all actions needed to sample one cycle """
......@@ -192,13 +204,12 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
filename = os.path.join(ObservationOperator.outputdir, 'flask_output.%s.nc' % DaCycle['time.sample.stamp'])
Samples.add_simulations(filename)
# Now write a small file that holds for each observation a number of values that we need in postprocessing
filename = Samples.write_sample_info()
#filename = Samples.write_sample_info()
# Now add the observations that need to be assimilated to the StateVector.
# Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
......@@ -213,9 +224,6 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
else:
StateVector.ObsToAssimmilate += (None,)
#LU tego nie ma w oryginalnym kodzie
if isadvance:
Samples.write_obs_to_file("newstyle")
def invert(DaCycle, StateVector, Optimizer):
......@@ -256,13 +264,16 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
ObservationOperator.get_initial_data()
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, True) #LU w srodku zmienia zawartosc obiektu samples
#LU ale z drugiej strony dodaje tez pozniej samples, to tak jakby chcial na nowo dodac ...
#LU a to dlatego ze samples jest inicjalizowane na nowo.
#LU skoro w srodku sample step jest inicjalizowanie samples przez przypisanie datalist=[], to czy ten obiekt samples ma jkaies podobiekty ktore kaza mu byc przekazywanym?
# Write info from optimized flux cycle to a file
Samples.write_obs_to_file()
#LU tego nie ma w oryginalnym kodzie
Samples.write_obs_to_file("newstyle")
def save_and_submit(DaCycle, StateVector):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment