Commit abd55518 authored by karolina

First portion of cleanup:

- merged the two-statement logging lines ("msg = ... ; logging.x(msg)") into single logging calls
- removed unnecessary commas and brackets in tuple definitions
- removed unused imports
- __str__ definition: changed print to return
- removed unused value assignments ("dummy = ...")
- formatted the code
- repaired bad indentation
- removed dummy returns ("return None")
- removed the no-op body of baseclasses.optimizer.Localize() (see the before/after sketch below)
parent 820e62ba
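
A minimal before/after sketch of the recurring cleanup patterns, in Python 2 like the rest of the package; the ExampleBefore/ExampleAfter classes and their messages are placeholders, not code from this repository:

import logging

# before: two statements per line for logging, unused "dummy = ..." assignments,
# a print inside __str__ and a dummy "return None"
class ExampleBefore(object):
    def LoadRc(self, rcfilename):
        return {}
    def __init__(self, rcfilename):
        dummy = self.LoadRc(rcfilename)                            # assigned value never used
        msg = 'initialized: %s' % rcfilename ; logging.debug(msg)  # two statements on one line
        return None                                                # dummy return
    def __str__(self):
        msg = "ExampleBefore object" ; print msg                   # prints instead of returning
        return ""

# after: plain calls, a single logging statement, __str__ returns its string
class ExampleAfter(object):
    def LoadRc(self, rcfilename):
        return {}
    def __init__(self, rcfilename):
        self.LoadRc(rcfilename)
        logging.debug('initialized: %s' % rcfilename)
    def __str__(self):
        return "ExampleAfter object"
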
......@@ -34,10 +34,7 @@ The full baseclass description:
"""
import os
import sys
import logging
import datetime
################### Begin Class DaSystem ###################
......@@ -46,73 +43,61 @@ class DaSystem(dict):
Information on the data assimilation system used. This is normally an rc-file with settings.
"""
def __init__(self,rcfilename):
def __init__(self, rcfilename):
"""
Initialization occurs from passed rc-file name, items in the rc-file will be added
to the dictionary
"""
self.Identifier = 'CarbonTracker CO2' # the identifier gives the platform name
self.Identifier = 'CarbonTracker CO2' # the identifier gives the platform name
self.LoadRc(rcfilename)
msg = 'Data Assimilation System initialized: %s'%self.Identifier ; logging.debug(msg)
logging.debug("Data Assimilation System initialized: %s" % self.Identifier)
def __str__(self):
"""
String representation of a DaInfo object
"""
msg = "===============================================================" ; print msg
msg = "DA System Info rc-file is %s" % self.RcFileName ; print msg
msg = "===============================================================" ; print msg
return ""
msg = "===============================================================\n"
msg += "DA System Info rc-file is %s\n" % self.RcFileName
msg += "==============================================================="
return msg
def LoadRc(self,RcFileName):
def LoadRc(self, RcFileName):
"""
This method loads a DA System Info rc-file with settings for this simulation
"""
import da.tools.rc as rc
for k,v in rc.read(RcFileName).iteritems():
self[k] = v
self.RcFileName = RcFileName
self.DaRcLoaded = True
for k, v in rc.read(RcFileName).iteritems():
self[k] = v
self.RcFileName = RcFileName
self.DaRcLoaded = True
logging.debug("DA System Info rc-file (%s) loaded successfully" % self.RcFileName)
msg = 'DA System Info rc-file (%s) loaded successfully'%self.RcFileName ; logging.debug(msg)
return True
def Initialize(self ):
def Initialize(self):
"""
Initialize the object.
"""
def Validate(self ):
def Validate(self):
"""
Validate the contents of the rc-file given a dictionary of required keys
"""
needed_rc_items = {}
needed_rc_items={}
for k,v in self.iteritems():
for k, v in self.iteritems():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
for key in needed_rc_items:
if not self.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
msg = 'Missing a required value in rc-file : %s' % key
logging.error(msg)
raise IOError,msg
status,msg = ( True,'DA System Info settings have been validated succesfully' ) ; logging.debug(msg)
raise IOError, msg
logging.debug('DA System Info settings have been validated succesfully')
return None
################### End Class DaSystem ###################
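
Validate() makes two passes over the rc settings: it first coerces the literal strings 'True'/'False' to booleans, then checks that every required key is present, logging and raising IOError when one is missing. In this baseclass needed_rc_items is empty, so the actual key list is supplied by the subclass (CtDaSystem). A standalone Python 2 sketch of that logic, assuming a plain dict of settings (validate_settings and the example keys are illustrative, not part of the package):

import logging

def validate_settings(settings, needed_rc_items):
    """ Coerce boolean-like strings and check required keys (illustrative helper) """
    for k, v in settings.iteritems():
        if v == 'True':  settings[k] = True
        if v == 'False': settings[k] = False
    for key in needed_rc_items:
        if key not in settings:
            msg = 'Missing a required value in rc-file : %s' % key
            logging.error(msg)
            raise IOError(msg)
    logging.debug('Settings have been validated successfully')

settings = {'obs.input.dir': '/tmp', 'time.restart': 'False'}
validate_settings(settings, ['obs.input.dir'])      # passes; a missing key would raise IOError
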
......
......@@ -15,10 +15,8 @@ File created on 28 Jul 2010.
:members: __init__
"""
import os
import sys
import logging
import datetime
from numpy import array, ndarray
identifier = 'Observation baseclass'
......@@ -63,7 +61,7 @@ class Observation(object):
else:
self.DaCycle = {}
msg = 'Observation object initialized: %s'%self.Identifier ; logging.info(msg)
logging.info('Observation object initialized: %s'% self.Identifier)
def getid(self):
"""
......@@ -78,8 +76,7 @@ class Observation(object):
return identifier
def __str__(self):
""" Prints a list of Observation objects"""
return "This is a %s object, version %s"%(self.Identifier,self.Version)
return "This is a %s object, version %s" % (self.Identifier,self.Version)
def Initialize(self):
""" Perform all steps needed to start working with observational data, this can include moving data, concatenating files,
......
......@@ -10,13 +10,10 @@ File created on 30 Aug 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'GeneralObservationOperator'
version = '0.0'
identifier = 'GeneralObservationOperator'
version = '0.0'
################### Begin Class ObservationOperator ###################
class ObservationOperator(object):
......@@ -31,17 +28,17 @@ class ObservationOperator(object):
"""
def __init__(self, RcFileName,DaCycle = None):
def __init__(self, RcFileName, DaCycle=None):
""" The instance of an ObservationOperator is application dependent """
self.Identifier = self.getid()
self.Version = self.getversion()
self.Version = self.getversion()
self.RestartFileList = []
self.outputdir = None # Needed for opening the samples.nc files created
dummy = self.LoadRc(RcFilename) # load the specified rc-file
dummy = self.ValidateRc() # validate the contents
self.LoadRc(RcFileName) # load the specified rc-file
self.ValidateRc() # validate the contents
msg = 'Observation Operator object initialized: %s'%self.Identifier ; logging.info(msg)
logging.info('Observation Operator object initialized: %s' % self.Identifier)
# The following code allows the object to be initialized with a DaCycle object already present. Otherwise, it can
# be added at a later moment.
......@@ -60,10 +57,8 @@ class ObservationOperator(object):
def GetInitialData(self):
""" This method places all initial data needed by an ObservationOperator in the proper folder for the model """
return None
def __str__(self):
return "This is a %s object, version %s"%(self.Identifier,self.Version)
return "This is a %s object, version %s" % (self.Identifier, self.Version)
def Initialize(self):
""" Perform all steps necessary to start the observation operator through a simple Run() call """
......
......@@ -21,12 +21,11 @@ Typically, every platform needs specific implementations of this object (through
"""
import sys
import os
import logging
import subprocess
std_joboptions={'jobname':'test','jobaccount':'co2','jobnodes':'nserial 1','jobshell':'/bin/sh','depends':'','jobtime':'00:30:00'}
std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobnodes':'nserial 1', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'00:30:00'}
class PlatForm(object):
"""
......@@ -39,27 +38,23 @@ class PlatForm(object):
computer/user requires their own PlatForm object modifications, the init function is usually overwritten
in the specific implementation of this class
"""
self.Identifier = 'iPad' # the identifier gives the plaform name
self.Version = '1.0' # the platform version used
self.Identifier = 'iPad' # the identifier gives the plaform name
self.Version = '1.0' # the platform version used
msg1 = '%s object initialized'%self.Identifier ; logging.debug(msg1)
msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2)
logging.debug('%s object initialized' % self.Identifier)
logging.debug('%s version: %s' % (self.Identifier, self.Version))
def __str__(self):
return self.Version
def ReturnBlockingFlag(self):
return ""
def ReturnQueueType(self):
return "foreground"
def GetJobTemplate(self,joboptions={},block=False):
def GetJobTemplate(self, joboptions={}, block=False):
"""
Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument.
The job template should return the preamble of a job that can be submitted to a queue on your platform,
......@@ -77,56 +72,48 @@ class PlatForm(object):
job until the submitted job in this template has been completed fully.
"""
template = """## \n"""+ \
"""## This is a set of dummy names, to be replaced by values from the dictionary \n"""+ \
"""## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """+ \
"""## \n"""+ \
""" \n"""+ \
"""#$ jobname \n"""+ \
"""#$ jobaccount \n"""+ \
"""#$ jobnodes \n"""+ \
"""#$ jobtime \n"""+ \
template = """## \n""" + \
"""## This is a set of dummy names, to be replaced by values from the dictionary \n""" + \
"""## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """ + \
"""## \n""" + \
""" \n""" + \
"""#$ jobname \n""" + \
"""#$ jobaccount \n""" + \
"""#$ jobnodes \n""" + \
"""#$ jobtime \n""" + \
"""#$ jobshell \n """
if 'depends' in joboptions:
template += """#$ -hold_jid depends \n"""
# First replace from passed dictionary
for k,v in joboptions.iteritems():
for k, v in joboptions.iteritems():
while k in template:
template = template.replace(k,v)
template = template.replace(k, v)
# Fill remaining values with std_options
for k,v in std_joboptions.iteritems():
for k, v in std_joboptions.iteritems():
while k in template:
template = template.replace(k,v)
template = template.replace(k, v)
return template
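
The template fill relies on plain string substitution: every key name that occurs in the template text is replaced by the caller-supplied value, and any key still present afterwards is filled from std_joboptions. A standalone Python 2 sketch of that idea (fill_template is an illustrative helper name, not a PlatForm method):

std_joboptions = {'jobname': 'test', 'jobaccount': 'co2', 'jobtime': '00:30:00'}

def fill_template(template, joboptions):
    """ Replace key names in the template text, falling back to the defaults """
    for options in (joboptions, std_joboptions):
        for k, v in options.iteritems():
            while k in template:
                template = template.replace(k, v)
    return template

preamble = """#$ jobname \n#$ jobaccount \n#$ jobtime \n"""
print fill_template(preamble, {'jobname': 'ct_cycle'})
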
def GetMyID(self):
""" Return the process ID, or job ID of the current process or job"""
return os.getpid()
def WriteJob(self,jobfile,template, jobid):
def WriteJob(self, jobfile, template, jobid):
"""
This method writes a jobfile to the exec dir and makes it executable (mod 477)
"""
#
# Done, write jobfile
#
f = open(jobfile,'w')
dummy = f.write(template)
dummy = f.close()
dummy = os.chmod(jobfile,477)
f = open(jobfile, 'w')
f.write(template)
f.close()
os.chmod(jobfile, 477)
logging.debug("A job file was created (%s)" % jobfile)
msg = "A job file was created (%s)"%jobfile ; logging.debug(msg)
return None
def SubmitJob(self,jobfile,joblog=None,block=False):
def SubmitJob(self, jobfile, joblog=None, block=False):
"""
:param jobfile: a string with the filename of a jobfile to run
:param joblog: a string with the filename of a logfile to write run output to
......@@ -135,45 +122,27 @@ class PlatForm(object):
This method submits a jobfile to the queue, and returns the job ID
"""
from string import join
cmd = ["sh",jobfile]
msg = "A new task will be started (%s)"%cmd ; logging.info(msg)
cmd = ["sh", jobfile]
logging.info("A new task will be started (%s)" % cmd)
if block:
jobid = subprocess.call(cmd)
else:
jobid = subprocess.Popen(cmd).pid
logging.info('Summary:')
logging.info('job script : %s' % jobfile)
logging.info('job log : %s' % joblog)
logging.info('To manage this process:')
logging.info(' # kill process:')
logging.info(' kill %i\n' % jobid)
# info ...
infotext = []
infotext.append( '\n' )
infotext.append( 'Summary:\n' )
infotext.append( '\n' )
infotext.append( 'job script : %s\n' % jobfile )
infotext.append( 'job log : %s\n' % joblog )
infotext.append( '\n')
infotext.append( 'To manage this process:\n' )
infotext.append( '\n' )
infotext.append( ' # kill process:\n' )
infotext.append( ' kill %i\n' % jobid )
infotext.append( ' \n' )
infotext.append( '\n' )
# write to log:
for line in infotext : logging.info( line.strip() )
return 0
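
SubmitJob runs the jobfile through a shell, either blocking with subprocess.call or in the background with subprocess.Popen. A minimal sketch of the two branches (run_job and the jobfile name are illustrative):

import subprocess

def run_job(jobfile, block=False):
    """ Foreground or background submission of a shell job (illustrative only) """
    cmd = ['sh', jobfile]
    if block:
        return subprocess.call(cmd)         # waits for completion; returns the exit status
    return subprocess.Popen(cmd).pid        # returns immediately with the child process id

Note that in the blocking branch the value returned by subprocess.call is an exit status rather than a process id, so logging it as a job id is only meaningful for the non-blocking case.
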
def KillJob(self,jobid):
def KillJob(self, jobid):
""" This method kills a running job """
return None
def StatJob(self,jobid):
def StatJob(self, jobid):
""" This method gets the status of a running job """
import subprocess
output = subprocess.Popen(['qstat',jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output)
output = subprocess.Popen(['qstat', jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output)
return output

......
......@@ -9,10 +9,7 @@ File created on 26 Aug 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class CtDaSystem ###################
......@@ -21,7 +18,6 @@ from da.baseclasses.dasystem import DaSystem
class CtDaSystem(DaSystem):
""" Information on the data assimilation system used. This is normally an rc-file with settings.
"""
def Validate(self):
"""
Validate the contents of the rc-file given a dictionary of required keys
......@@ -38,19 +34,15 @@ class CtDaSystem(DaSystem):
'regtype']
for k,v in self.iteritems():
for k, v in self.iteritems():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
for key in needed_rc_items:
if not self.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.warning(msg)
status,msg = ( True,'DA System Info settings have been validated succesfully' ) ; logging.debug(msg)
logging.warning('Missing a required value in rc-file : %s' % key)
logging.debug('DA System Info settings have been validated succesfully')
return None
################### End Class CtDaSystem ###################
......
......@@ -11,9 +11,8 @@ File created on 28 Jul 2010.
import os
import sys
sys.path.append(os.getcwd())
import logging
import datetime
sys.path.append(os.getcwd())
from da.baseclasses.optimizer import Optimizer
......@@ -35,65 +34,58 @@ class CtOptimizer(Optimizer):
def getversion(self):
return version
def SetLocalization(self,type='None'):
def SetLocalization(self, loctype='None'):
""" determine which localization to use """
if type == 'CT2007':
if loctype == 'CT2007':
self.localization = True
self.localizetype = 'CT2007'
else:
self.localization = False
self.localizetype = 'None'
msg = "Current localization option is set to %s"%self.localizetype ; logging.info(msg)
logging.info("Current localization option is set to %s" % self.localizetype)
def Localize(self,n):
def Localize(self, n):
""" localize the Kalman Gain matrix """
import numpy as np
if not self.localization: return
if self.localizetype == 'CT2007':
tvalue=1.97591
if np.sqrt(self.R[n,n]) >= 1.5:
for r in range(self.nlag*self.nparams):
corr=np.corrcoef(self.HX_prime[n,:],self.X_prime[r,:].squeeze())[0,1]
prob=corr/np.sqrt((1.0-corr**2)/(self.nmembers-2))
tvalue = 1.97591
if np.sqrt(self.R[n, n]) >= 1.5:
for r in range(self.nlag * self.nparams):
corr = np.corrcoef(self.HX_prime[n, :], self.X_prime[r, :].squeeze())[0, 1]
prob = corr / np.sqrt((1.0 - corr ** 2) / (self.nmembers - 2))
if abs(prob) < tvalue:
self.KG[r,n]=0.0
self.KG[r, n] = 0.0
################### End Class CtOptimizer ###################
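
The CT2007 localization applies a per-element significance test, and only to observations whose model-data mismatch sqrt(R[n, n]) is at least 1.5: for each state-vector row, the sample correlation between the simulated-observation deviations (HX_prime) and the state deviations (X_prime) is turned into a t-like statistic, and the corresponding Kalman gain element is zeroed when the statistic falls below tvalue = 1.97591 (roughly the two-sided 5% critical value for an ensemble of ~150 members). A self-contained numpy sketch of that test; the array names mirror the class attributes, but the shapes and random contents are invented for illustration:

import numpy as np

nmembers, nstate, nobs = 150, 10, 1
HX_prime = np.random.randn(nobs, nmembers)      # deviations of the simulated observations
X_prime = np.random.randn(nstate, nmembers)     # deviations of the state ensemble
KG = np.random.randn(nstate, nobs)              # Kalman gain to be localized

tvalue = 1.97591
n = 0                                           # index of the observation being processed
for r in range(nstate):
    corr = np.corrcoef(HX_prime[n, :], X_prime[r, :])[0, 1]
    prob = corr / np.sqrt((1.0 - corr ** 2) / (nmembers - 2))
    if abs(prob) < tvalue:
        KG[r, n] = 0.0                          # insignificant correlation: no update from this obs
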
if __name__ == "__main__":
import os
import sys
from da.tools.initexit import StartLogger
from da.tools.pipeline import JobStart
sys.path.append(os.getcwd())
opts = ['-v']
args = {'rc':'da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
args = {'rc':'da.rc', 'logfile':'da_initexit.log', 'jobrcfilename':'test.rc'}
StartLogger()
DaCycle = JobStart(opts,args)
DaCycle = JobStart(opts, args)
DaCycle.Initialize()
opt = CtOptimizer()
nobs = 100
dims = ( int(DaCycle['time.nlag']),
nobs = 100
dims = (int(DaCycle['time.nlag']),
int(DaCycle['da.optimizer.nmembers']),
int(DaCycle.DaSystem['nparameters']),
nobs, )
nobs,)
opt.Initialize(dims)
opt.SetLocalization(type='CT2007')
......@@ -14,12 +14,11 @@ import sys
sys.path.append(os.getcwd())
import logging
import datetime
from da.baseclasses.statevector import EnsembleMember, StateVector
from da.baseclasses.statevector import StateVector
import numpy as np
identifier = 'CarbonTracker Statevector '
version = '0.0'
version = '0.0'
################### Begin Class CtStateVector ###################
......@@ -32,7 +31,7 @@ class CtStateVector(StateVector):
def getversion(self):
return version
def GetCovariance(self,date):
def GetCovariance(self, date):
""" Make a new ensemble from specified matrices, the attribute lag refers to the position in the state vector.
Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below.
The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag]
......@@ -51,20 +50,18 @@ class CtStateVector(StateVector):
# replace YYYY.MM in the ocean covariance file string
file_ocn_cov = file_ocn_cov.replace('2000.01',date.strftime('%Y.%m'))
for file in [file_ocn_cov,file_bio_cov]:
file_ocn_cov = file_ocn_cov.replace('2000.01', date.strftime('%Y.%m'))
for file in [file_ocn_cov, file_bio_cov]:
if not os.path.exists(file):
msg = "Cannot find the specified file %s" % file ; logging.error(msg)
raise IOError,msg
msg = "Cannot find the specified file %s" % file
logging.error(msg)
raise IOError, msg
else:
logging.info("Using covariance file: %s" % file)
msg = "Using covariance file: %s" % file ; logging.info(msg)
f_ocn = io.CT_Read(file_ocn_cov,'read')
f_bio = io.CT_Read(file_bio_cov,'read')
f_ocn = io.CT_Read(file_ocn_cov, 'read')
f_bio = io.CT_Read(file_bio_cov, 'read')
cov_ocn = f_ocn.GetVariable('CORMAT')
if f_bio.variables.has_key('covariance'):
......@@ -72,21 +69,21 @@ class CtStateVector(StateVector):
else:
cov_bio = f_bio.GetVariable('qprior') # old CarbonTracker covariance files
dummy = f_ocn.close()
dummy = f_bio.close()
f_ocn.close()
f_bio.close()
dummy = logging.debug("Succesfully closed files after retrieving prior covariance matrices")
logging.debug("Succesfully closed files after retrieving prior covariance matrices")
# Once we have the matrices, we can start to make the full covariance matrix, and then decompose it
fullcov = np.zeros((self.nparams,self.nparams),float)
fullcov = np.zeros((self.nparams, self.nparams), float)
nocn = cov_ocn.shape[0]
nbio = cov_bio.shape[0]
nocn = cov_ocn.shape[0]
nbio = cov_bio.shape[0]
fullcov[0:nbio,0:nbio] = cov_bio
fullcov[nbio:nbio+nocn,nbio:nbio+nocn] = cov_ocn
fullcov[nocn+nbio,nocn+nbio] = 1.e-10
fullcov[0:nbio, 0:nbio] = cov_bio
fullcov[nbio:nbio + nocn, nbio:nbio + nocn] = cov_ocn
fullcov[nocn + nbio, nocn + nbio] = 1.e-10
try:
......@@ -94,7 +91,7 @@ class CtStateVector(StateVector):
plt.colorbar()
plt.savefig('fullcovariancematrix.png')
plt.close('all')
dummy = logging.debug("Covariance matrix visualized for inspection")
logging.debug("Covariance matrix visualized for inspection")
except:
pass
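
GetCovariance assembles the prior covariance block-diagonally: the biosphere block in the upper-left corner, the ocean block after it, and a near-zero variance (1.e-10) for the single remaining parameter, which effectively pins it to its prior value. A small numpy sketch of that assembly (the block contents and sizes are invented for illustration):

import numpy as np

cov_bio = np.eye(3) * 0.64        # placeholder biosphere covariance block
cov_ocn = np.eye(2) * 0.16        # placeholder ocean covariance block
nbio, nocn = cov_bio.shape[0], cov_ocn.shape[0]
nparams = nbio + nocn + 1         # one extra parameter beyond the two blocks

fullcov = np.zeros((nparams, nparams), float)
fullcov[0:nbio, 0:nbio] = cov_bio
fullcov[nbio:nbio + nocn, nbio:nbio + nocn] = cov_ocn
fullcov[nbio + nocn, nbio + nocn] = 1.e-10      # last parameter gets (almost) no prior spread
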
......@@ -105,21 +102,17 @@ class CtStateVector(StateVector):
if __name__ == "__main__":
import os
import sys
from da.tools.initexit import StartLogger
from da.tools.pipeline import JobStart
import numpy as np
sys.path.append(os.getcwd())