Commit 8cd65f81 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

changed information object to dictionaries

parent ce2244e4
......@@ -16,7 +16,7 @@ import datetime
################### Begin Class DaSystem ###################
class DaSystem(object):
class DaSystem(dict):
""" Information on the data assimilation system used. This is normally an rc-file with settings.
"""
......@@ -45,7 +45,8 @@ class DaSystem(object):
"""
import da.tools.rc as rc
self.da_settings = rc.read(RcFileName)
for k,v in rc.read(RcFileName).iteritems():
self[k] = v
self.RcFileName = RcFileName
self.DaRcLoaded = True
......@@ -53,6 +54,10 @@ class DaSystem(object):
return True
def Initialize(self ):
"""
Initialize the object
"""
def Validate(self ):
"""
......@@ -61,13 +66,13 @@ class DaSystem(object):
needed_rc_items={}
for k,v in self.da_settings.iteritems():
if v == 'True' : self.da_settings[k] = True
if v == 'False': self.da_settings[k] = False
for k,v in self.iteritems():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
for key in needed_rc_items:
if not self.da_settings.has_key(key):
if not self.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
......
......@@ -33,7 +33,7 @@ import da.tools.rc as rc
class CycleControl(object):
class CycleControl(dict):
"""
This object controls a data assimilation cycle. It is created using information from a da rc-file with settings, and
also using the command line options passed to the job control script that runs in the foreground, or through the queue.
......@@ -52,11 +52,11 @@ class CycleControl(object):
# Add some useful variables to the rc-file dictionary
self.da_settings['jobrcfilename'] = self.RcFileName
self.da_settings['dir.da_submit'] = os.getcwd()
self.da_settings['da.crash.recover'] = '-r' in opts
self.da_settings['verbose'] = '-v' in opts
self.DaSystem = None # to be filled later
self['jobrcfilename'] = self.RcFileName
self['dir.da_submit'] = os.getcwd()
self['da.crash.recover'] = '-r' in opts
self['verbose'] = '-v' in opts
self.DaSystem = None # to be filled later
def __str__(self):
......@@ -66,9 +66,9 @@ class CycleControl(object):
msg = "===============================================================" ; print msg
msg = "DA Cycle rc-file is %s" % self.RcFileName ; print msg
msg = "DA Cycle run directory is %s" % self.da_settings['dir.da_run'] ; print msg
msg = "DA Cycle inverse system is %s" % self.da_settings['da.system'] ; print msg
msg = "DA Cycle forecast model is %s" % self.da_settings['forecast.model'] ; print msg
msg = "DA Cycle run directory is %s" % self['dir.da_run'] ; print msg
msg = "DA Cycle inverse system is %s" % self['da.system'] ; print msg
msg = "DA Cycle forecast model is %s" % self['forecast.model'] ; print msg
msg = "===============================================================" ; print msg
return ""
......@@ -79,7 +79,9 @@ class CycleControl(object):
This method loads a DA Cycle rc-file with settings for this simulation
"""
self.da_settings = rc.read(RcFileName)
rcdata = rc.read(RcFileName)
for k,v in rcdata.iteritems():
self[k] = v
self.RcFileName = RcFileName
self.DaRcLoaded = True
......@@ -94,20 +96,20 @@ class CycleControl(object):
"""
from da.tools.general import ToDatetime
for k,v in self.da_settings.iteritems():
if v == 'True' : self.da_settings[k] = True
if v == 'False': self.da_settings[k] = False
if 'date' in k : self.da_settings[k] = ToDatetime(v)
for k,v in self.iteritems():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
if 'date' in k : self[k] = ToDatetime(v)
if 'time.start' in k :
self.da_settings[k] = ToDatetime(v)
self[k] = ToDatetime(v)
if 'time.end' in k :
self.da_settings[k] = ToDatetime(v)
self[k] = ToDatetime(v)
if 'time.finish' in k :
self.da_settings[k] = ToDatetime(v)
self[k] = ToDatetime(v)
for key in needed_da_items:
if not self.da_settings.has_key(key):
if not self.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
......@@ -122,15 +124,15 @@ class CycleControl(object):
"""
from da.tools.general import AdvanceTime
startdate = self.da_settings['time.start']
finaldate = self.da_settings['time.finish']
startdate = self['time.start']
finaldate = self['time.finish']
if finaldate <= startdate:
msg = 'The start date (%s) is not greater than the end date (%s), please revise'%(startdate.strftime('%Y%m%d'),finaldate.strftime('%Y%m%d'))
logging.error(msg)
raise ValueError
#
cyclelength = self.da_settings['time.cycle'] # get time step
cyclelength = self['time.cycle'] # get time step
# Determine end date
......@@ -143,17 +145,17 @@ class CycleControl(object):
if enddate > finaldate: # do not run beyon finaldate
enddate = finaldate
self.da_settings['time.start'] = startdate
self.da_settings['time.end'] = enddate
self.da_settings['time.finish'] = finaldate
self.da_settings['cyclelength'] = cyclelength
self['time.start'] = startdate
self['time.end'] = enddate
self['time.finish'] = finaldate
self['cyclelength'] = cyclelength
msg = "===============================================================" ; logging.info(msg)
msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "DA Cycle final date is %s" % finaldate.strftime('%Y-%m-%d %H:%M') ; logging.info(msg)
msg = "DA Cycle cycle length is %s" % cyclelength ; logging.info(msg)
msg = "DA Cycle restart is %s" % str(self.da_settings['time.restart']) ; logging.info(msg)
msg = "DA Cycle restart is %s" % str(self['time.restart']) ; logging.info(msg)
msg = "===============================================================" ; logging.info(msg)
return None
......@@ -163,7 +165,7 @@ class CycleControl(object):
import cPickle
import numpy as np
filename = os.path.join(self.da_settings['dir.save'],'randomseed.pickle')
filename = os.path.join(self['dir.save'],'randomseed.pickle')
if action == 'write':
f = open(filename,'wb')
......@@ -196,9 +198,9 @@ class CycleControl(object):
#
# case 1: A recover from a previous crash, this is signaled by flag "-r"
#
if self.da_settings['da.crash.recover']:
if self['da.crash.recover']:
msg = "Recovering simulation from data in: %s" % self.da_settings['dir.da_run'] ; logging.info(msg)
msg = "Recovering simulation from data in: %s" % self['dir.da_run'] ; logging.info(msg)
dummy = self.SetupFileStructure()
......@@ -208,7 +210,7 @@ class CycleControl(object):
#
# case 2: A continuation, this is signaled by rc-item time.restart = True
#
elif self.da_settings['time.restart']:
elif self['time.restart']:
msg = "Restarting filter from previous step" ; logging.info(msg)
......@@ -218,18 +220,18 @@ class CycleControl(object):
#
# case 3: A fresh start, this is signaled by rc-item time.restart = False
#
elif not self.da_settings['time.restart']:
elif not self['time.restart']:
msg = "First time step in filter sequence" ; logging.info(msg)
dummy = self.SetupFileStructure()
# expand jobrcfilename to include exec dir from now on.
self.da_settings['jobrcfilename'] = os.path.join(self.da_settings['dir.exec'],self.da_settings['jobrcfilename'])
self['jobrcfilename'] = os.path.join(self['dir.exec'],self['jobrcfilename'])
self.ParseTimes()
self.WriteRC(self.da_settings['jobrcfilename'])
self.WriteRC(self['jobrcfilename'])
return None
......@@ -259,26 +261,26 @@ class CycleControl(object):
# Create the run directory for this DA job, including I/O structure
filtertime = self.da_settings['time.start'].strftime('%Y%m%d')
self.da_settings['dir.exec'] = os.path.join(self.da_settings['dir.da_run'],'exec')
self.da_settings['dir.input'] = os.path.join(self.da_settings['dir.da_run'],'input')
self.da_settings['dir.output'] = os.path.join(self.da_settings['dir.da_run'],'output',filtertime)
self.da_settings['dir.diagnostics'] = os.path.join(self.da_settings['dir.da_run'],'diagnostics')
self.da_settings['dir.analysis'] = os.path.join(self.da_settings['dir.da_run'],'analysis')
self.da_settings['dir.jobs'] = os.path.join(self.da_settings['dir.da_run'],'jobs')
self.da_settings['dir.save'] = os.path.join(self.da_settings['dir.da_run'],'save')
CreateDirs(self.da_settings['dir.da_run'])
CreateDirs(os.path.join(self.da_settings['dir.exec']))
CreateDirs(os.path.join(self.da_settings['dir.input']))
CreateDirs(os.path.join(self.da_settings['dir.output']))
CreateDirs(os.path.join(self.da_settings['dir.diagnostics']))
CreateDirs(os.path.join(self.da_settings['dir.analysis']))
CreateDirs(os.path.join(self.da_settings['dir.jobs']))
CreateDirs(os.path.join(self.da_settings['dir.save']))
CreateDirs(os.path.join(self.da_settings['dir.save'],'one-ago'))
CreateDirs(os.path.join(self.da_settings['dir.save'],'two-ago'))
filtertime = self['time.start'].strftime('%Y%m%d')
self['dir.exec'] = os.path.join(self['dir.da_run'],'exec')
self['dir.input'] = os.path.join(self['dir.da_run'],'input')
self['dir.output'] = os.path.join(self['dir.da_run'],'output',filtertime)
self['dir.diagnostics'] = os.path.join(self['dir.da_run'],'diagnostics')
self['dir.analysis'] = os.path.join(self['dir.da_run'],'analysis')
self['dir.jobs'] = os.path.join(self['dir.da_run'],'jobs')
self['dir.save'] = os.path.join(self['dir.da_run'],'save')
CreateDirs(self['dir.da_run'])
CreateDirs(os.path.join(self['dir.exec']))
CreateDirs(os.path.join(self['dir.input']))
CreateDirs(os.path.join(self['dir.output']))
CreateDirs(os.path.join(self['dir.diagnostics']))
CreateDirs(os.path.join(self['dir.analysis']))
CreateDirs(os.path.join(self['dir.jobs']))
CreateDirs(os.path.join(self['dir.save']))
CreateDirs(os.path.join(self['dir.save'],'one-ago'))
CreateDirs(os.path.join(self['dir.save'],'two-ago'))
msg = 'Succesfully created the file structure for the assimilation job' ; logging.info(msg)
......@@ -298,16 +300,16 @@ class CycleControl(object):
# Replace rc-items with those from the crashed run last rc-file
file_rc_rec = os.path.join(self.da_settings['dir.save'],'da_runtime.rc')
file_rc_rec = os.path.join(self['dir.save'],'da_runtime.rc')
rc_rec = rc.read(file_rc_rec)
for k,v in rc_rec.iteritems():
self.da_settings[k] = v
self[k] = v
self.ValidateRC()
msg = "Replaced rc-items.... " ; logging.debug(msg)
msg = "Next cycle start date is %s" % self.da_settings['time.start'] ; logging.debug(msg)
msg = "Next cycle start date is %s" % self['time.start'] ; logging.debug(msg)
return None
......@@ -336,14 +338,14 @@ class CycleControl(object):
"""
filtertime = self.da_settings['time.start'].strftime('%Y%m%d')
filtertime = self['time.start'].strftime('%Y%m%d')
file1 = os.path.join(self.da_settings['dir.exec'],'da_runtime.rc')
file2 = os.path.join(self.da_settings['dir.output'],'savestate.nc')
file1 = os.path.join(self['dir.exec'],'da_runtime.rc')
file2 = os.path.join(self['dir.output'],'savestate.nc')
FilesToSave = [file1,file2]
targetdir = os.path.join(self.da_settings['dir.save'])
targetdir = os.path.join(self['dir.save'])
msg = "Collecting the required restart/save data" ; logging.info(msg)
msg = " to directory: %s " % targetdir ; logging.debug(msg)
......@@ -381,9 +383,9 @@ class CycleControl(object):
raise ValueError,'Invalid option specified for save_option (%s)' % save_option
savedir = os.path.join(self.da_settings['dir.save'])
oneagodir = os.path.join(self.da_settings['dir.save'],'one-ago')
tempdir = os.path.join(self.da_settings['dir.save'],'tmp')
savedir = os.path.join(self['dir.save'])
oneagodir = os.path.join(self['dir.save'],'one-ago')
tempdir = os.path.join(self['dir.save'],'tmp')
if save_option == 'partial': # save background data to tmp dir
......@@ -463,18 +465,18 @@ class CycleControl(object):
# These first two lines advance the filter time for the next cycle
self.da_settings['time.start'] = self.da_settings['time.end']
self.da_settings['time.end'] = AdvanceTime(self.da_settings['time.end'],self.da_settings['cyclelength'])
self['time.start'] = self['time.end']
self['time.end'] = AdvanceTime(self['time.end'],self['cyclelength'])
# The rest is info needed for a system restart
self.da_settings['time.restart'] = True
#self.da_settings['time.start'] = self.da_settings['time.start'].strftime('%Y-%m-%d %H:%M:%S')
#self.da_settings['time.finish'] = self.da_settings['time.finish'].strftime('%Y-%m-%d %H:%M:%S')
self['time.restart'] = True
#self['time.start'] = self['time.start'].strftime('%Y-%m-%d %H:%M:%S')
#self['time.finish'] = self['time.finish'].strftime('%Y-%m-%d %H:%M:%S')
fname = os.path.join(self.da_settings['dir.exec'],'da_runtime.rc')
self.da_settings['da.restart.fname'] = fname
dummy = rc.write(fname,self.da_settings)
fname = os.path.join(self['dir.exec'],'da_runtime.rc')
self['da.restart.fname'] = fname
dummy = rc.write(fname,self)
msg = 'Wrote new da_runtime.rc (%s)'%fname ; logging.debug(msg)
......@@ -482,7 +484,7 @@ class CycleControl(object):
def WriteRC(self,fname):
""" Write RC file after each process to reflect updated info """
dummy = rc.write(fname,self.da_settings)
dummy = rc.write(fname,self)
msg = 'Wrote expanded rc-file (%s)'%(fname) ; logging.debug(msg)
return None
......@@ -497,14 +499,14 @@ class CycleControl(object):
DaPlatForm = self.DaPlatForm
if self.da_settings['time.start'] < self.da_settings['time.finish']:
if self['time.start'] < self['time.finish']:
jobparams = {'jobname':'das','jobaccount':'co2','jobnodes':'nserial 1','jobtime':'00:30:00','jobshell':'/bin/sh'}
template = DaPlatForm.GetJobTemplate(jobparams)
template += 'cd %s\n'%os.getcwd()
template += './das.py rc=%s'% self.da_settings['da.restart.fname']
template += './das.py rc=%s'% self['da.restart.fname']
cd = self.da_settings['time.end'].strftime('%Y%m%d')
cd = self['time.end'].strftime('%Y%m%d')
jobfile = DaPlatForm.WriteJob(self,template,'cycle.%s.das'%cd)
jobid = DaPlatForm.SubmitJob(jobfile)
else:
......@@ -524,7 +526,7 @@ class CycleControl(object):
jobparams = {'jobname':'das.%s'%stepname}
template = DaPlatForm.GetJobTemplate(jobparams)
template += 'cd %s\n'%os.getcwd()
template += './das.py rc=%s process=%s %s' % (self.da_settings['jobrcfilename'],stepname,join(self.opts,''),)
template += './das.py rc=%s process=%s %s' % (self['jobrcfilename'],stepname,join(self.opts,''),)
jobfile = DaPlatForm.WriteJob(self,template,stepname)
jobid = DaPlatForm.SubmitJob(jobfile)
......@@ -613,11 +615,11 @@ def CleanUpCycle(DaCycle):
"""
# move log file to rundir/jobs
jobdir = os.path.join(DaCycle.da_settings['dir.da_run'],"jobs")
jobdir = os.path.join(DaCycle['dir.da_run'],"jobs")
logfile = 'das.o%s'%DaCycle.DaPlatForm.GetMyID()
if os.path.exists(logfile):
joblogfile = os.path.join(jobdir,'cycle.%s.log'%DaCycle.da_settings['time.start'].strftime('%Y%m%d'))
joblogfile = os.path.join(jobdir,'cycle.%s.log'%DaCycle['time.start'].strftime('%Y%m%d'))
dummy = shutil.move(logfile,joblogfile)
msg = "....Moved %s to %s"%(logfile,joblogfile) ; logging.debug(msg)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment