Commit 9bd66234 authored by karolina's avatar karolina
Browse files

Beginning of February: last commit before date management and cycle management changes

parent 4039490c
......@@ -45,7 +45,7 @@ class Observation(object):
"""
def __init__(self, DaCycle=None):
def __init__(self, DaCycle=None):#LU ten dacycle nigdy sie nie przydaje. jesli jest to generyczny typ, to czy w ogole do czegokolwiek jest potrzebny ten init? w praktyce?
"""
create an object with an identifier, version, and an empty ObservationList
"""
......
......@@ -326,7 +326,7 @@ class StateVector(object):
#import da.tools.io4 as io
#import da.tools.io as io
if not self.isOptimized:
if not self.isOptimized: #LU tutaj bedzie zmiana gdy sv bedzie mial date.
f = io.CT_CDF(filename, method='create')
logging.debug('Creating new StateVector output file (%s)' % filename)
qual = 'prior'
......
......@@ -62,6 +62,9 @@ from da.tools.pipeline import header,footer
msg = header+"Entering Pipeline "+footer ; logging.info(msg)
import da.tools.toolkit
da.tools.toolkit.read_RC(args['rc'])
EnsembleSmootherPipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
......
......@@ -16,6 +16,6 @@ da.system.rc : da/rc/carbontracker.rc
! Info on the forward model to be used
da.obsoperator : TM5
da.obsoperator.rc : ${HOME}/Modeling/TM5/tm5-ctdas.rc
da.obsoperator.rc : ../../../../Desktop/ctdirs/pycasso-tm5-ctdas.rc
da.obsoperator.threads : 2
da.optimizer.nmembers : 100
......@@ -157,7 +157,8 @@ class TM5ObservationOperator(ObservationOperator):
else:
logging.info('Compilation successful, continuing')
def prepare_run(self):
def prepare_run(self): #LU time.sample start i stop, dir.input, obsop inputfile, restart i window, + platform
#LU mogloby byc ze start i end jkao parametr, directories i inne ze slownika przekazanego jkao parametr
"""
Prepare a forward model TM5 run, this consists of:
......@@ -184,7 +185,7 @@ class TM5ObservationOperator(ObservationOperator):
'output.flask.infile': self.DaCycle['ObsOperator.inputfile'] ,
'output.flask': 'True'
}
#LU istartkey jest zawsze restart z wyjatkiem pirewszego w pierwszym
if self.DaCycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart
NewItems[self.istartkey] = self.restartvalue
logging.debug('Resetting TM5 to perform restart')
......@@ -197,6 +198,9 @@ class TM5ObservationOperator(ObservationOperator):
#LU ale: nowy RC zapisywany jst przy koncu cyklu. czyli time. sample.window bedzie zawsze 0.
if self.DaCycle['time.sample.window'] != 0: # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
#LU zamiast tego zrobic to jako parametr obsoperatora - current lag, albo w ogole tm restart true false - nie mnozmy bytow ponad potrzebe
NewItems[self.istartkey] = self.restartvalue
logging.debug('Resetting TM5 to perform restart')
......@@ -221,7 +225,7 @@ class TM5ObservationOperator(ObservationOperator):
self.RcFileType = 'pre-pycasso'
logging.debug('TM5 rc-file loaded successfully')
def ValidateRc(self):
def ValidateRc(self): #LU nic
"""
Validate the contents of the tm_settings dictionary and add extra values. The required items for the TM5 rc-file
are specified in the tm5_tools module, as dictionary variable "needed_rc_items".
......@@ -284,7 +288,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.debug('rc-file has been validated succesfully')
def ModifyRC(self, NewValues):
def ModifyRC(self, NewValues): #LU nic
"""
Modify parts of the tm5 settings, for instance to give control of file locations to the DA shell
instead of to the tm5.rc script.
......@@ -315,7 +319,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.debug('Added new tm5 rc-item %s ' % k)
def WriteRc(self, tm5rcfilename):
def WriteRc(self, tm5rcfilename): #LU nic
"""
Write the rc-file settings to a tm5.rc file in the rundir
"""
......@@ -323,7 +327,7 @@ class TM5ObservationOperator(ObservationOperator):
rc.write(tm5rcfilename, self.tm_settings)
logging.debug("Modified rc file for TM5 written (%s)" % tm5rcfilename)
def validate_input(self):
def validate_input(self): #LU tylko inputfile, nmembers
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
......@@ -368,7 +372,7 @@ class TM5ObservationOperator(ObservationOperator):
#LU zmienic definicje w dokumentacji bo mowi o save.hdf podczas gdy juz takiego nie ma.
#LU kopiujemy tm5-owe restarty , chociaz w sumie juz je skopiowalismy wczesniej
def get_initial_data(self):
def get_initial_data(self): #LU tylko dir.restrt.curret jkao miejsce z ktorego zbiore pliki typu "tm5_restart"
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model.
For TM5, this means copying the save_*.hdf* files to the dir.save directory from which TM5 will read initial
concentrations for all tracers.
......@@ -405,7 +409,7 @@ class TM5ObservationOperator(ObservationOperator):
def run(self):
def run(self): #LU nic
"""
Start the TM5 executable. A new log file is started for the TM5 model IO, and then a subprocess is
spawned with the tm5_mpi_wrapper and the tm5.x executable. The exit code of the model is caught and
......@@ -448,7 +452,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
def TM5_under_mpirun(self):
def TM5_under_mpirun(self): #LU nmembers, dir.jobs, dier.exec, plus platform
""" Method handles the case where a shell runs an MPI process that forks into N TM5 model instances """
import datetime
......@@ -504,7 +508,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
#LU nie iwem czy to dobrze zadziala z tym kodem bo moze jednak sa lepsze sposoby sprawdzenia statusu...
def TM5_With_N_tracers(self):
def TM5_With_N_tracers(self): #LU nic
""" Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members"""
import datetime
import subprocess
......@@ -545,7 +549,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
def save_data(self):
def save_data(self): #LU tylko stamp - do wybrania odpowiendich w outpucie tm5 to collect
""" Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names
that is used by the DaCycle object to collect restart data for the filter.
......
......@@ -19,7 +19,7 @@ import datetime
header = '\n\n *************************************** '
footer = ' *************************************** \n '
def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer):
def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer, newrc):
""" The main point of entry for the pipeline """
sys.path.append(os.getcwd())
......@@ -27,11 +27,11 @@ def EnsembleSmootherPipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector,
start_job(DaCycle, DaSystem, PlatForm, StateVector, Samples, ObsOperator)
prepare_state(DaCycle, StateVector)
sample_state(DaCycle, Samples, StateVector, ObsOperator)
sample_state(DaCycle, Samples, StateVector, ObsOperator, newrc)
invert(DaCycle, StateVector, Optimizer)
advance(DaCycle, Samples, StateVector, ObsOperator)
advance(DaCycle, Samples, StateVector, ObsOperator, newrc)
save_and_submit(DaCycle, StateVector)
logging.info("Cycle finished...exiting pipeline")
......@@ -52,6 +52,11 @@ def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
def prepare_state(DaCycle, StateVector):
#LU numer cyklu (zamiast time.restart)
#LU rozne parametry do initialize - trzeba przekazac caly slownik
""" Set up the input data for the forward model: obs and parameters/fluxes"""
# We now have an empty StateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
......@@ -94,7 +99,7 @@ def prepare_state(DaCycle, StateVector):
StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now
def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
""" Sample the filter state for the inversion """
......@@ -114,12 +119,12 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
# If we are going to sample the lag = 0 time step, place the needed files to run the transport model in the right folder first
if lag == 0: #LU w pierwszym tygodniu danego cyklu bierzemy initial data dla tm5, tzn jakies pewnie ustawienia.
if lag == 0: #LU w pierwszym tygodniu danego cyklu bierzemy initial data dla tm5, tzn jakies pewnie ustawienia. dokladnie: restart files z poprzednich
ObservationOperator.get_initial_data()
############# Perform the actual sampling loop #####################
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag)
sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc)
############# Finished the actual sampling loop #####################
......@@ -128,7 +133,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
# Add the observation operator restart+output files to the general Output+RestartFileList, only after 'advance'
# Same logic for observation files
if lag == 0:
if lag == 0: #LU to jest tylko uzyte gdy mamy advance step.
DaCycle.RestartFileList.extend(ObservationOperator.RestartFileList)
DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList)
logging.debug("Appended ObsOperator restart and output file lists to DaCycle for collection ")
......@@ -141,13 +146,21 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
# sub-sampling of time series, vertical averaging, etc.
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvance=False):
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, isadvance=False):
""" Perform all actions needed to sample one cycle """
import copy
# First set up the information for time start and time end of this sample
DaCycle.set_sample_times(lag)
#LU on powinien byc parametrem wywolania.
DaCycle.set_sample_times(lag) #LU przesuniecie odpowiednie dat, choc nie jestem na 100% pewna czy to jest tak jak trzeba.
#LU pytanie jak bardzo zachowywane sa te time.sample.start. bo tak, na przyklad w da_runtime.rc one chyba nie sa potrzebne i w ogole to zapisywanie jego nie jest potrzebne bo dzieje sie tylko
#LU w przypadku gdy mamy nowy step, czyli potrzebujemy wtedy tylko odpwiedniego rc do tm5. a generalnie zapisuje sie tylko dlatego ze jest jednym z parametrow da cycle...czy my go gdziekolwiek indziej uzywamy?
#LU set_sample_times to ustawia i wstawia do da cycle. ILE RAZY POZNIEJ GO POTRZEBUJEMY?
#LU (1) initialize observations (2) tm5 prepare run (3) set sample times (czyli nie), (4) ponizej.
#LU czyli innymi slowy najpierw ustaiwamy to w dacycle -> chociaz nie jest to zaden parametr da cyclu! a potem uzywamy do inicjalizacji samples [PYTANIE: gdize jeszcze w samples uzywamy czegos z dacycle??] i run
#LU podobnie to time.sample.stamp jak psu na bude.
startdate = DaCycle['time.sample.start']
enddate = DaCycle['time.sample.end']
DaCycle['time.sample.window'] = lag
......@@ -162,10 +175,14 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model
StateVector.write_members_to_file(lag + 1) #LU parameters.nc
StateVector.write_members_to_file(lag + 1) #LU parameters.nc, parametry sa za kazdym razem zapisywane, dla kazdego jednego odpalenia tm5 sa one inne, i z cyklu na cykl sa one inne bo SV byl optymalizowany.
#LU czyli ogolnie jest ich tak strasznie duzo, ze nie ma sensu ich zachowywac z kroku na krok, bo i tak sa w srodku state vectora. tutaj pelnia one jedynie role wejscia dla state vectora
Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego cyklu
Samples.add_observations()
Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego stepu. czyli jeszcze innymi slowy, nowy obiekt observations, no moze ma te same params...
Samples.add_observations() #LU wydaje mi sie ze tutaj za kazdym stepem wczytywany jest cale observations --> nie, jest subselect po self.startdate i self.enddate (ustawione wczesniej w initialize)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
......@@ -198,15 +215,17 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
# Now write a small file that holds for each observation a number of values that we need in postprocessing
filename = Samples.write_sample_info()
filename = Samples.write_sample_info() #LU NIEPOTRZEBNE
# Now add the observations that need to be assimilated to the StateVector.
# Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag
# This is to make sure that the first step of the system uses all observations available, while the subsequent
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False:
#LU z samples korzysta sie tylko gdy 1 lub 1 .
#LU teoretycznie one powinny wszystkie byc do dyspozycji z uruchomienia na uruchomienie. z cyklu na cykl [ALE MOZE BYC RECOVER] --> OK ALE NIE ZAWSZE POTRZEBUJEMY PRODUKOWAC OBSERWACJE. FLASK OUTPUT= FALSE????
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False: #LU dla ostatniego cyklu bierzemy te samples albo dla wszystkich w przypadku gdy pierwszy
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the StateVector")
......@@ -220,7 +239,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
def invert(DaCycle, StateVector, Optimizer):
""" Perform the inverse calculation """
logging.info(msg = header + "starting invert" + footer)
logging.info(header + "starting invert" + footer)
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
......@@ -248,7 +267,7 @@ def invert(DaCycle, StateVector, Optimizer):
StateVector.isOptimized = True
def advance(DaCycle, Samples, StateVector, ObservationOperator):
def advance(DaCycle, Samples, StateVector, ObservationOperator, newrc):
""" Advance the filter state to the next step """
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
......@@ -256,7 +275,7 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, True) #LU w srodku zmienia zawartosc obiektu samples
sample_step(DaCycle, Samples, StateVector, ObservationOperator, 0, newrc, True) #LU w srodku zmienia zawartosc obiektu samples
#LU ale z drugiej strony dodaje tez pozniej samples, to tak jakby chcial na nowo dodac ...
#LU a to dlatego ze samples jest inicjalizowane na nowo.
#LU skoro w srodku sample step jest inicjalizowanie samples przez przypisanie datalist=[], to czy ten obiekt samples ma jkaies podobiekty ktore kaza mu byc przekazywanym?
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment