Commit 1cb3194d authored by karolina

write_members_to_file is now called with lag instead of lag + 1, since the method internally applied a "lag - 1" offset; that internal offset has been removed as well.

write_sample_info now takes the full output path as a parameter, so the path appears explicitly in the pipeline and no longer needs to be returned.
The same applies to write_obs_to_file.

Tidied up the imports in the obs modules: function-level imports were moved to module scope and duplicates removed.

Removed the need for Samples.DaCycle = DaCycle; DaCycle is now passed only through the initialize() method.
parent e3f2a894
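
For orientation, a minimal sketch (not part of this commit) of how the pipeline calls look after these changes; the helper name sample_step_sketch and the abbreviated body are illustrative only, with object and key names taken from the diff below.

import os

def sample_step_sketch(DaCycle, Samples, StateVector, lag):
    # write_members_to_file now receives the plain lag index; the internal
    # "lag - 1" offset was removed from the method itself.
    StateVector.write_members_to_file(lag, DaCycle['dir.input'])

    # DaCycle is passed once, to initialize(), instead of being stored on
    # the object via Samples.DaCycle = DaCycle.
    Samples.initialize(DaCycle)
    Samples.add_observations()
    Samples.add_model_data_mismatch(DaCycle.DaSystem['obs.sites.rc'])

    # The caller builds the full output path and passes it in, so
    # write_sample_info no longer needs to return a filename.
    sampling_coords_file = os.path.join(DaCycle['dir.input'],
        'observations_%s.nc' % DaCycle['time.sample.stamp'])
    Samples.write_sample_info(sampling_coords_file)
    DaCycle['ObsOperator.inputfile'] = sampling_coords_file

The same pattern applies to write_obs_to_file in advance(), where the caller now builds the sampleinfo_*.nc path.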
......@@ -35,7 +35,7 @@ The full baseclass description:
"""
import logging
import da.tools.rc as rc
################### Begin Class DaSystem ###################
class DaSystem(dict):
......@@ -58,7 +58,6 @@ class DaSystem(dict):
"""
This method loads a DA System Info rc-file with settings for this simulation
"""
import da.tools.rc as rc
for k, v in rc.read(RcFileName).iteritems():
self[k] = v
......
......@@ -45,7 +45,7 @@ class Observation(object):
"""
def __init__(self, DaCycle=None):
def __init__(self, DaCycle=None): #LU this will never get called, right?
"""
create an object with an identifier, version, and an empty ObservationList
"""
......@@ -66,7 +66,7 @@ class Observation(object):
def getlength(self):
return len(self.datalist)
def initialize(self):
def initialize(self, cycleparams):
""" Perform all steps needed to start working with observational data, this can include moving data, concatenating files,
selecting datasets, etc.
"""
......
......@@ -422,7 +422,7 @@ class StateVector(object):
#import da.tools.io as io
#import da.tools.io4 as io
members = self.EnsembleMembers[lag - 1]
members = self.EnsembleMembers[lag]
for mem in members:
filename = os.path.join(outdir, 'parameters.%03d.nc' % mem.membernumber)
......
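
For reference, a condensed illustration (not from the commit) of why the net ensemble index is unchanged: the caller's +1 and the method's -1 simply cancelled, so dropping both selects the same slot. A plain list stands in for StateVector.EnsembleMembers here.

# Illustrative only: a plain list stands in for StateVector.EnsembleMembers.
EnsembleMembers = ['members_lag0', 'members_lag1', 'members_lag2']
lag = 1

old_style = EnsembleMembers[(lag + 1) - 1]  # caller passed lag + 1, method subtracted 1
new_style = EnsembleMembers[lag]            # caller passes lag, method uses it directly

assert old_style == new_style               # both select the same lag slot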
......@@ -12,6 +12,9 @@ import os
import sys
import logging
#from da.baseclasses.statevector import filename
import datetime as dtm
from string import strip
from numpy import array, logical_and
sys.path.append(os.getcwd())
sys.path.append('../../')
......@@ -20,21 +23,23 @@ identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
from da.baseclasses.obs import Observation
import da.tools.io4 as io
import da.tools.rc as rc
################### Begin Class CtObservations ###################
class CtObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def initialize(self):
self.startdate = self.DaCycle['time.sample.start']
self.enddate = self.DaCycle['time.sample.end']
def initialize(self, DaCycle):
self.startdate = DaCycle['time.sample.start']
self.enddate = DaCycle['time.sample.end']
sfname = self.DaCycle.DaSystem['obs.input.fname']
sfname = DaCycle.DaSystem['obs.input.fname']
if sfname.endswith('.nc'):
filename = os.path.join(self.DaCycle.DaSystem['obs.input.dir'], sfname)
filename = os.path.join(DaCycle.DaSystem['obs.input.dir'], sfname)
else:
filename = os.path.join(self.DaCycle.DaSystem['obs.input.dir'], sfname + '.' + self.startdate.strftime('%Y%m%d') + '.nc')
filename = os.path.join(DaCycle.DaSystem['obs.input.dir'], sfname + '.' + self.startdate.strftime('%Y%m%d') + '.nc')
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename
......@@ -56,11 +61,6 @@ class CtObservations(Observation):
For now, we will stick with option (1)
"""
import da.tools.io4 as io
import datetime as dtm
from string import strip
from numpy import array, logical_and
ncf = io.CT_Read(self.ObsFilename, 'read')
idates = ncf.get_variable('date_components')
dates = array([dtm.datetime(*d) for d in idates])
......@@ -100,7 +100,6 @@ class CtObservations(Observation):
def add_simulations(self, filename, silent=True):
""" Adds model simulated values to the mixing ratio objects """
import da.tools.io4 as io
if not os.path.exists(filename):
msg = "Sample output filename for observations could not be found : %s" % filename
......@@ -136,15 +135,11 @@ class CtObservations(Observation):
logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def write_sample_info(self):
def write_sample_info(self, obsinputfile):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
import da.tools.io4 as io
obsinputfile = os.path.join(self.DaCycle['dir.input'], 'observations_%s.nc' % self.DaCycle['time.sample.stamp'])
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
......@@ -154,7 +149,7 @@ class CtObservations(Observation):
if len(self.datalist) == 0:
f.close()
return obsinputfile
#return obsinputfile
data = self.getvalues('id')
......@@ -238,7 +233,7 @@ class CtObservations(Observation):
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
return obsinputfile
def add_model_data_mismatch(self, filename):
......@@ -251,7 +246,7 @@ class CtObservations(Observation):
(4) Take care of double sites, etc
"""
import da.tools.rc as rc
......@@ -314,15 +309,11 @@ class CtObservations(Observation):
self.SiteInfo = SiteInfo
def write_obs_to_file(self, outdir, timestamp):
def write_obs_to_file(self, outfile):
"""
Write selected information contained in the Observation object to a file.
"""
import da.tools.io4 as io
outfile = os.path.join(self.DaCycle['dir.output'], 'sampleinfo_%s.nc' % (self.DaCycle['time.sample.stamp']))
outfile = os.path.join(outdir, 'sampleinfo_%s.nc' % timestamp)
f = io.CT_CDF(outfile, method='create')
logging.debug('Creating new Sample output file for postprocessing (%s)' % outfile)
......@@ -333,7 +324,7 @@ class CtObservations(Observation):
if len(self.datalist) == 0:
f.close()
return outfile
#return outfile
data = self.getvalues('id')
......@@ -421,7 +412,7 @@ class CtObservations(Observation):
logging.debug("Successfully wrote data to Sample output file (%s)" % outfile)
return outfile
#return outfile
################### End Class CtObservations ###################
......
......@@ -19,8 +19,9 @@ sys.path.append(os.getcwd())
sys.path.append('../../')
import da.tools.io4 as io
import da.tools.rc as rc
from da.baseclasses.obs import Observation
from da.tools.general import name_convert
identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
......@@ -32,13 +33,13 @@ version = '0.0'
class ObsPackObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def initialize(self, start, end, params):
def initialize(self, DaCycle):
self.startdate = start
self.enddate = end
self.startdate = DaCycle['time.sample.start']
self.enddate = DaCycle['time.sample.end']
op_id = params['obspack.input.id']
op_dir = params['obspack.input.dir']
op_id = DaCycle.DaSystem['obspack.input.id']
op_dir = DaCycle.DaSystem['obspack.input.dir']
if not os.path.exists(op_dir):
msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir
......@@ -149,14 +150,12 @@ class ObsPackObservations(Observation):
logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def write_sample_info(self, dirinput, timestamp):
def write_sample_info(self, obsinputfile):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
obsinputfile = os.path.join(dirinput, 'observations_%s.nc' % timestamp)
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
......@@ -167,7 +166,7 @@ class ObsPackObservations(Observation):
if len(self.datalist) == 0:
f.close()
return obsinputfile
#return obsinputfile
data = self.getvalues('id')
......@@ -251,8 +250,6 @@ class ObsPackObservations(Observation):
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
return obsinputfile
def add_model_data_mismatch(self, filename):
"""
......@@ -264,9 +261,6 @@ class ObsPackObservations(Observation):
(4) Take care of double sites, etc
"""
import da.tools.rc as rc
from da.tools.general import name_convert
if not os.path.exists(filename):
msg = 'Could not find the required sites.rc input file (%s) ' % filename
......@@ -340,14 +334,12 @@ class ObsPackObservations(Observation):
self.SiteInfo = SiteInfo
def write_obs_to_file(self, outdir, timestamp):
def write_obs_to_file(self, outfile):
"""
Write selected information contained in the Observation object to a file.
"""
outfile = os.path.join(outdir, 'sampleinfo_%s.nc' % timestamp)
f = io.CT_CDF(outfile, method='create')
logging.debug('Creating new Sample output file for postprocessing (%s)' % outfile)
......@@ -357,7 +349,7 @@ class ObsPackObservations(Observation):
if len(self.datalist) == 0:
f.close()
return outfile
#return outfile
data = self.getvalues('id')
......@@ -433,7 +425,7 @@ class ObsPackObservations(Observation):
f.close()
logging.debug("Successfully wrote data to Sample output file (%s)" % outfile)
return outfile
#return outfile
......
......@@ -11,6 +11,10 @@ File created on 28 Jul 2010.
import os
import sys
import logging
import datetime as dtm
from string import strip
from numpy import array, logical_and
sys.path.append(os.getcwd())
sys.path.append('../../')
......@@ -18,19 +22,20 @@ identifier = 'CarbonTracker CO2 mixing ratios'
version = '0.0'
from da.baseclasses.obs import Observation
import da.tools.io4 as io
import da.tools.rc as rc
################### Begin Class ObsPackObservations ###################
class ObsPackObservations(Observation):
""" an object that holds data + methods and attributes needed to manipulate mixing ratio values """
def initialize(self):
def initialize(self, DaCycle):
self.startdate = self.DaCycle['time.sample.start']
self.enddate = self.DaCycle['time.sample.end']
self.startdate = DaCycle['time.sample.start']
self.enddate = DaCycle['time.sample.end']
op_id = self.DaCycle.DaSystem['obspack.input.id']
op_dir = self.DaCycle.DaSystem['obspack.input.dir']
op_id = DaCycle.DaSystem['obspack.input.id']
op_dir = DaCycle.DaSystem['obspack.input.dir']
if not os.path.exists(op_dir):
msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir
......@@ -49,10 +54,6 @@ class ObsPackObservations(Observation):
We will loop over all site files in the ObsPackage, and subset each to our needs
"""
import da.tools.io4 as io
import datetime as dtm
from string import strip
from numpy import array, logical_and
# Step 1: Read list of available site files in package
......@@ -108,7 +109,6 @@ class ObsPackObservations(Observation):
def add_simulations(self, filename, silent=False):
""" Adds model simulated values to the mixing ratio objects """
import da.tools.io4 as io
if not os.path.exists(filename):
msg = "Sample output filename for observations could not be found : %s" % filename
......@@ -144,14 +144,11 @@ class ObsPackObservations(Observation):
logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def write_sample_info(self):
def write_sample_info(self, obsinputfile):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
import da.tools.io4 as io
obsinputfile = os.path.join(self.DaCycle['dir.input'], 'observations_%s.nc' % self.DaCycle['time.sample.stamp'])
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
......@@ -163,7 +160,7 @@ class ObsPackObservations(Observation):
if len(self.datalist) == 0:
f.close()
return obsinputfile
#return obsinputfile
for key, value in self.SiteMove.iteritems():
msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
......@@ -251,7 +248,7 @@ class ObsPackObservations(Observation):
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
return obsinputfile
def add_model_data_mismatch(self, filename):
"""
......@@ -262,10 +259,7 @@ class ObsPackObservations(Observation):
(3) Compare site list against data
(4) Take care of double sites, etc
"""
import da.tools.rc as rc
"""
if not os.path.exists(filename):
msg = 'Could not find the required sites.rc input file (%s) ' % filename
......@@ -360,16 +354,11 @@ class ObsPackObservations(Observation):
logging.debug("Added Model Data Mismatch to all samples ")
def write_obs_to_file(self, outdir, timestamp):
def write_obs_to_file(self, outfile):
"""
Write selected information contained in the Observation object to a file.
"""
import da.tools.io4 as io
outfile = os.path.join(self.DaCycle['dir.output'], 'sampleinfo_%s.nc' % (self.DaCycle['time.sample.stamp']))
outfile = os.path.join(outdir, 'sampleinfo_%s.nc' % timestamp)
f = io.CT_CDF(outfile, method='create')
logging.debug('Creating new Sample output file for postprocessing (%s)' % outfile)
......@@ -381,7 +370,7 @@ class ObsPackObservations(Observation):
if len(self.datalist) == 0:
f.close()
return outfile
#return outfile
for key, value in self.SiteMove.iteritems():
msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
......@@ -462,7 +451,7 @@ class ObsPackObservations(Observation):
logging.debug("Successfully wrote data to Sample output file (%s)" % outfile)
return outfile
#return outfile
......
......@@ -9,10 +9,8 @@ File created on 26 Aug 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class CtDaSystem ###################
......@@ -51,13 +49,12 @@ class CtGriddedDaSystem(DaSystem):
if v == 'False': self[k] = False
for key in needed_rc_items:
if not self.has_key(key):
status, msg = (False, 'Missing a required value in rc-file : %s' % key)
msg = 'Missing a required value in rc-file : %s' % key
logging.error(msg)
raise IOError, msg
status, msg = (True, 'DA System Info settings have been validated succesfully') ; logging.debug(msg)
logging.debug('DA System Info settings have been validated succesfully')
################### End Class CtDaSystem ###################
......
......@@ -90,14 +90,13 @@ class TM5ObservationOperator(ObservationOperator):
Execute all steps needed to prepare the ObsOperator for use inside CTDAS, only done at the very first cycle normally
"""
if self.DaCycle['time.restart'] == False :
logging.info('First time step, setting up and compiling the TM5 model before proceeding!')
# Modify the rc-file to reflect directory structure defined by CTDAS
NewItems = {'my.basedir': self.DaCycle['dir.exec']}
newitems = {'my.basedir': self.DaCycle['dir.exec']}
self.modify_rc(NewItems)
self.modify_rc(newitems)
# Create the TM5 run directory to hold a copy of the modified rc-file
......
......@@ -456,7 +456,7 @@ class CycleControl(dict):
"""
targetdir = os.path.join(self['dir.output'])
create_dirs(os.path.join(targetdir))
create_dirs(targetdir)
logging.info("Collecting the required output data")
logging.debug(" to directory: %s " % targetdir)
......@@ -501,7 +501,7 @@ class CycleControl(dict):
logging.info("Purging the current restart directory before collecting new data")
create_dirs(os.path.join(targetdir), forceclean=True)
create_dirs(targetdir, forceclean=True)
logging.info("Collecting the required restart data")
logging.debug(" to directory: %s " % targetdir)
......
......@@ -83,12 +83,12 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
def start_job(DaCycle, DaSystem, DaPlatForm, StateVector, Samples, ObsOperator):
""" Set up the job specific directory structure and create an expanded rc-file """
DaSystem.validate() #LU only checks that the needed rc items are in the file
DaCycle.DaSystem = DaSystem #LU assigns the parameter list to DaCycle
DaCycle.DaPlatForm = DaPlatForm #LU assigns the platform (also a parameter list) to the cycle
DaSystem.validate() #LU only checks that the needed rc items are in the file, e.g. observations, covariance
DaCycle.DaSystem = DaSystem #LU merges these parameter lists, but hierarchically
DaCycle.DaPlatForm = DaPlatForm #LU assigns the platform to the cycle
DaCycle.initialize() #LU setup file structure etc
StateVector.DaCycle = DaCycle # also embed object in StateVector so it can access cycle information for I/O etc #LU the cycle gets assigned to the state vector
Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU the cycle gets assigned to the samples
#Samples.DaCycle = DaCycle # also embed object in Samples object so it can access cycle information for I/O etc #LU the cycle gets assigned to the samples
ObsOperator.DaCycle = DaCycle # also embed object in ObsOperator object so it can access cycle information for I/O etc #LU the cycle gets assigned to the obs operator
ObsOperator.initialize() # Setup Observation Operator #LU and then the obs operator is initialized
......@@ -120,8 +120,8 @@ def prepare_state(DaCycle, StateVector):
# Read the StateVector data from file
filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')
StateVector.read_from_file(filename) # by default will read "opt"(imized) variables, and then propagate
saved_sv = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')
StateVector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate
# Now propagate the ensemble by one cycle to prepare for the current cycle
......@@ -129,8 +129,8 @@ def prepare_state(DaCycle, StateVector):
# Finally, also write the StateVector to a file so that we can always access the a-priori information
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.write_to_file(filename, False) # write prior info because StateVector.Isoptimized == False for now
saved_current_sv = os.path.join(DaCycle['dir.output'], 'savestate.nc')
StateVector.write_to_file(saved_current_sv, False) # write prior info because StateVector.Isoptimized == False for now
def sample_state(DaCycle, Samples, StateVector, ObservationOperator):
""" Sample the filter state for the inversion """
......@@ -182,19 +182,19 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
# Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
# type of info needed in your transport model
StateVector.write_members_to_file(lag + 1, DaCycle['dir.input']) #LU parameters.nc
StateVector.write_members_to_file(lag, DaCycle['dir.input']) #LU parameters.nc to be an input for tm5
Samples.initialize() #LU this mainly resets the data list, so the data list is fresh every time a new cycle is started
Samples.add_observations()
Samples.initialize(DaCycle) #LU this mainly resets the data list, so the data list is fresh every time a new cycle is started
Samples.add_observations() #LU the observations come from the observations file; here we only add the part that falls in the given time window (but we load everything)
# Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
Samples.add_model_data_mismatch(DaCycle.DaSystem['obs.sites.rc'])
filename = Samples.write_sample_info() #LU observations.nc
sampling_coords_file = os.path.join(DaCycle['dir.input'], 'observations_%s.nc' % DaCycle['time.sample.stamp'])
Samples.write_sample_info(sampling_coords_file) #LU observations.nc - for the tm5 to have sampling dates and times
# Write filename to DaCycle, and to output collection list
DaCycle['ObsOperator.inputfile'] = filename
DaCycle['ObsOperator.inputfile'] = sampling_coords_file
# Run the observation operator
......@@ -207,8 +207,8 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
filename = os.path.join(ObservationOperator.outputdir, 'flask_output.%s.nc' % DaCycle['time.sample.stamp'])
Samples.add_simulations(filename)
simulated_file = os.path.join(ObservationOperator.outputdir, 'flask_output.%s.nc' % DaCycle['time.sample.stamp'])
Samples.add_simulations(simulated_file)
# Now write a small file that holds for each observation a number of values that we need in postprocessing
......@@ -219,7 +219,9 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag):
# This is to make sure that the first step of the system uses all observations available, while the subsequent
# steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only
# (and at least) once # in the assimilation
#LU so we always write the observations and do the sampling, and we always end up with a flask_output; we even always read it back and add it to the samples... hmm, what?
#LU aha, the samples always get a cleared data list and an assigned start and end, in samples.initialize() within this function; then, depending on whether the conditions below hold, we add them to the assimilation or not
#LU what is the point: that we always have flask_output.nc, except it probably gets overwritten every cycle? oh well, there was a discussion about this and the conclusion was it is for debugging; let it stay that way.
if lag == int(DaCycle['time.nlag']) - 1 or DaCycle['time.restart'] == False:
StateVector.ObsToAssimmilate += (copy.deepcopy(Samples),)
StateVector.nobs += Samples.getlength()
......@@ -235,7 +237,7 @@ def invert(DaCycle, StateVector, Optimizer):
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
StateVector.nobs)
StateVector.nobs) #LU taken from the information on how many samples were used
if not DaCycle.DaSystem.has_key('opt.algorithm'):
logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
......@@ -261,7 +263,7 @@ def invert(DaCycle, StateVector, Optimizer):
Optimizer.matrix_to_state(StateVector)
Optimizer.write_diagnostics(DaCycle, StateVector, type='optimized')
StateVector.isOptimized = True
def advance(DaCycle, Samples, StateVector, ObservationOperator):
""" Advance the filter state to the next step """
......@@ -286,15 +288,15 @@ def advance(DaCycle, Samples, StateVector, ObservationOperator):
DaCycle.OutputFileList.append(DaCycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to DaCycle for collection ")
Samples.write_obs_to_file(DaCycle['dir.output'], DaCycle['time.sample.stamp'])
outfile = os.path.join(DaCycle['dir.output'], 'sampleinfo_%s.nc' % (DaCycle['time.sample.stamp']))
Samples.write_obs_to_file(outfile)
def save_and_submit(DaCycle, StateVector):
""" Save the model state and submit the next job """
logging.info(header + "starting save_and_submit" + footer)
savedir = DaCycle['dir.output']
filename = os.path.join(savedir, 'savestate.nc') #LU this is the state vector after optimization.
filename = os.path.join(DaCycle['dir.output'], 'savestate.nc') #LU this is the state vector after optimization.
StateVector.write_to_file(filename, True)
DaCycle.RestartFileList.append(filename) # write optimized info because StateVector.Isoptimized == False for now
......