Commit c8153d1b authored by karolina's avatar karolina
Browse files

minor changes before implementing the things discussed in mail #3 and #4;

includes a lot of my helper comments, but they will disappear soon.
parent c0c34642
......@@ -208,7 +208,7 @@ class Optimizer(object):
savedict['units'] = "mol mol-1"
savedict['dims'] = dimobs
savedict['values'] = self.Hx.tolist()
savedict['comment'] = '%s mean mixing ratios based on %s state vector' % (type, type,)
savedict['comment'] = '%s mean mixing ratios based on %s state vector' % (type, type)
f.AddData(savedict)
savedict = io.std_savedict.copy()
......@@ -217,7 +217,7 @@ class Optimizer(object):
savedict['units'] = "mol mol-1"
savedict['dims'] = dimobs + dimmembers
savedict['values'] = self.HX_prime.tolist()
savedict['comment'] = '%s mixing ratio deviations based on %s state vector' % (type, type,)
savedict['comment'] = '%s mixing ratio deviations based on %s state vector' % (type, type)
f.AddData(savedict)
# Continue with prior only data
......
......@@ -180,7 +180,7 @@ class StateVector(object):
# Create a matrix for state <-> TransCom conversions
self.tcmatrix = np.zeros((self.nparams, 23,), 'float')
self.tcmatrix = np.zeros((self.nparams, 23), 'float')
for r in range(1, self.nparams + 1):
sel = (self.gridmap.flat == r).nonzero()
......
......@@ -11,6 +11,7 @@ File created on 28 Jul 2010.
import os
import sys
import logging
#from da.baseclasses.statevector import filename
sys.path.append(os.getcwd())
sys.path.append('../../')
......@@ -26,7 +27,6 @@ class CtObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def Initialize(self):
self.startdate = self.DaCycle['time.sample.start']
self.enddate = self.DaCycle['time.sample.end']
......@@ -80,7 +80,7 @@ class CtObservations(Observation):
lons = ncf.GetVariable('lon').take(subselect, axis=0)
alts = ncf.GetVariable('alt').take(subselect, axis=0)
obs = ncf.GetVariable('obs').take(subselect, axis=0) * 1.e-6
msg = "Converting observed values from ppm to mol/mol!!!!"; logging.info(msg)
logging.info("Converting observed values from ppm to mol/mol!!!!")
species = ncf.GetVariable('species').take(subselect, axis=0)
species = [s.tostring().lower() for s in species]
species = map(strip, species)
......@@ -314,14 +314,14 @@ class CtObservations(Observation):
self.SiteInfo = SiteInfo
def write_obs_to_file(self):
def write_obs_to_file(self,filenam="oldstyle"):
"""
Write selected information contained in the Observation object to a file.
"""
import da.tools.io4 as io
outfile = os.path.join(self.DaCycle['dir.output'], 'sampleinfo_%s.nc' % self.DaCycle['time.sample.stamp'])
outfile = os.path.join(self.DaCycle['dir.output'], 'sampleinfo_%s__%s.nc' % (self.DaCycle['time.sample.stamp'], filenam))
f = io.CT_CDF(outfile, method='create')
logging.debug('Creating new Sample output file for postprocessing (%s)' % outfile)
......@@ -346,7 +346,7 @@ class CtObservations(Observation):
savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
f.AddData(savedict)
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ]
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate')]
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
......
......@@ -48,7 +48,6 @@ class CtOptimizer(Optimizer):
if not self.localization:
return
if self.localizetype == 'CT2007':
tvalue = 1.97591
if np.sqrt(self.R[n, n]) >= 1.5:
for r in range(self.nlag * self.nparams):
......
......@@ -40,7 +40,7 @@ class CtStateVector(StateVector):
# Get the needed matrices from the specified covariance files
file_ocn_cov = self.DaCycle.DaSystem['ocn.covariance']
file_bio_cov = self.DaCycle.DaSystem['bio.covariance']
file_bio_cov = self.DaCycle.DaSystem['bio.covariance'] #LU logika tego to powrot do dacycle zeby potem z systemu(CT) pobrac parametr
# replace YYYY.MM in the ocean covariance file string
......
......@@ -50,7 +50,7 @@ class TM5ObservationOperator(ObservationOperator):
[]> tm=TM5('/Users/peters/Modeling/TM5/tutorial.rc')
[]> tm.WriteRc()
[]> tm.WriteRunRc()
[]> tm.Run()
[]> tm.run()
To use this class inside a data assimilation cycle, a stand-alone method "Initialize()" is included which modifies the TM5
settings according to an external dictionary of values to overwrite, and then runs the TM5 model.
......@@ -157,7 +157,7 @@ class TM5ObservationOperator(ObservationOperator):
else:
logging.info('Compilation successful, continuing')
def PrepareRun(self):
def prepare_run(self):
"""
Prepare a forward model TM5 run, this consists of:
......@@ -173,15 +173,15 @@ class TM5ObservationOperator(ObservationOperator):
# Write a modified TM5 model rc-file in which run/break times are defined by our da system
NewItems = {
'submit.options' : DaPlatForm.ReturnBlockingFlag() ,
self.timestartkey : self.DaCycle['time.sample.start'] ,
self.timefinalkey : self.DaCycle['time.sample.end'] ,
'jobstep.timerange.start' : self.DaCycle['time.sample.start'] ,
'jobstep.timerange.end' : self.DaCycle['time.sample.end'] ,
'jobstep.length' : 'inf' ,
'ct.params.input.dir' : self.DaCycle['dir.input'] ,
'ct.params.input.file' : os.path.join(self.DaCycle['dir.input'], 'parameters') ,
'output.flask.infile' : self.DaCycle['ObsOperator.inputfile']
'submit.options': DaPlatForm.ReturnBlockingFlag(),
self.timestartkey: self.DaCycle['time.sample.start'],
self.timefinalkey: self.DaCycle['time.sample.end'],
'jobstep.timerange.start': self.DaCycle['time.sample.start'],
'jobstep.timerange.end': self.DaCycle['time.sample.end'],
'jobstep.length': 'inf',
'ct.params.input.dir': self.DaCycle['dir.input'],
'ct.params.input.file': os.path.join(self.DaCycle['dir.input'], 'parameters'),
'output.flask.infile': self.DaCycle['ObsOperator.inputfile']
}
if self.DaCycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
......@@ -190,6 +190,10 @@ class TM5ObservationOperator(ObservationOperator):
else:
NewItems[self.istartkey] = self.coldstartvalue # if not, start TM5 'cold'
logging.debug('Resetting TM5 to perform cold start')
#LU to ponizej: po prostu jesli to jest inny lag niz pierwszy
#LU czyli: restart wtedy kiedy albo time.restart = true czyli w kazdym nastepnym cyklu, albo kiedy jest to nie pierwszy step w sample step. tylko wtedy to skad robimy restart sie rozni...czy nie , bo robimy po advance?
#LU z drugiej strony to time.sample.window to nic innego jak lag.
#LU ale: nowy RC zapisywany jst przy koncu cyklu. czyli time. sample.window bedzie zawsze 0.
if self.DaCycle['time.sample.window'] != 0: # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
NewItems[self.istartkey] = self.restartvalue
......@@ -200,7 +204,7 @@ class TM5ObservationOperator(ObservationOperator):
self.ModifyRC(NewItems)
self.WriteRc(self.RcFileName)
def LoadRc(self, RcFileName):
def LoadRc(self, RcFileName):#LU tutaj bylo wczesniej rcfIletype. to znaczy ze pewnie zawsze bylo pycasso. bo wpp byloby blad.
"""
This method loads a TM5 rc-file with settings for this simulation
"""
......@@ -209,11 +213,11 @@ class TM5ObservationOperator(ObservationOperator):
self.rcfile = rc.RcFile(RcFileName)
self.tm_settings = self.rcfile.values
self.RcFileName = RcFileName
self.Tm5RcLoaded = True
self.Tm5RcLoaded = True #LU to wyglada na zupelnie niepotrzebne
if 'my.source.dirs' in self.tm_settings.keys():
self.RcFileType = 'pycasso'
else:
self.RcfIleType = 'pre-pycasso'
self.RcFileType = 'pre-pycasso'
logging.debug('TM5 rc-file loaded successfully')
def ValidateRc(self):
......@@ -247,12 +251,12 @@ class TM5ObservationOperator(ObservationOperator):
self.coldstartvalue = 9
needed_rc_items = [
self.projectkey ,
self.rundirkey ,
self.outputdirkey ,
self.savedirkey ,
self.timestartkey ,
self.timefinalkey ,
self.projectkey,
self.rundirkey,
self.outputdirkey,
self.savedirkey,
self.timestartkey,
self.timefinalkey,
self.timelengthkey,
self.istartkey
]
......@@ -296,7 +300,7 @@ class TM5ObservationOperator(ObservationOperator):
self.tm_settings[k] = v
#replace all instances of old with new, but only if it concerns a name of a path!!!
if os.path.exists(str(v)):
for k_old, v_old in self.tm_settings.iteritems():
for k_old, v_old in self.tm_settings.iteritems(): #LU nie wydaje mi sie zeby to powyzej mialo sens, bo wowczas v jest tym samym co v_old
if not isinstance(v_old, str):
continue
if str(v_orig) in str(v_old):
......@@ -322,7 +326,8 @@ class TM5ObservationOperator(ObservationOperator):
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
#LU tutaj generalnie sprawdzamy czy input sie zgadza, czyli czy istnieja wszystkie pliki parametrow oraz plik obserwacji wskazywany przez dacycle[obsopertor.inputfile]
#LU a potem jeszcze sprawdzmay czy jest odpowiedni executable, z tym ze nie bardzo wiem skad sie go bierze.
datadir = self.tm_settings['ct.params.input.dir']
if not os.path.exists(datadir):
msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..." % datadir
......@@ -360,7 +365,8 @@ class TM5ObservationOperator(ObservationOperator):
logging.error("Please compile the model with the specified rc-file and the regular TM5 scripts first")
raise IOError
#LU zmienic definicje w dokumentacji bo mowi o save.hdf podczas gdy juz takiego nie ma.
#LU kopiujemy tm5-owe restarty , chociaz w sumie juz je skopiowalismy wczesniej
def get_initial_data(self):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model.
For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial
......@@ -398,14 +404,14 @@ class TM5ObservationOperator(ObservationOperator):
def Run(self):
def run(self):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
only if successfull on all processors will execution of the shell continue.
"""
#LU mam wrazneie ze ten status sie i tak d oniczego nie przydaje.
cwd = os.getcwd()
# From here on, several options should be implemented.
......@@ -496,7 +502,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
#LU nie iwem czy to dobrze zadziala z tym kodem bo moze jednak sa lepsze sposoby sprawdzenia statusu...
def TM5_With_N_tracers(self):
""" Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members"""
import datetime
......@@ -640,7 +646,7 @@ if __name__ == "__main__":
tm = TM5ObservationOperator()
tm.Setup()
tm.Initialize()
tm.Run()
tm.run()
tm.save_data()
......
......@@ -106,15 +106,6 @@ class CycleControl(dict):
self.RestartFileList = [] # List of files needed for restart, to be extended later
self.OutputFileList = [] # List of files needed for output, to be extended later
def __str__(self):
msg = "==============================================================="
msg += "DA Cycle rc-file is %s" % self.RcFileName
msg += "DA Cycle run directory is %s" % self['dir.da_run']
msg += "DA Cycle inverse system is %s" % self['da.system']
msg += "DA Cycle obs operator is %s" % self['da.obsoperator']
msg += "==============================================================="
return msg
def LoadRc(self, RcFileName):
"""
......@@ -143,7 +134,8 @@ class CycleControl(dict):
self[k] = True
if v in ['False', 'false', 'f', 'F', 'n', 'no']:
self[k] = False
if 'date' in k : self[k] = ToDatetime(v)
if 'date' in k :
self[k] = ToDatetime(v)
if 'time.start' in k :
self[k] = ToDatetime(v)
if 'time.end' in k :
......@@ -194,7 +186,7 @@ class CycleControl(dict):
logging.info("===============================================================")
logging.info("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
logging.info("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
logging.info("DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))
logging.info("DA Cycle final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M'))
logging.info("DA Cycle cycle length is %s" % cyclelength)
logging.info("DA Cycle restart is %s" % str(self['time.restart']))
......@@ -404,7 +396,7 @@ class CycleControl(dict):
CreateDirs(os.path.join(self['dir.restart.oneago']))
logging.info('Succesfully created the file structure for the assimilation job')
#LU tutaj chyba brakuje move restart data
def recover_run(self):
"""
Prepare a recovery from a crashed run. This consists of:
......@@ -432,7 +424,7 @@ class CycleControl(dict):
logging.debug("Next cycle start date is %s" % self['time.start'])
# Copy randomseed.pickle file to exec dir
source = os.path.join(self['dir.restart.current'], 'randomseed.pickle')
source = os.path.join(self['dir.restart.current'], 'randomseed.pickle') #LU wydaje mi sie ze tutaj nie trzeba podawac nazwy pliku w folderze docelowym, jesli sie obczai ze to folder to sie kopiuje.
dest = os.path.join(self['dir.exec'], 'randomseed.pickle')
shutil.copy(source, dest)
......@@ -564,7 +556,7 @@ class CycleControl(dict):
if io_option == 'store':
CreateDirs(os.path.join(targetdir), forceclean=True)
logging.debug("Performing a %s of data" % (io_option))
logging.debug("Performing a %s of data" % io_option)
logging.debug(" from directory: %s " % sourcedir)
logging.debug(" to directory: %s " % targetdir)
......@@ -610,7 +602,7 @@ class CycleControl(dict):
# The rest is info needed for a system restart, so it modifies the current DaCycle object (self)
self['da.restart.fname'] = fname # needed for next job template
self.RestartFileList.extend([fname]) # current restart list holds next rc file name
self.RestartFileList.append(fname) # current restart list holds next rc file name
logging.debug('Added da_runtime.rc to the RestartFileList for later collection')
......@@ -635,25 +627,23 @@ class CycleControl(dict):
"""
from string import join
DaPlatForm = self.DaPlatForm
if self['time.end'] < self['time.finish']:
# file ID and names
jobid = self['time.end'].strftime('%Y%m%d')
targetdir = os.path.join(self['dir.exec'])
jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid)
logfile = jobfile.replace('.jb', '.log')
logfile = os.path.join(targetdir, 'jb.%s.log' % jobid)
#LU tutaj sa parametry ktore ida na gore do pliku job. nie zawsze koniecznie potrzebne.
# Template and commands for job
jobparams = {'jobname':"j.%s" % jobid, 'jobtime':'24:00:00', 'logfile':logfile, 'errfile':logfile}
template = DaPlatForm.get_job_template(jobparams)
jobparams = {'jobname':"j.%s" % jobid, 'jobtime':'06:00:00', 'logfile': logfile, 'errfile': logfile}
template = self.DaPlatForm.get_job_template(jobparams)
execcommand = os.path.join(self['dir.da_submit'], sys.argv[0])
template += 'python %s rc=%s %s' % (execcommand, self['da.restart.fname'], join(self.opts, ''),)
template += 'python %s rc=%s %s' % (execcommand, self['da.restart.fname'], join(self.opts, ''))
# write and submit
DaPlatForm.write_job(jobfile, template, jobid)
jobid = DaPlatForm.submit_job(jobfile, joblog=logfile)
self.DaPlatForm.write_job(jobfile, template, jobid)
jobid = self.DaPlatForm.submit_job(jobfile, joblog=logfile)
else:
logging.info('Final date reached, no new cycle started')
......@@ -706,7 +696,7 @@ def ParseOptions():
logging.root.setLevel(logging.DEBUG)
if opts:
optslist = [item[0] for item in opts]
optslist = [item[0] for item in opts] #LU ze co same minusy zwroci?
else:
optslist = []
......
......@@ -41,14 +41,14 @@ def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector,
def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
""" Set up the job specific directory structure and create an expanded rc-file """
DaSystem.Validate()
DaCycle.DaSystem = DaSystem
DaCycle.DaPlatForm = DaPlatForm
DaCycle.Initialize()
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc
ObsOperator.Initialize() # Setup Observation Operator
DaSystem.Validate() #LU tylko sprawdza needed rc items in the file
DaCycle.DaSystem = DaSystem #LU przypisuje dacyclowi liste parametrow
DaCycle.DaPlatForm = DaPlatForm #LU przypisuje cyklowi platforme (tez liste parametrow)
DaCycle.Initialize() #LU nastepnie cykl zostaje inicjalizowany...bardzo logiczne
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc #LU cykl zostaje przypisany state vectorowi
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU cykl zostaje przypisany probkom
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU cykl zostaje przypisany obsoperatorowi
ObsOperator.Initialize() # Setup Observation Operator #LU a pote mobsoperator jest inicjalizowany
def prepare_state(DaCycle, StateVector):
......@@ -61,15 +61,16 @@ def prepare_state(DaCycle, StateVector):
logging.info(header + "starting prepare_state" + footer)
StateVector.Initialize()
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
#LU to jest zalezne od cyklu, i cykl pojedynczy moze miec time.restart lub moze nie miec.
if not DaCycle['time.restart']:
if not DaCycle['time.restart']: #LU jesli jest to pierwszy cykl
# Fill each week from n=1 to n=nlag with a new ensemble
nlag = StateVector.nlag
nlag = StateVector.nlag #LU dla kazdego od zera do dlugosc(cykl) wyznaczamy date oraz znajdujemy kowariancje i robimy nowa ensemble.
for n in range(0, nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle']))
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle'])) #LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja.
cov = StateVector.get_covariance(date)
StateVector.make_new_ensemble(n + 1, cov)
......@@ -77,9 +78,8 @@ def prepare_state(DaCycle, StateVector):
# Read the StateVector data from file
savedir = DaCycle['dir.restart.current']
filtertime = DaCycle['time.start'].strftime('%Y%m%d')
filename = os.path.join(savedir, 'savestate.nc')
filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
......@@ -89,8 +89,8 @@ def prepare_state(DaCycle, StateVector):
# Finally, also write the StateVector to a file so that we can always access the a-priori information
savedir = DaCycle['dir.output']
filename = os.path.join(savedir, 'savestate.nc')
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now
......@@ -107,19 +107,19 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
#msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg)
logging.info(header + "starting sample_state" + footer)
nlag = int(DaCycle['time.nlag'])
logging.info("Sampling model will be run over %d cycles" % nlag)
logging.info("Sampling model will be run over %d cycles" % nlag) #LU w ramach modelu jest 3 cykle, czyli 3 przejscia. jednoczesnie w ramach kazdego cyklu idzie sie 3 tygodnie do przodu
for lag in range(nlag):
logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (int(lag) + 1,))
for lag in range(nlag): #LU no to teraz robimy te przejsica
logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))
# If we are going to sample the lag = 0 time step, place the needed files to run the transport model in the right folder first
if lag == 0:
if lag == 0: #LU w pierwszym tygodniu danego cyklu bierzemy initial data dla tm5, tzn jakies pewnie ustawienia.
ObservationOperator.get_initial_data()
############# Perform the actual sampling loop #####################
sample_one_cycle(DaCycle, Samples, StateVector, ObservationOperator, lag)
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag)
############# Finished the actual sampling loop #####################
......@@ -133,7 +133,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
logging.debug("Appended ObsOperator restart and output file lists to DaCycle for collection ")
DaCycle.OutputFileList.extend([DaCycle['ObsOperator.inputfile']])
DaCycle.OutputFileList.append(DaCycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to DaCycle for collection ")
......@@ -141,7 +141,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
# sub-sampling of time series, vertical averaging, etc.
def sample_one_cycle(DaCycle, Samples, StateVector, ObservationOperator, lag):
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvance=False):
""" Perform all actions needed to sample one cycle """
import copy
......@@ -151,7 +151,7 @@ def sample_one_cycle(DaCycle, Samples, StateVector, ObservationOperator, lag):
startdate = DaCycle['time.sample.start']
enddate = DaCycle['time.sample.end']
DaCycle['time.sample.window'] = lag
DaCycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"),)
DaCycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
logging.info("New simulation interval set : ")
logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
......@@ -162,15 +162,15 @@ def sample_one_cycle(DaCycle, Samples, StateVector, ObservationOperator, lag):
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model
StateVector.write_members_to_file(lag + 1)
StateVector.write_members_to_file(lag + 1) #LU parameters.nc
Samples.Initialize()
Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego cyklu
Samples.add_observations()
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
Samples.add_model_data_mismatch()
filename = Samples.write_sample_info()
Samples.add_model_data_mismatch()
filename = Samples.write_sample_info() #LU observations.nc
# Write filename to DaCycle, and to output collection list
......@@ -206,13 +206,16 @@ def sample_one_cycle(DaCycle, Samples, StateVector, ObservationOperator, lag):
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False :
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
else:
StateVector.ObsToAssimmilate += (None,)
#LU tego nie ma w oryginalnym kodzie
if isadvance:
Samples.write_obs_to_file("newstyle")
def invert(DaCycle, StateVector, Optimizer):
......@@ -221,7 +224,7 @@ def invert(DaCycle, StateVector, Optimizer):
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
StateVector.nobs,)
StateVector.nobs)
Optimizer.Initialize(dims)
Optimizer.state_to_matrix(StateVector)
......@@ -253,7 +256,10 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
sample_one_cycle(DaCycle, Samples, StateVector, ObservationOperator, 0)
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, True) #LU w srodku zmienia zawartosc obiektu samples
#LU ale z drugiej strony dodaje tez pozniej samples, to tak jakby chcial na nowo dodac ...
#LU a to dlatego ze samples jest inicjalizowane na nowo.
#LU skoro w srodku sample step jest inicjalizowanie samples przez przypisanie datalist=[], to czy ten obiekt samples ma jkaies podobiekty ktore kaza mu byc przekazywanym?
# Write info from optimized flux cycle to a file
Samples.write_obs_to_file()
......@@ -263,7 +269,7 @@ def save_and_submit(DaCycle, StateVector):
""" Save the model state and submit the next job """
logging.info(header + "starting save_and_submit" + footer)
savedir = DaCycle['dir.output']
filename = os.path.join(savedir, 'savestate.nc')
filename = os.path.join(savedir, 'savestate.nc') #LU to jest state vector po zoptymalizowaniu.
StateVector.WriteToFile(filename)
DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now
......@@ -281,13 +287,13 @@ def run_forecast_model(DaCycle, ObsOperator):
parameter file for the model (tm5.rc), or execute some scripts needed before the
actual execution starts (get_meteo). After this step, a call to the model
executable should start a succesfull simulation
Run (method) : Start the executable. How this is done depends on your model and might involve
run (method) : Start the executable. How this is done depends on your model and might involve
submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
"""
ObsOperator.PrepareRun()
ObsOperator.prepare_run()
ObsOperator.validate_input()
ObsOperator.Run()
ObsOperator.run()
ObsOperator.save_data()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment