Commit 4d0a98b3 authored by karolina
Browse files

All DaCycle/DaSystem references removed from the StateVector classes

The values these classes need are now passed in from the pipeline as call parameters.
*Warning*: take care with get_covariance. It is called from the base class's "propagate", but it is defined only in the derived class; in addition, it now takes an extra "params" argument that "propagate" does not yet pass.
parent 9bd66234
...@@ -114,20 +114,16 @@ class StateVector(object): ...@@ -114,20 +114,16 @@ class StateVector(object):
""" """
def __init__(self, DaCycle=None): def __init__(self):
self.Identifier = identifier self.Identifier = identifier
self.Version = version self.Version = version
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can # The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment. # be added at a later moment.
if DaCycle != None:
self.DaCycle = DaCycle
else:
self.DaCycle = {}
logging.info('Statevector object initialized: %s' % self.Identifier) logging.info('Statevector object initialized: %s' % self.Identifier)
def Initialize(self): def Initialize(self, newrc):
""" """
Initialize the object by specifying the dimensions. Initialize the object by specifying the dimensions.
There are two major requirements for each statvector that you want to build: There are two major requirements for each statvector that you want to build:
...@@ -138,9 +134,9 @@ class StateVector(object): ...@@ -138,9 +134,9 @@ class StateVector(object):
An example is given below. An example is given below.
""" """
self.nlag = int(self.DaCycle['time.nlag']) self.nlag = int(newrc['time.nlag'])
self.nmembers = int(self.DaCycle['da.optimizer.nmembers']) self.nmembers = int(newrc['da.optimizer.nmembers'])
self.nparams = int(self.DaCycle.DaSystem['nparameters']) self.nparams = int(newrc['nparameters'])
self.nobs = 0 self.nobs = 0
self.isOptimized = False self.isOptimized = False
...@@ -158,14 +154,14 @@ class StateVector(object): ...@@ -158,14 +154,14 @@ class StateVector(object):
# This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember # This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember
# that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid. # that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid.
mapfile = os.path.join(self.DaCycle.DaSystem['regionsfile']) mapfile = os.path.join(newrc['regionsfile'])
ncf = io.CT_Read(mapfile, 'read') ncf = io.CT_Read(mapfile, 'read')
self.gridmap = ncf.GetVariable('regions') self.gridmap = ncf.GetVariable('regions')
self.tcmap = ncf.GetVariable('transcom_regions') self.tcmap = ncf.GetVariable('transcom_regions')
ncf.close() ncf.close()
logging.debug("A TransCom map on 1x1 degree was read from file %s" % self.DaCycle.DaSystem['regionsfile']) logging.debug("A TransCom map on 1x1 degree was read from file %s" % mapfile)
logging.debug("A parameter map on 1x1 degree was read from file %s" % self.DaCycle.DaSystem['regionsfile']) logging.debug("A parameter map on 1x1 degree was read from file %s" % mapfile)
# Create a dictionary for state <-> gridded map conversions # Create a dictionary for state <-> gridded map conversions
...@@ -282,7 +278,7 @@ class StateVector(object): ...@@ -282,7 +278,7 @@ class StateVector(object):
logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag)) logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag))
def propagate(self): def propagate(self, timestart, cyclen):
""" """
:rtype: None :rtype: None
...@@ -301,8 +297,8 @@ class StateVector(object): ...@@ -301,8 +297,8 @@ class StateVector(object):
self.EnsembleMembers.append([]) self.EnsembleMembers.append([])
# And now create a new time step of mean + members for n=nlag # And now create a new time step of mean + members for n=nlag
date = self.DaCycle['time.start'] + timedelta(days=(self.nlag - 0.5) * int(self.DaCycle['time.cycle'])) date = timestart + timedelta(days=(self.nlag - 0.5) * int(cyclen))
cov = self.get_covariance(date) cov = self.get_covariance(date) #LU tutaj powinny byc parametry do get_covariance
self.make_new_ensemble(self.nlag, cov) self.make_new_ensemble(self.nlag, cov)
logging.info('The state vector has been propagated by one cycle') logging.info('The state vector has been propagated by one cycle')
...@@ -401,7 +397,7 @@ class StateVector(object): ...@@ -401,7 +397,7 @@ class StateVector(object):
logging.info('Successfully read the State Vector from file (%s) ' % filename) logging.info('Successfully read the State Vector from file (%s) ' % filename)
def write_members_to_file(self, lag): def write_members_to_file(self, lag, outdir):
""" """
:param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag] :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag]
:rtype: None :rtype: None
...@@ -423,7 +419,7 @@ class StateVector(object): ...@@ -423,7 +419,7 @@ class StateVector(object):
#import da.tools.io as io #import da.tools.io as io
#import da.tools.io4 as io #import da.tools.io4 as io
outdir = self.DaCycle['dir.input']
members = self.EnsembleMembers[lag - 1] members = self.EnsembleMembers[lag - 1]
for mem in members: for mem in members:
......
...@@ -25,7 +25,7 @@ version = '0.0' ...@@ -25,7 +25,7 @@ version = '0.0'
class CtStateVector(StateVector): class CtStateVector(StateVector):
""" This is a StateVector object for CarbonTracker. It has a private method to make new ensemble members """ """ This is a StateVector object for CarbonTracker. It has a private method to make new ensemble members """
def get_covariance(self, date): def get_covariance(self, date, params):
""" Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector. """ Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector.
Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.
The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag]
...@@ -39,8 +39,8 @@ class CtStateVector(StateVector): ...@@ -39,8 +39,8 @@ class CtStateVector(StateVector):
# Get the needed matrices from the specified covariance files # Get the needed matrices from the specified covariance files
file_ocn_cov = self.DaCycle.DaSystem['ocn.covariance'] file_ocn_cov = params['ocn.covariance']
file_bio_cov = self.DaCycle.DaSystem['bio.covariance'] #LU logika tego to powrot do dacycle zeby potem z systemu(CT) pobrac parametr file_bio_cov = params['bio.covariance'] #LU logika tego to powrot do dacycle zeby potem z systemu(CT) pobrac parametr
# replace YYYY.MM in the ocean covariance file string # replace YYYY.MM in the ocean covariance file string
......
...@@ -157,7 +157,7 @@ class TM5ObservationOperator(ObservationOperator): ...@@ -157,7 +157,7 @@ class TM5ObservationOperator(ObservationOperator):
else: else:
logging.info('Compilation successful, continuing') logging.info('Compilation successful, continuing')
def prepare_run(self): #LU time.sample start i stop, dir.input, obsop inputfile, restart i window, + platform def prepare_run(self, stepstart, stepend): #LU time.sample start i stop, dir.input, obsop inputfile, restart i window, + platform
#LU mogloby byc ze start i end jkao parametr, directories i inne ze slownika przekazanego jkao parametr #LU mogloby byc ze start i end jkao parametr, directories i inne ze slownika przekazanego jkao parametr
""" """
Prepare a forward model TM5 run, this consists of: Prepare a forward model TM5 run, this consists of:
...@@ -370,7 +370,6 @@ class TM5ObservationOperator(ObservationOperator): ...@@ -370,7 +370,6 @@ class TM5ObservationOperator(ObservationOperator):
logging.error("Please compile the model with the specified rc-file and the regular TM5 scripts first") logging.error("Please compile the model with the specified rc-file and the regular TM5 scripts first")
raise IOError raise IOError
#LU zmienic definicje w dokumentacji bo mowi o save.hdf podczas gdy juz takiego nie ma.
#LU kopiujemy tm5-owe restarty , chociaz w sumie juz je skopiowalismy wczesniej #LU kopiujemy tm5-owe restarty , chociaz w sumie juz je skopiowalismy wczesniej
def get_initial_data(self): #LU tylko dir.restrt.curret jkao miejsce z ktorego zbiore pliki typu "tm5_restart" def get_initial_data(self): #LU tylko dir.restrt.curret jkao miejsce z ktorego zbiore pliki typu "tm5_restart"
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model. """ This method places all initial data needed by an ObservationOperator in the proper folder for the model.
......
...@@ -45,13 +45,13 @@ def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator): ...@@ -45,13 +45,13 @@ def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
DaCycle.DaSystem = DaSystem #LU przypisuje dacyclowi liste parametrow DaCycle.DaSystem = DaSystem #LU przypisuje dacyclowi liste parametrow
DaCycle.DaPlatForm = DaPlatForm #LU przypisuje cyklowi platforme (tez liste parametrow) DaCycle.DaPlatForm = DaPlatForm #LU przypisuje cyklowi platforme (tez liste parametrow)
DaCycle.Initialize() #LU nastepnie cykl zostaje inicjalizowany...bardzo logiczne DaCycle.Initialize() #LU nastepnie cykl zostaje inicjalizowany...bardzo logiczne
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc #LU cykl zostaje przypisany state vectorowi #StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc #LU cykl zostaje przypisany state vectorowi
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU cykl zostaje przypisany probkom Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU cykl zostaje przypisany probkom
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU cykl zostaje przypisany obsoperatorowi ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU cykl zostaje przypisany obsoperatorowi
ObsOperator.Initialize() # Setup Observation Operator #LU a pote mobsoperator jest inicjalizowany ObsOperator.Initialize() # Setup Observation Operator #LU a pote mobsoperator jest inicjalizowany
def prepare_state(DaCycle, StateVector): def prepare_state(DaCycle, StateVector, newrc):
#LU numer cyklu (zamiast time.restart) #LU numer cyklu (zamiast time.restart)
#LU rozne parametry do initialize - trzeba przekazac caly slownik #LU rozne parametry do initialize - trzeba przekazac caly slownik
...@@ -66,7 +66,7 @@ def prepare_state(DaCycle, StateVector): ...@@ -66,7 +66,7 @@ def prepare_state(DaCycle, StateVector):
logging.info(header + "starting prepare_state" + footer) logging.info(header + "starting prepare_state" + footer)
StateVector.Initialize() #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci. StateVector.Initialize(newrc) #LU w prepare state inicjalizujemy wektor stanu ktoremu dopiero co przypisalismy wartosci.
#LU to jest zalezne od cyklu, i cykl pojedynczy moze miec time.restart lub moze nie miec. #LU to jest zalezne od cyklu, i cykl pojedynczy moze miec time.restart lub moze nie miec.
if not DaCycle['time.restart']: #LU jesli jest to pierwszy cykl if not DaCycle['time.restart']: #LU jesli jest to pierwszy cykl
...@@ -75,29 +75,26 @@ def prepare_state(DaCycle, StateVector): ...@@ -75,29 +75,26 @@ def prepare_state(DaCycle, StateVector):
nlag = StateVector.nlag #LU dla kazdego od zera do dlugosc(cykl) wyznaczamy date oraz znajdujemy kowariancje i robimy nowa ensemble. nlag = StateVector.nlag #LU dla kazdego od zera do dlugosc(cykl) wyznaczamy date oraz znajdujemy kowariancje i robimy nowa ensemble.
for n in range(0, nlag): for n in range(0, nlag):
date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle'])) #LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja. date = DaCycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(DaCycle['time.cycle']))
cov = StateVector.get_covariance(date) #LU ta data jest tutaj potrzebna tylko do znalezienia odpowiedniego pliku z kowariancja.
svparams = {k: newrc[k] for k in ('ocn.covariance', 'bio.covariance')}
cov = StateVector.get_covariance(date, svparams)
StateVector.make_new_ensemble(n + 1, cov) StateVector.make_new_ensemble(n + 1, cov)
else: else:
# Read the StateVector data from file # Read the StateVector data from file
StateVector.ReadFromFile(os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')) # by default will read "opt"(imized) variables, and then propagate
filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc') #LU teraz czytamy savestate.nc
StateVector.ReadFromFile(filename) # by default will read "opt"(imized) variables, and then propagate
# Now propagate the ensemble by one cycle to prepare for the current cycle # Now propagate the ensemble by one cycle to prepare for the current cycle
StateVector.propagate() StateVector.propagate(DaCycle['time.start'], DaCycle['time.cycle'])
# Finally, also write the StateVector to a file so that we can always access the a-priori information # Finally, also write the StateVector to a file so that we can always access the a-priori information
StateVector.WriteToFile(os.path.join(DaCycle['dir.output'], 'savestate.nc')) # write prior info because StateVector.Isoptimized == False for now
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.WriteToFile(filename) # write prior info because StateVector.Isoptimized == False for now
def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc): def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
""" Sample the filter state for the inversion """ """ Sample the filter state for the inversion """
...@@ -149,6 +146,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc): ...@@ -149,6 +146,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator, newrc):
def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, isadvance=False): def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, isadvance=False):
""" Perform all actions needed to sample one cycle """ """ Perform all actions needed to sample one cycle """
import copy import copy
import da.tools.toolkit as toolkit
# First set up the information for time start and time end of this sample # First set up the information for time start and time end of this sample
...@@ -166,6 +164,12 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, ...@@ -166,6 +164,12 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
DaCycle['time.sample.window'] = lag DaCycle['time.sample.window'] = lag
DaCycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H")) DaCycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
stepstart = toolkit.calculate_start_time(newrc["time.start"], newrc["time.cycle"], 0, lag)
stepend = toolkit.calculate_step_end_time(stepstart, newrc["time.cycle"])
logging.info("#LU start of the sample: " + str(stepstart))
logging.info("#LU end of the sample:" + str(stepend))
logging.info("New simulation interval set : ") logging.info("New simulation interval set : ")
logging.info(" start date : %s " % startdate.strftime('%F %H:%M')) logging.info(" start date : %s " % startdate.strftime('%F %H:%M'))
logging.info(" end date : %s " % enddate.strftime('%F %H:%M')) logging.info(" end date : %s " % enddate.strftime('%F %H:%M'))
...@@ -175,17 +179,22 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, ...@@ -175,17 +179,22 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model # type of info needed in your transport model
StateVector.write_members_to_file(lag + 1) #LU parameters.nc, parametry sa za kazdym razem zapisywane, dla kazdego jednego odpalenia tm5 sa one inne, i z cyklu na cykl sa one inne bo SV byl optymalizowany.
#LU nie potrzebuje nic z dat
#LU za to potrzebuje dir.input
StateVector.write_members_to_file(lag + 1, DaCycle['dir.input']) #LU parameters.nc, parametry sa za kazdym razem zapisywane, dla kazdego jednego odpalenia tm5 sa one inne, i z cyklu na cykl sa one inne bo SV byl optymalizowany.
#LU czyli ogolnie jest ich tak strasznie duzo, ze nie ma sensu ich zachowywac z kroku na krok, bo i tak sa w srodku state vectora. tutaj pelnia one jedynie role wejscia dla state vectora #LU czyli ogolnie jest ich tak strasznie duzo, ze nie ma sensu ich zachowywac z kroku na krok, bo i tak sa w srodku state vectora. tutaj pelnia one jedynie role wejscia dla state vectora
#LU Samples.Initialize(newrc, startdate, enddate)
Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego stepu. czyli jeszcze innymi slowy, nowy obiekt observations, no moze ma te same params... Samples.Initialize() #LU to daje przede wszystkim zresetowanie data list. czyli to znaczy ze data list jest za kazdym razem nowa przy odpalaniu nowego stepu. czyli jeszcze innymi slowy, nowy obiekt observations, no moze ma te same params...
#LU nie potrzebuje nic z dat
Samples.add_observations() #LU wydaje mi sie ze tutaj za kazdym stepem wczytywany jest cale observations --> nie, jest subselect po self.startdate i self.enddate (ustawione wczesniej w initialize) Samples.add_observations() #LU wydaje mi sie ze tutaj za kazdym stepem wczytywany jest cale observations --> nie, jest subselect po self.startdate i self.enddate (ustawione wczesniej w initialize)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
Samples.add_model_data_mismatch() Samples.add_model_data_mismatch()
filename = Samples.write_sample_info() #LU observations.nc filename = Samples.write_sample_info() #LU observations.nc
...@@ -195,7 +204,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc, ...@@ -195,7 +204,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, newrc,
# Run the observation operator # Run the observation operator
run_forecast_model(DaCycle, ObservationOperator) run_forecast_model(ObservationOperator, stepstart, stepend)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
...@@ -294,7 +303,7 @@ def save_and_submit(DaCycle, StateVector): ...@@ -294,7 +303,7 @@ def save_and_submit(DaCycle, StateVector):
DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now
DaCycle.Finalize() DaCycle.Finalize()
def run_forecast_model(DaCycle, ObsOperator): def run_forecast_model(ObsOperator, stepstart, stepend):
"""Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model """Prepare and execute a forecast step using an external Fortran model. Note that the flavor of model
used is determined in the very first line where the import statement of module "model" depends on a used is determined in the very first line where the import statement of module "model" depends on a
setting in your da.rc file. After that choice, the module written specifically for a particular setting in your da.rc file. After that choice, the module written specifically for a particular
...@@ -310,9 +319,9 @@ def run_forecast_model(DaCycle, ObsOperator): ...@@ -310,9 +319,9 @@ def run_forecast_model(DaCycle, ObsOperator):
submitting a string of jobs to the queue, or simply spawning a subprocess, or ... submitting a string of jobs to the queue, or simply spawning a subprocess, or ...
""" """
ObsOperator.prepare_run() ObsOperator.prepare_run(stepstart, stepend) #LU tutaj dodac daty startu i stopu #LU ewnetualnie wyliczac to stepend
ObsOperator.validate_input() ObsOperator.validate_input()
ObsOperator.run() #ObsOperator.run()
ObsOperator.save_data() #ObsOperator.save_data()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment