Commit 298dfacf authored by karolina's avatar karolina
Browse files

reverted to r1186 (the last one from 2012), with changes in observation files...

reverted to r1186 (the last one from 2012), with changes in observation files from r1192-1194 preserved
parent 8e0ac6f9
......@@ -45,7 +45,7 @@ class Observation(object):
"""
def __init__(self, DaCycle=None):#LU ten dacycle nigdy sie nie przydaje. jesli jest to generyczny typ, to czy w ogole do czegokolwiek jest potrzebny ten init? w praktyce?
def __init__(self, DaCycle=None):
"""
create an object with an identifier, version, and an empty ObservationList
"""
......
......@@ -143,7 +143,7 @@ class Optimizer(object):
StateVector.isOptimized = True
logging.debug('Returning optimized data to the StateVector, setting "StateVector.isOptimized = True" ')
def write_diagnostics(self, diagdir, timestart, StateVector, type='prior'):
def write_diagnostics(self, DaCycle, StateVector, type='prior'):
"""
Open a NetCDF file and write diagnostic output from optimization process:
......@@ -160,9 +160,9 @@ class Optimizer(object):
import da.tools.io4 as io
#import da.tools.io as io
filename = os.path.join(diagdir, 'optimizer.%s.nc' % timestart.strftime('%Y%m%d'))
#DaCycle.OutputFileList.append(filename)
outdir = DaCycle['dir.diagnostics']
filename = os.path.join(outdir, 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
DaCycle.OutputFileList.append(filename)
# Open or create file
......@@ -294,7 +294,7 @@ class Optimizer(object):
f.close()
logging.debug('Diagnostics file closed')
return filename
def serial_minimum_least_squares(self):
""" Make minimum least squares solution by looping over obs"""
......
......@@ -114,16 +114,20 @@ class StateVector(object):
"""
def __init__(self):
def __init__(self, DaCycle=None):
self.Identifier = identifier
self.Version = version
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
logging.info('Statevector object initialized: %s' % self.Identifier)
def Initialize(self, newrc):
def Initialize(self):
"""
Initialize the object by specifying the dimensions.
There are two major requirements for each statvector that you want to build:
......@@ -134,9 +138,9 @@ class StateVector(object):
An example is given below.
"""
self.nlag = int(newrc['time.nlag'])
self.nmembers = int(newrc['da.optimizer.nmembers'])
self.nparams = int(newrc['nparameters'])
self.nlag = int(self.DaCycle['time.nlag'])
self.nmembers = int(self.DaCycle['da.optimizer.nmembers'])
self.nparams = int(self.DaCycle.DaSystem['nparameters'])
self.nobs = 0
self.isOptimized = False
......@@ -154,14 +158,14 @@ class StateVector(object):
# This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember
# that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid.
mapfile = os.path.join(newrc['regionsfile'])
mapfile = os.path.join(self.DaCycle.DaSystem['regionsfile'])
ncf = io.CT_Read(mapfile, 'read')
self.gridmap = ncf.GetVariable('regions')
self.tcmap = ncf.GetVariable('transcom_regions')
ncf.close()
logging.debug("A TransCom map on 1x1 degree was read from file %s" % mapfile)
logging.debug("A parameter map on 1x1 degree was read from file %s" % mapfile)
logging.debug("A TransCom map on 1x1 degree was read from file %s" % self.DaCycle.DaSystem['regionsfile'])
logging.debug("A parameter map on 1x1 degree was read from file %s" % self.DaCycle.DaSystem['regionsfile'])
# Create a dictionary for state <-> gridded map conversions
......@@ -278,7 +282,7 @@ class StateVector(object):
logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag))
def propagate(self, timestart, cyclen):
def propagate(self):
"""
:rtype: None
......@@ -297,8 +301,8 @@ class StateVector(object):
self.EnsembleMembers.append([])
# And now create a new time step of mean + members for n=nlag
date = timestart + timedelta(days=(self.nlag - 0.5) * int(cyclen))
cov = self.get_covariance(date) #LU tutaj powinny byc parametry do get_covariance
date = self.DaCycle['time.start'] + timedelta(days=(self.nlag - 0.5) * int(self.DaCycle['time.cycle']))
cov = self.get_covariance(date)
self.make_new_ensemble(self.nlag, cov)
logging.info('The state vector has been propagated by one cycle')
......@@ -322,7 +326,7 @@ class StateVector(object):
#import da.tools.io4 as io
#import da.tools.io as io
if not self.isOptimized: #LU tutaj bedzie zmiana gdy sv bedzie mial date.
if not self.isOptimized:
f = io.CT_CDF(filename, method='create')
logging.debug('Creating new StateVector output file (%s)' % filename)
qual = 'prior'
......@@ -397,7 +401,7 @@ class StateVector(object):
logging.info('Successfully read the State Vector from file (%s) ' % filename)
def write_members_to_file(self, lag, outdir):
def write_members_to_file(self, lag):
"""
:param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag]
:rtype: None
......@@ -419,7 +423,7 @@ class StateVector(object):
#import da.tools.io as io
#import da.tools.io4 as io
outdir = self.DaCycle['dir.input']
members = self.EnsembleMembers[lag - 1]
for mem in members:
......
......@@ -25,7 +25,7 @@ version = '0.0'
class CtStateVector(StateVector):
""" This is a StateVector object for CarbonTracker. It has a private method to make new ensemble members """
def get_covariance(self, date, params):
def get_covariance(self, date):
""" Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector.
Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.
The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag]
......@@ -39,8 +39,8 @@ class CtStateVector(StateVector):
# Get the needed matrices from the specified covariance files
file_ocn_cov = params['ocn.covariance']
file_bio_cov = params['bio.covariance'] #LU logika tego to powrot do dacycle zeby potem z systemu(CT) pobrac parametr
file_ocn_cov = self.DaCycle.DaSystem['ocn.covariance']
file_bio_cov = self.DaCycle.DaSystem['bio.covariance'] #LU logika tego to powrot do dacycle zeby potem z systemu(CT) pobrac parametr
# replace YYYY.MM in the ocean covariance file string
......
......@@ -62,9 +62,6 @@ from da.tools.pipeline import header,footer
msg = header+"Entering Pipeline "+footer ; logging.info(msg)
import da.tools.toolkit
da.tools.toolkit.read_RC(args['rc'])
EnsembleSmootherPipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
......
......@@ -58,7 +58,7 @@ class TM5ObservationOperator(ObservationOperator):
"""
def __init__(self, RcFileName):
def __init__(self, RcFileName, DaCycle=None):
""" The instance of an TMObservationOperator is application dependent """
self.Identifier = identifier # the identifier gives the model name
self.Version = version # the model version used
......@@ -73,6 +73,10 @@ class TM5ObservationOperator(ObservationOperator):
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
logging.info('Observation Operator initialized: %s (%s)' % (self.Identifier, self.Version))
......@@ -153,8 +157,7 @@ class TM5ObservationOperator(ObservationOperator):
else:
logging.info('Compilation successful, continuing')
def prepare_run(self, stepstart, stepend): #LU time.sample start i stop, dir.input, obsop inputfile, restart i window, + platform
#LU mogloby byc ze start i end jkao parametr, directories i inne ze slownika przekazanego jkao parametr
def prepare_run(self):
"""
Prepare a forward model TM5 run, this consists of:
......@@ -181,7 +184,7 @@ class TM5ObservationOperator(ObservationOperator):
'output.flask.infile': self.DaCycle['ObsOperator.inputfile'] ,
'output.flask': 'True'
}
#LU istartkey jest zawsze restart z wyjatkiem pirewszego w pierwszym
if self.DaCycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
NewItems[self.istartkey] = self.restartvalue
logging.debug('Resetting TM5 to perform restart')
......@@ -194,9 +197,6 @@ class TM5ObservationOperator(ObservationOperator):
#LU ale: nowy RC zapisywany jst przy koncu cyklu. czyli time. sample.window bedzie zawsze 0.
if self.DaCycle['time.sample.window'] != 0: # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
#LU zamiast tego zrobic to jako parametr obsoperatora - current lag, albo w ogole tm restart true false - nie mnozmy bytow ponad potrzebe
NewItems[self.istartkey] = self.restartvalue
logging.debug('Resetting TM5 to perform restart')
......@@ -221,7 +221,7 @@ class TM5ObservationOperator(ObservationOperator):
self.RcFileType = 'pre-pycasso'
logging.debug('TM5 rc-file loaded successfully')
def ValidateRc(self): #LU nic
def ValidateRc(self):
"""
Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file
are specified in the tm5_tools module, as dictionary variable "needed_rc_items".
......@@ -284,7 +284,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.debug('rc-file has been validated succesfully')
def ModifyRC(self, NewValues): #LU nic
def ModifyRC(self, NewValues):
"""
Modify parts of the tm5 settings, for instance to give control of file locations to the DA shell
instead of to the tm5.rc script.
......@@ -315,7 +315,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.debug('Added new tm5 rc-item %s ' % k)
def WriteRc(self, tm5rcfilename): #LU nic
def WriteRc(self, tm5rcfilename):
"""
Write the rc-file settings to a tm5.rc file in the rundir
"""
......@@ -323,7 +323,7 @@ class TM5ObservationOperator(ObservationOperator):
rc.write(tm5rcfilename, self.tm_settings)
logging.debug("Modified rc file for TM5 written (%s)" % tm5rcfilename)
def validate_input(self): #LU tylko inputfile, nmembers
def validate_input(self):
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
......@@ -366,8 +366,9 @@ class TM5ObservationOperator(ObservationOperator):
logging.error("Please compile the model with the specified rc-file and the regular TM5 scripts first")
raise IOError
#LU zmienic definicje w dokumentacji bo mowi o save.hdf podczas gdy juz takiego nie ma.
#LU kopiujemy tm5-owe restarty , chociaz w sumie juz je skopiowalismy wczesniej
def get_initial_data(self, sourcedir): #LU tylko dir.restrt.curret jkao miejsce z ktorego zbiore pliki typu "tm5_restart"
def get_initial_data(self):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model.
For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial
concentrations for all tracers.
......@@ -382,7 +383,7 @@ class TM5ObservationOperator(ObservationOperator):
# First get the restart data for TM5 from the current restart dir of the filter
# sourcedir = self.DaCycle['dir.restart.current']
sourcedir = self.DaCycle['dir.restart.current']
targetdir = self.tm_settings[self.savedirkey]
self.outputdir = self.tm_settings[self.outputdirkey] #IV test
......@@ -404,7 +405,7 @@ class TM5ObservationOperator(ObservationOperator):
def run(self): #LU nic
def run(self):
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
......@@ -447,7 +448,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
def TM5_under_mpirun(self): #LU nmembers, dir.jobs, dier.exec, plus platform
def TM5_under_mpirun(self):
""" Method handles the case where a shell runs an MPI process that forks into N TM5 model instances """
import datetime
......@@ -503,7 +504,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
#LU nie iwem czy to dobrze zadziala z tym kodem bo moze jednak sa lepsze sposoby sprawdzenia statusu...
def TM5_With_N_tracers(self): #LU nic
def TM5_With_N_tracers(self):
""" Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members"""
import datetime
import subprocess
......@@ -544,7 +545,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
def save_data(self): #LU tylko stamp - do wybrania odpowiendich w outpucie tm5 to collect
def save_data(self):
""" Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names
that is used by the DaCycle object to collect restart data for the filter.
......
......@@ -19,7 +19,7 @@ import datetime
header = '\n\n *************************************** '
footer = ' *************************************** \n '
def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer, newrc):
def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
......@@ -27,11 +27,11 @@ def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector,
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
prepare_state(DaCycle, StateVector)
sample_state(DaCycle, Samples, StateVector, ObsOperator, newrc)
sample_state(DaCycle, Samples, StateVector, ObsOperator)
invert(DaCycle, StateVector, Optimizer)
advance(DaCycle, Samples, StateVector, ObsOperator, newrc)
advance(DaCycle, Samples, StateVector, ObsOperator)
save_and_submit(DaCycle, StateVector)
logging.info("Cycle finished...exiting pipeline")
......@@ -41,22 +41,17 @@ def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector,
def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
""" Set up the job specific directory structure and create an expanded rc-file """
DaSystem.Validate()
DaCycle.DaSystem = DaSystem
DaCycle.DaPlatForm = DaPlatForm
DaCycle.Initialize()
#StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc
#Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc
ObsOperator.Initialize() # Setup Observation Operator
DaSystem.Validate() #LU tylko sprawdza needed rc items in the file
DaCycle.DaSystem = DaSystem #LU przypisuje dacyclowi liste parametrow
DaCycle.DaPlatForm = DaPlatForm #LU przypisuje cyklowi platforme (tez liste parametrow)
DaCycle.Initialize() #LU nastepnie cykl zostaje inicjalizowany...bardzo logiczne
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc #LU cykl zostaje przypisany state vectorowi
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU cykl zostaje przypisany probkom
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU cykl zostaje przypisany obsoperatorowi
ObsOperator.Initialize() # Setup Observation Operator #LU a pote mobsoperator jest inicjalizowany
def prepare_state(DaCycle, StateVector, newrc):
#LU numer cyklu (zamiast time.restart)
#LU rozne parametry do initialize - trzeba przekazac caly slownik
def prepare_state(DaCycle, StateVector):
""" Set up the input data for the forward model: obs and parameters/fluxes"""
# We now have an empty StateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
......@@ -66,36 +61,40 @@ def prepare_state(DaCycle, StateVector, newrc):
logging.info(header + "starting prepare_state" + footer)
StateVector.Initialize(newrc) #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
#LU to jest zalezne od cyklu, i cykl pojedynczy moze miec time.restart lub moze nie miec.
if not DaCycle['time.restart']:
if not DaCycle['time.restart']: #LU jesli jest to pierwszy cykl
# Fill each week from n=1 to n=nlag with a new ensemble
for n in range(StateVector.nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle']))
#LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja.
svparams = {k: newrc[k] for k in ('ocn.covariance', 'bio.covariance')}
cov = StateVector.get_covariance(date, svparams) #LU to mozna zrobic w srodku, bo przeciez nie potrzebna tu ta kowariancja a z drugiej strony zawsze potrzebuje kowariancji zeby stworzyc nowa ensemble
nlag = StateVector.nlag #LU dla kazdego od zera do dlugosc(cykl) wyznaczamy date oraz znajdujemy kowariancje i robimy nowa ensemble.
for n in range(0, nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle'])) #LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja.
cov = StateVector.get_covariance(date)
StateVector.make_new_ensemble(n + 1, cov)
else:
# Read the StateVector data from file
StateVector.ReadFromFile(os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')) # by default will read "opt"(imized) variables, and then propagate
filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
# Now propagate the ensemble by one cycle to prepare for the current cycle
StateVector.propagate(DaCycle['time.start'], DaCycle['time.cycle'])
StateVector.propagate()
# Finally, also write the StateVector to a file so that we can always access the a-priori information
StateVector.WriteToFile(os.path.join(DaCycle['dir.output'], 'savestate.nc')) # write prior info because StateVector.Isoptimized == False for now
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now
def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
""" Sample the filter state for the inversion """
......@@ -115,12 +114,12 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
# If we are going to sample the lag = 0 time step, place the needed files to run the transport model in the right folder first
if lag == 0: #LU w pierwszym tygodniu danego cyklu bierzemy initial data dla tm5, tzn jakies pewnie ustawienia. dokladnie: restart files z poprzednich
ObservationOperator.get_initial_data(DaCycle['dir.restart.current'])
if lag == 0: #LU w pierwszym tygodniu danego cyklu bierzemy initial data dla tm5, tzn jakies pewnie ustawienia.
ObservationOperator.get_initial_data()
############# Perform the actual sampling loop #####################
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc)
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag)
############# Finished the actual sampling loop #####################
......@@ -129,7 +128,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
# Add the observation operator restart+output files to the general Output+RestartFileList, only after 'advance'
# Same logic for observation files
if lag == 0: #LU to jest tylko uzyte gdy mamy advance step.
if lag == 0:
DaCycle.RestartFileList.extend(ObservationOperator.RestartFileList)
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
logging.debug("Appended ObsOperator restart and output file lists to DaCycle for collection ")
......@@ -142,33 +141,18 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
# sub-sampling of time series, vertical averaging, etc.
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, isadvance=False):
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvance=False):
""" Perform all actions needed to sample one cycle """
import copy
import da.tools.toolkit as toolkit
# First set up the information for time start and time end of this sample
#LU on powinien byc parametrem wywolania.
DaCycle.set_sample_times(lag) #LU przesuniecie odpowiednie dat, choc nie jestem na 100% pewna czy to jest tak jak trzeba.
#LU pytanie jak bardzo zachowywane sa te time.sample.start. bo tak, na przyklad w da_runtime.rc one chyba nie sa potrzebne i w ogole to zapisywanie jego nie jest potrzebne bo dzieje sie tylko
#LU w przypadku gdy mamy nowy step, czyli potrzebujemy wtedy tylko odpwiedniego rc do tm5. a generalnie zapisuje sie tylko dlatego ze jest jednym z parametrow da cycle...czy my go gdziekolwiek indziej uzywamy?
#LU set_sample_times to ustawia i wstawia do da cycle. ILE RAZY POZNIEJ GO POTRZEBUJEMY?
#LU (1) initialize observations (2) tm5 prepare run (3) set sample times (czyli nie), (4) ponizej.
#LU czyli innymi slowy najpierw ustaiwamy to w dacycle -> chociaz nie jest to zaden parametr da cyclu! a potem uzywamy do inicjalizacji samples [PYTANIE: gdize jeszcze w samples uzywamy czegos z dacycle??] i run
#LU podobnie to time.sample.stamp jak psu na bude.
DaCycle.set_sample_times(lag)
startdate = DaCycle['time.sample.start']
enddate = DaCycle['time.sample.end']
DaCycle['time.sample.window'] = lag
DaCycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
stepstart = toolkit.calculate_start_time(newrc["time.start"], newrc["time.cycle"], 0, lag)
stepend = toolkit.calculate_step_end_time(stepstart, newrc["time.cycle"])
logging.info("#LU start of the sample: " + str(stepstart))
logging.info("#LU end of the sample:" + str(stepend))
logging.info("New simulation interval set : ")
logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
......@@ -178,25 +162,15 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model
StateVector.write_members_to_file(lag + 1) #LU parameters.nc
#LU nie potrzebuje nic z dat
#LU za to potrzebuje dir.input
StateVector.write_members_to_file(lag + 1, DaCycle['dir.input']) #LU parameters.nc, parametry sa za kazdym razem zapisywane, dla kazdego jednego odpalenia tm5 sa one inne, i z cyklu na cykl sa one inne bo SV byl optymalizowany.
#LU czyli ogolnie jest ich tak strasznie duzo, ze nie ma sensu ich zachowywac z kroku na krok, bo i tak sa w srodku state vectora. tutaj pelnia one jedynie role wejscia dla state vectora
#LU Samples.Initialize(newrc, startdate, enddate)
obsparams = {k: newrc[k] for k in ('obs.input.fname', 'obs.input.dir', 'obspack.input.dir', 'obspack.input.id')}
Samples.Initialize(startdate, enddate, obsparams) #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego stepu. czyli jeszcze innymi slowy, nowy obiekt observations, no moze ma te same params...
#LU nie potrzebuje nic z dat
Samples.add_observations() #LU wydaje mi sie ze tutaj za kazdym stepem wczytywany jest cale observations --> nie, jest subselect po self.startdate i self.enddate (ustawione wczesniej w initialize)
Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego cyklu
Samples.add_observations()
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
Samples.add_model_data_mismatch(DaCycle.DaSystem['obs.sites.rc'])
filename = Samples.write_sample_info(DaCycle['dir.input'], DaCycle['time.sample.stamp']) #LU observations.nc
Samples.add_model_data_mismatch()
filename = Samples.write_sample_info() #LU observations.nc
# Write filename to DaCycle, and to output collection list
......@@ -204,7 +178,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
# Run the observation operator
run_forecast_model(ObservationOperator, stepstart, stepend)
run_forecast_model(DaCycle, ObservationOperator)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
......@@ -224,17 +198,15 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
# Now write a small file that holds for each observation a number of values that we need in postprocessing
#filename = Samples.write_sample_info() #LU NIEPOTRZEBNE
filename = Samples.write_sample_info()
# Now add the observations that need to be assimilated to the StateVector.
# Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
# This is to make sure that the first step of the system uses all observations available, while the subsequent
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
#LU z samples korzysta sie tylko gdy 1 lub 1 .
#LU teoretycznie one powinny wszystkie byc do dyspozycji z uruchomienia na uruchomienie. z cyklu na cykl [ALE MOZE BYC RECOVER] --> OK ALE NIE ZAWSZE POTRZEBUJEMY PRODUKOWAC OBSERWACJE. FLASK OUTPUT= FALSE????
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False: #LU dla ostatniego cyklu bierzemy te samples albo dla wszystkich w przypadku gdy pierwszy
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
......@@ -243,12 +215,12 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
StateVector.ObsToAssimmilate += (None,)
#LU tego nie ma w oryginalnym kodzie
if isadvance:
Samples.write_obs_to_file(DaCycle['dir.output'], DaCycle['time.sample.stamp'], "newstyle")
Samples.write_obs_to_file("newstyle")
def invert(DaCycle, StateVector, Optimizer):
""" Perform the inverse calculation """
logging.info(header + "starting invert" + footer)
logging.info(msg = header + "starting invert" + footer)
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
......@@ -256,8 +228,7 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.Initialize(dims)
Optimizer.state_to_matrix(StateVector)
filename = Optimizer.write_diagnostics(DaCycle['dir.diagnostics'], DaCycle['time.start'], StateVector, type='prior')
DaCycle.OutputFileList.append(filename)
Optimizer.write_diagnostics(DaCycle, StateVector, type='prior')
Optimizer.set_localization('None')
if not DaCycle.DaSystem.has_key('opt.algorithm'):
......@@ -273,11 +244,11 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.matrix_to_state(StateVector)
Optimizer.write_diagnostics(DaCycle['dir.diagnostics'], DaCycle['time.start'], StateVector, type='optimized')
Optimizer.write_diagnostics(DaCycle, StateVector, type='optimized')
StateVector.isOptimized = True
def advance(DaCycle, Samples, StateVector, ObservationOperator, newrc):
def advance(DaCycle, Samples, StateVector, ObservationOperator):
""" Advance the filter state to the next step """
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
......@@ -285,13 +256,13 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator, newrc):
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, newrc, True) #LU w srodku zmienia zawartosc obiektu samples
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, True) #LU w srodku zmienia zawartosc obiektu samples
#LU ale z drugiej strony dodaje tez pozniej samples, to tak jakby chcial na nowo dodac ...
#LU a to dlatego ze samples jest inicjalizowane na nowo.
#LU skoro w srodku sample step jest inicjalizowanie samples przez przypisanie datalist=[], to czy ten obiekt samples ma jkaies podobiekty ktore kaza mu byc przekazywanym?
# Write info from optimized flux cycle to a file
Samples.write_obs_to_file(DaCycle['dir.output'], DaCycle['time.sample.stamp'])
Samples.write_obs_to_file()
def save_and_submit(DaCycle, StateVector):
......@@ -304,7 +275,7 @@ def save_and_submit(DaCycle, StateVector):
DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now
DaCycle.Finalize()
def run_forecast_model(ObsOperator, stepstart, stepend):
def run_forecast_model(DaCycle, ObsOperator):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a
setting in your da.rc file. After that choice, the module written specifically for a particular
......@@ -320,9 +291,9 @@ def run_forecast_model(ObsOperator, stepstart, stepend):
submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
"""
ObsOperator.prepare_run(stepstart, stepend) #LU tutaj dodac daty startu i stopu #LU ewnetualnie wyliczac to stepend
ObsOperator.prepare_run()
ObsOperator.validate_input()
#ObsOperator.run()
#ObsOperator.save_data()
ObsOperator.run()
ObsOperator.save_data()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment