Commit 3884afbb authored by karolina
Browse files

adopted changes from email on 20.05

parent 1116cd36
......@@ -150,7 +150,7 @@ class Optimizer(object):
logging.debug('Returning optimized data to the StateVector, setting "StateVector.isOptimized = True" ')
def write_diagnostics(self, DaCycle, StateVector, type='prior'):
def write_diagnostics(self, filename, type='prior'):
"""
Open a NetCDF file and write diagnostic output from optimization process:
......@@ -165,10 +165,6 @@ class Optimizer(object):
The type designation refers to the writing of prior or posterior data and is used in naming the variables"
"""
outdir = DaCycle['dir.diagnostics']
filename = os.path.join(outdir, 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
DaCycle.OutputFileList.append(filename)
# Open or create file
if type == 'prior':
......
......@@ -254,16 +254,16 @@ class StateVector(object):
# If this is not the start of the filter, average previous two optimized steps into the mix
if lag == self.nlag and self.nlag >= 3:
newmean += self.EnsembleMembers[lag - 2][0].ParameterValues + \
self.EnsembleMembers[lag - 3][0].ParameterValues
if lag == self.nlag - 1 and self.nlag >= 3:
newmean += self.EnsembleMembers[lag - 1][0].ParameterValues + \
self.EnsembleMembers[lag - 2][0].ParameterValues
newmean = newmean / 3.0
# Create the first ensemble member with a deviation of 0.0 and add to list
newmember = EnsembleMember(0)
newmember.ParameterValues = newmean.flatten() # no deviations
self.EnsembleMembers[lag - 1].append(newmember)
self.EnsembleMembers[lag].append(newmember)
# Create members 1:nmembers and add to EnsembleMembers list
......@@ -272,9 +272,9 @@ class StateVector(object):
newmember = EnsembleMember(member)
newmember.ParameterValues = np.dot(C, rands) + newmean
self.EnsembleMembers[lag - 1].append(newmember)
self.EnsembleMembers[lag].append(newmember)
logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag))
logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag + 1))
def propagate(self, DaCycle):
......
......@@ -74,9 +74,6 @@ class CapeGrimPlatForm(PlatForm):
return template
msg1 = 'Platform initialized: %s' % self.Identifier ; logging.info(msg1)
#msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg2)
if __name__ == "__main__":
pass
......@@ -9,8 +9,6 @@ File created on 06 Sep 2010.
"""
import logging
from da.baseclasses.platform import PlatForm, std_joboptions
class MaunaloaPlatForm(PlatForm):
......@@ -73,9 +71,5 @@ class MaunaloaPlatForm(PlatForm):
return template
msg1 = 'Platform initialized: %s' % self.Identifier ; logging.info(msg1)
#msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg2)
if __name__ == "__main__":
pass
......@@ -68,7 +68,7 @@ class TM5ObservationOperator(ObservationOperator):
self.Identifier = identifier # the identifier gives the model name
self.Version = version # the model version used
self.RestartFileList = []
self.OutputFileList = ()
self.OutputFileList = []
self.RcFileType = 'None'
self.outputdir = None # Needed for opening the samples.nc files created
......@@ -331,8 +331,7 @@ class TM5ObservationOperator(ObservationOperator):
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
#LU tutaj generalnie sprawdzamy czy input sie zgadza, czyli czy istnieja wszystkie pliki parametrow oraz plik obserwacji wskazywany przez dacycle[obsopertor.inputfile]
#LU a potem jeszcze sprawdzmay czy jest odpowiedni executable, z tym ze nie bardzo wiem skad sie go bierze.
datadir = self.tm_settings['ct.params.input.dir']
if not os.path.exists(datadir):
msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..." % datadir
......@@ -407,7 +406,11 @@ class TM5ObservationOperator(ObservationOperator):
shutil.copy(f, f.replace(sourcedir, targetdir))
logging.debug("All restart data have been copied from the restart/current directory to the TM5 save dir")
def run_forecast_model(self):
    """Run one complete forecast step by chaining the four phases in order.

    Convenience wrapper added so callers no longer need the module-level
    ``run_forecast_model(DaCycle, ObsOperator)`` helper: prepare the run,
    validate that all required input is present, execute the model, and
    finally archive the produced data.
    """
    self.prepare_run()
    self.validate_input()
    self.run()
    self.save_data()
def run(self):
"""
......@@ -487,11 +490,11 @@ class TM5ObservationOperator(ObservationOperator):
jobid = 'tm5'
targetdir = os.path.join(self.DaCycle['dir.exec'])
jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid)
logfile = jobfile.replace('.jb', '.log')
#logfile = jobfile.replace('.jb', '.log')
template = DaPlatForm.get_job_template(jobparams, block=True)
template += 'cd %s\n' % targetdir
template += 'mpirun -np %d %s ./tm5.x\n' % (int(nthreads), mpi_shell_filename,)
template += 'mpirun -np %d %s ./tm5.x\n' % (int(nthreads), mpi_shell_filename)
DaPlatForm.write_job(jobfile, template, jobid)
......
......@@ -46,7 +46,7 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
# Create State Vector
StateVector.initialize(DaCycle) #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
StateVector.initialize(DaCycle)
# Read from other simulation and write priors, then read posteriors and propagate
......@@ -54,7 +54,7 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
filename = os.path.join(DaCycle['dir.forward.savestate'], DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.read_from_legacy_file(filename, 'prior')
else:
prepare_state(DaCycle, StateVector)
prepare_state(DaCycle, StateVector)#LU tutaj zamiast tego raczej to stworzenie nowej kowariancji i ensembli bo pozostale rzeczy sa na gorze i na doel.
......@@ -91,7 +91,7 @@ def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
#Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU cykl zostaje przypisany probkom
#ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU cykl zostaje przypisany obsoperatorowi
ObsOperator.initialize(DaCycle) # Setup Observation Operator #LU a pote mobsoperator jest inicjalizowany
StateVector.initialize(DaCycle)
def prepare_state(DaCycle, StateVector):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
......@@ -103,18 +103,14 @@ def prepare_state(DaCycle, StateVector):
logging.info(header + "starting prepare_state" + footer)
StateVector.initialize(DaCycle) #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
#LU to jest zalezne od cyklu, i cykl pojedynczy moze miec time.restart lub moze nie miec.
if not DaCycle['time.restart']: #LU jesli jest to pierwszy cykl
if not DaCycle['time.restart']:
# Fill each week from n=1 to n=nlag with a new ensemble
nlag = StateVector.nlag #LU dla kazdego od zera do dlugosc(cykl) wyznaczamy date oraz znajdujemy kowariancje i robimy nowa ensemble.
for n in range(nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle'])) #LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja.
for n in range(StateVector.nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle']))
cov = StateVector.get_covariance(date, DaCycle)
StateVector.make_new_ensemble(n + 1, cov)
StateVector.make_new_ensemble(n, cov)
else:
......@@ -124,13 +120,12 @@ def prepare_state(DaCycle, StateVector):
StateVector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate
# Now propagate the ensemble by one cycle to prepare for the current cycle
StateVector.propagate(DaCycle)
# Finally, also write the StateVector to a file so that we can always access the a-priori information
saved_current_sv = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.write_to_file(saved_current_sv, 'prior') # write prior info because StateVector.Isoptimized == False for now
current_sv = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.write_to_file(current_sv, 'prior') # write prior info
def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
""" Sample the filter state for the inversion """
......@@ -149,15 +144,13 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
ObservationOperator.get_initial_data()
for lag in range(nlag): #LU no to teraz robimy te przejsica
for lag in range(nlag):
logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
############# Perform the actual sampling loop #####################
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag)
############# Finished the actual sampling loop #####################
logging.debug("StateVector now carries %d samples" % StateVector.nobs)
......@@ -184,8 +177,8 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
StateVector.write_members_to_file(lag, DaCycle['dir.input']) #LU parameters.nc to be an input for tm5
Samples.initialize(DaCycle) #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego cyklu
Samples.add_observations() #LU obserwacje mamy z pliku observations, tutaj dodajemy tylko ten ich fragment ktory zawiera dany przedzial czasowy (ale ladujemy wszystko)
Samples.initialize(DaCycle)
Samples.add_observations()
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
......@@ -198,7 +191,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
# Run the observation operator
run_forecast_model(DaCycle, ObservationOperator)
ObservationOperator.run_forecast_model()
# Read forecast model samples that were written to NetCDF files by each member. Add them to the exisiting
......@@ -219,10 +212,9 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
# This is to make sure that the first step of the system uses all observations available, while the subsequent
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
#LU czyli ze zawsze zapisujemy obserwacje i robimy sampling i zawsze mamy potem flask_output. i naewt zawsze go potem wczytujemy i dodajemy do samples... hmm co?
#LU aha , samples maja zawsze zerowane data list i przypisany start i end. w samples.initialize() w niniejszej funkcji. i potem w zaleznosci od tego czy zaszly ponizsze warunki, dodajemy je do asymilacji badz nie
#LU jaki to ma sens: taki ze zawsze mamy flask_output.nc. tylko ze on jest chyba nadpisywany za kazdym cyklem? no coz.. chyba byla na ten temat dyskusja, i wyszlo ze for debugging. niech i tak zostanie.
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False:
if DaCycle['time.restart'] == False or lag == int(DaCycle['time.nlag']) - 1:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
......@@ -233,11 +225,11 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
def invert(DaCycle, StateVector, Optimizer):
""" Perform the inverse calculation """
logging.info(msg=header + "starting invert" + footer)
logging.info(header + "starting invert" + footer)
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
StateVector.nobs)#LU to brane z informacji ile samples sie wzielo
StateVector.nobs)
if not DaCycle.DaSystem.has_key('opt.algorithm'):
logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
......@@ -251,8 +243,11 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.set_algorithm('Bulk')
Optimizer.initialize(dims)
Optimizer.state_to_matrix(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
Optimizer.state_to_matrix(StateVector)
diagnostics_file = os.path.join(DaCycle['dir.diagnostics'], 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
Optimizer.write_diagnostics(diagnostics_file, 'prior')
Optimizer.set_localization('None')
if Optimizer.algorithm == 'Serial':
......@@ -261,8 +256,8 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.bulk_minimum_least_squares()
Optimizer.matrix_to_state(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='optimized')
Optimizer.write_diagnostics(diagnostics_file, 'optimized')
DaCycle.OutputFileList.append(diagnostics_file)
def advance(DaCycle, Samples, StateVector, ObservationOperator):
......@@ -276,10 +271,7 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
ObservationOperator.get_initial_data()
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0) #LU w srodku zmienia zawartosc obiektu samples
#LU ale z drugiej strony dodaje tez pozniej samples, to tak jakby chcial na nowo dodac ...
#LU a to dlatego ze samples jest inicjalizowane na nowo.
#LU skoro w srodku sample step jest inicjalizowanie samples przez przypisanie datalist=[], to czy ten obiekt samples ma jkaies podobiekty ktore kaza mu byc przekazywanym?
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0)
DaCycle.RestartFileList.extend(ObservationOperator.RestartFileList)
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
......@@ -296,31 +288,12 @@ def save_and_submit(DaCycle, StateVector):
""" Save the model state and submit the next job """
logging.info(header + "starting save_and_submit" + footer)
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc') #LU to jest state vector po zoptymalizowaniu.
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.write_to_file(filename, 'opt')
DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now
DaCycle.RestartFileList.append(filename)
DaCycle.finalize()
def run_forecast_model(DaCycle, ObsOperator):
    """Prepare and execute one forecast step through the given observation operator.

    Delegates the four phases of a forecast run, in their required order, to
    the supplied operator object:

        prepare_run()    : prep the simulation (e.g. write an input parameter
                           file such as tm5.rc, or run pre-execution scripts
                           like get_meteo) so that a subsequent call to the
                           model executable can start a successful simulation
        validate_input() : check that required input files are present
        run()            : start the executable; how this is done depends on
                           the model/platform and might involve submitting
                           jobs to a queue or spawning a subprocess
        save_data()      : archive the output produced by the run

    NOTE(review): the ``DaCycle`` argument is not used in this body; it is
    presumably kept for call-site symmetry with the other pipeline steps —
    confirm before removing it.
    """
    ObsOperator.prepare_run()
    ObsOperator.validate_input()
    ObsOperator.run()
    ObsOperator.save_data()
Supports Markdown
Attach a file by dragging & dropping it here, or click to upload.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment