Commit ab4590d0 authored by karolina

Function name changes, some minor changes

parent b0b1a255
......@@ -58,11 +58,11 @@ class ObservationOperator(object):
def Initialize(self):
""" Perform all steps necessary to start the observation operator through a simple Run() call """
def ValidateInput(self):
def validate_input(self):
""" Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
are present.
"""
def SaveData(self):
def save_data(self):
""" Write the data that is needed for a restart or recovery of the Observation Operator to the save directory """
......
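For orientation: these renamed ObservationOperator hooks are driven in a fixed order by the pipeline (see the RunForecastModel hunk at the end of this commit). A minimal sketch of that call order, assuming a concrete operator instance such as the TM5ObservationOperator modified elsewhere in this commit:

# Sketch only: one forecast run through an observation operator,
# using the method names introduced by this commit.
def run_forecast_model(obs_operator):
    obs_operator.PrepareRun()      # unchanged by this commit
    obs_operator.validate_input()  # was ValidateInput()
    obs_operator.Run()
    obs_operator.save_data()       # was SaveData()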
......@@ -42,8 +42,6 @@ class Optimizer(object):
self.nobs = dims[3]
self.create_matrices()
return None
def create_matrices(self):
""" Create Matrix space needed in optimization routine """
import numpy as np
......
......@@ -54,7 +54,7 @@ class PlatForm(object):
def ReturnQueueType(self):
return "foreground"
def GetJobTemplate(self, joboptions={}, block=False):
def get_job_template(self, joboptions={}, block=False):
"""
Returns the job template for a given computing system and fills it with options from the dictionary provided as argument.
The job template should return the preamble of a job that can be submitted to a queue on your platform,
......@@ -96,11 +96,11 @@ class PlatForm(object):
template = template.replace(k, v)
return template
def GetMyID(self):
def get_my_id(self):
""" Return the process ID, or job ID of the current process or job"""
return os.getpid()
def WriteJob(self, jobfile, template, jobid):
def write_job(self, jobfile, template, jobid):
"""
This method writes a jobfile to the exec dir and makes it executable (mod 477)
"""
......@@ -113,7 +113,7 @@ class PlatForm(object):
os.chmod(jobfile, 477)
logging.debug("A job file was created (%s)" % jobfile)
def SubmitJob(self, jobfile, joblog=None, block=False):
def submit_job(self, jobfile, joblog=None, block=False):
"""
:param jobfile: a string with the filename of a jobfile to run
:param joblog: a string with the filename of a logfile to write run output to
......@@ -137,10 +137,10 @@ class PlatForm(object):
logging.info(' kill %i\n' % jobid)
def KillJob(self, jobid):
def kill_job(self, jobid):
""" This method kills a running job """
def StatJob(self, jobid):
def job_stat(self, jobid):
""" This method gets the status of a running job """
output = subprocess.Popen(['qstat', jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output)
return output
......
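The renamed PlatForm methods are meant to be chained when submitting an external job; the TM5ObservationOperator hunk further down shows the real call site. A hedged sketch of the same pattern, with the job id, target directory and script contents invented for illustration:

import os

def submit_example_job(platform, targetdir):
    # Illustrative job id and file names only.
    jobid = 'example'
    jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid)
    logfile = jobfile.replace('.jb', '.log')

    template = platform.get_job_template({'jobname': jobid}, block=True)  # was GetJobTemplate()
    template += 'cd %s\n' % targetdir
    template += 'echo running as process %s\n' % platform.get_my_id()     # was GetMyID()

    platform.write_job(jobfile, template, jobid)          # was WriteJob()
    return platform.submit_job(jobfile, joblog=logfile)   # was SubmitJob()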
......@@ -70,7 +70,7 @@ class CtObservations(Observation):
idates = ncf.GetVariable('date_components')
dates = array([dtm.datetime(*d) for d in idates])
subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
subselect = logical_and(dates >= self.startdate, dates <= self.enddate).nonzero()[0]
dates = dates.take(subselect, axis=0)
......@@ -484,13 +484,19 @@ class MixingRatioList(list):
return result.squeeze()
else:
return result
def unflagged(self):
return MixingRatioSample([o for o in self if o.flag == 0])
l = [o for o in self if o.flag == 0]
print l
return MixingRatioSample(l)
def flagged(self):
return MixingRatioSample([o for o in self if o.flag != 0])
def selectsite(self, site='mlo'):
l = [o for o in self if o.code == site]
return MixingRatioSample(l)
def selecthours(self, hours=range(1, 24)):
l = [o for o in self if o.xdate.hour in hours]
return MixingRatioSample(l)
......@@ -510,12 +516,23 @@ if __name__ == "__main__":
obs = CtObservations()
DaCycle = JobStart(['-v'], {'rc':'da.rc'})
DaCycle['time.sample.start'] = datetime(2000, 1, 1)
DaCycle['time.sample.end'] = datetime(2000, 1, 2)
obs.Initialize()
obs.Validate()
obs.add_observations()
print(obs.Data.getvalues('obs'))
#DaCycle = JobStart(['-v'], {'rc':'da.rc'})
#DaCycle['time.sample.start'] = datetime(2000, 1, 1)
#DaCycle['time.sample.end'] = datetime(2000, 1, 2)
#obs.Initialize()
#obs.Validate()
#obs.add_observations()
#print(obs.Data.getvalues('obs'))
obs.Data = MixingRatioList([])
obs.Data.append(MixingRatioSample("10",datetime(2000,1,1)))
obs.Data.append(MixingRatioSample("20",datetime(2000,1,2)))
obs.Data.append(MixingRatioSample("30",datetime(2000,1,3)))
obs.Data.append(MixingRatioSample("40",datetime(2000,1,4)))
for d in obs.Data:
print d
new = obs.Data.unflagged()
print new
#new = obs.Data.selectsite("10")
#print new
\ No newline at end of file
......@@ -501,13 +501,17 @@ class MixingRatioList(list):
return result.squeeze()
else:
return result
def unflagged(self):
return MixingRatioSample([o for o in self if o.flag == 0])
def flagged(self):
return MixingRatioSample([o for o in self if o.flag != 0])
def selectsite(self, site='mlo'):
l = [o for o in self if o.code == site]
return MixingRatioSample(l)
def selecthours(self, hours=range(1, 24)):
l = [o for o in self if o.xdate.hour in hours]
return MixingRatioSample(l)
......
......@@ -514,13 +514,17 @@ class MixingRatioList(list):
return result.squeeze()
else:
return result
def unflagged(self):
return MixingRatioSample([o for o in self if o.flag == 0])
def flagged(self):
return MixingRatioSample([o for o in self if o.flag != 0])
def selectsite(self, site='mlo'):
l = [o for o in self if o.code == site]
return MixingRatioSample(l)
def selecthours(self, hours=range(1, 24)):
l = [o for o in self if o.xdate.hour in hours]
return MixingRatioSample(l)
......
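The selection helpers added to MixingRatioList in the hunks above all follow one pattern: filter the list with a comprehension and wrap the result. A short sketch of how they might be called on a populated list such as obs.Data from the __main__ block above; the site code and hour range are illustrative values, and note that each helper wraps its filtered list in MixingRatioSample, exactly as written in this commit:

good     = obs.Data.unflagged()                       # observations with flag == 0
rejected = obs.Data.flagged()                         # observations with flag != 0
mlo_only = obs.Data.selectsite(site='mlo')            # observations from one site code
daytime  = obs.Data.selecthours(hours=range(10, 16))  # observations sampled in the given hours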
......@@ -29,7 +29,6 @@ class CtGriddedDaSystem(DaSystem):
"""
self.Identifier = 'CarbonTracker Gridded CO2' # the identifier gives the platform name
self.LoadRc(rcfilename)
msg = 'Data Assimilation System initialized: %s'%self.Identifier ; logging.debug(msg)
......
......@@ -16,8 +16,7 @@ sys.path.append('../../')
import logging
import datetime
from da.baseclasses.statevector import EnsembleMember, StateVector
from da.baseclasses.statevector import StateVector
import numpy as np
identifier = 'CarbonTracker Gridded Statevector '
......@@ -202,13 +201,10 @@ class CtGriddedStateVector(StateVector):
if __name__ == "__main__":
import os
import sys
from da.tools.initexit import StartLogger , ParseOptions
from da.tools.pipeline import JobStart
from da.tools.initexit import CycleControl
from da.ctgridded.dasystem import CtGriddedDaSystem
import numpy as np
sys.path.append(os.getcwd())
sys.path.append('../../')
......
......@@ -52,7 +52,7 @@ class HuygensPlatForm(PlatForm):
def GetJobTemplate(self,joboptions={},block=False):
def get_job_template(self,joboptions={},block=False):
"""
Returns the job template for a given computing system and fills it with options from the dictionary provided as argument.
The job template should return the preamble of a job that can be submitted to a queue on your platform,
......@@ -136,7 +136,7 @@ class HuygensPlatForm(PlatForm):
# return os.getpid()
#
# def SubmitJob(self,jobfile,joblog=None,block=False):
# def submit_job(self,jobfile,joblog=None,block=False):
# """ This method submits a jobfile to the queue, and returns the queue ID """
#
#
......@@ -154,7 +154,7 @@ class HuygensPlatForm(PlatForm):
def SubmitJob(self,jobfile,joblog=None,block=False):
def submit_job(self,jobfile,joblog=None,block=False):
""" This method submits a jobfile to the queue, and returns the queue ID """
......
......@@ -27,7 +27,7 @@ class JetPlatForm(PlatForm):
msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2)
def GetJobTemplate(self,joboptions={},block=False):
def get_job_template(self,joboptions={},block=False):
""" Return the job template for a given computing system, and fill it with options from the dictionary provided as argument"""
template = """#$ -N jobname \n"""+ \
......@@ -59,13 +59,13 @@ class JetPlatForm(PlatForm):
return template
def GetMyID(self):
def get_my_id(self):
try:
return os.environ['JOB_ID']
except:
return os.getpid()
def SubmitJob(self,jobfile,joblog=None,block=False):
def submit_job(self,jobfile,joblog=None,block=False):
""" This method submits a jobfile to the queue, and returns the queue ID """
......@@ -77,14 +77,14 @@ class JetPlatForm(PlatForm):
return retcode
def KillJob(self,jobid):
def kill_job(self,jobid):
""" This method kills a running job """
output = subprocess.Popen(['qdel',jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output)
return output
def StatJob(self,jobid):
def job_stat(self,jobid):
""" This method gets the status of a running job """
import subprocess
......
......@@ -30,7 +30,7 @@ class MaunaloaPlatForm(PlatForm):
return "foreground"
def GetJobTemplate(self,joboptions={},block=False):
def get_job_template(self,joboptions={},block=False):
"""
Returns the job template for a given computing system and fills it with options from the dictionary provided as argument.
The job template should return the preamble of a job that can be submitted to a queue on your platform,
......
......@@ -357,7 +357,7 @@ class TM5ObservationOperator(ObservationOperator):
logging.debug("Modified tm5_runtime.rc written (%s)" % tm5rcfilename)
def ValidateInput(self):
def validate_input(self):
"""
Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present
"""
......@@ -518,14 +518,14 @@ class TM5ObservationOperator(ObservationOperator):
jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid)
logfile = jobfile.replace('.jb', '.log')
template = DaPlatForm.GetJobTemplate(jobparams, block=True)
template = DaPlatForm.get_job_template(jobparams, block=True)
template += 'cd %s\n' % targetdir
template += 'mpirun -np %d %s ./tm5.x\n' % (int(nthreads), mpi_shell_filename,)
DaPlatForm.WriteJob(jobfile, template, jobid)
DaPlatForm.write_job(jobfile, template, jobid)
logging.info('Submitting job at %s' % datetime.datetime.now())
code = DaPlatForm.SubmitJob(jobfile, joblog=jobfile)
code = DaPlatForm.submit_job(jobfile, joblog=jobfile)
logging.info('Resuming job at %s' % datetime.datetime.now())
if not os.path.exists(okfile):
......@@ -577,7 +577,7 @@ class TM5ObservationOperator(ObservationOperator):
return code
def SaveData(self):
def save_data(self):
""" Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names
that is used by the DaCycle object to collect restart data for the filter.
......@@ -680,7 +680,7 @@ if __name__ == "__main__":
tm.Setup()
tm.Initialize()
tm.Run()
tm.SaveData()
tm.save_data()
......
......@@ -40,8 +40,8 @@ like this:::
The most important methods of the CycleControl object are listed below:
.. autoclass:: da.tools.initexit.CycleControl
:members: Initialize, Finalize, CollectRestartData, MoveRestartData,
SubmitNextCycle, CleanUpCycle, SetupFileStructure, RecoverRun, RandomSeed
:members: Initialize, Finalize, collect_restart_data, move_restart_data,
submit_next_cycle, setup_file_structure, recover_run, random_seed
Two important attributes of the CycleControl object are:
(1) DaSystem, an instance of a :ref:`dasystem`
......@@ -130,8 +130,6 @@ class CycleControl(dict):
logging.info('DA Cycle rc-file (%s) loaded successfully' % self.RcFileName)
return True
def ValidateRC(self):
"""
......@@ -203,7 +201,7 @@ class CycleControl(dict):
logging.info("===============================================================")
def SetSampleTimes(self, lag):
def set_sample_times(self, lag):
"""
Set the times over which a sampling interval will loop, depending on
the lag. Note that lag falls in the interval [0,nlag-1]
......@@ -217,10 +215,10 @@ class CycleControl(dict):
# Now advance depending on lag
for l in range(lag):
self.AdvanceSampleTimes()
self.advance_sample_times()
def AdvanceSampleTimes(self):
def advance_sample_times(self):
"""
Advance sampling start and end time by one cycle interval
"""
......@@ -257,7 +255,7 @@ class CycleControl(dict):
self['time.end'] = enddate
def RandomSeed(self, action='read'):
def random_seed(self, action='read'):
"""
Get the random seed and save it, or read the random seed and set it. The seed is currently stored
in a python :mod:`pickle` file, residing in the ``exec`` directory
......@@ -305,19 +303,19 @@ class CycleControl(dict):
# the ``time.restart : True`` option in the da.rc file
The latter is automatically set if the filter submits the next cycle at the end of the current one,
through method :meth:`~da.tools.initexit.CycleControl.SubmitNextCycle`.
through method :meth:`~da.tools.initexit.CycleControl.submit_next_cycle`.
The specific call tree under each scenario is:
1. *Fresh Start*
* dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()` <- Create directory tree
* :meth:`~da.tools.initexit.CycleControl.setup_file_structure()` <- Create directory tree
2. *Restart*
* dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()`
* dummy = :meth:`~da.tools.initexit.CycleControl.RandomSeed` <- Read the random seed from file
* :meth:`~da.tools.initexit.CycleControl.setup_file_structure()`
* :meth:`~da.tools.initexit.CycleControl.random_seed` <- Read the random seed from file
3. *Recover*
* dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()`
* dummy = :meth:`~da.tools.initexit.CycleControl.RecoverRun()` <- Recover files from restart/one-ago dir, reset ``time.start``
* dummy = :meth:`~da.tools.initexit.CycleControl.RandomSeed`
* :meth:`~da.tools.initexit.CycleControl.setup_file_structure()`
* :meth:`~da.tools.initexit.CycleControl.recover_run()` <- Recover files from restart/one-ago dir, reset ``time.start``
* :meth:`~da.tools.initexit.CycleControl.random_seed`
And is always followed by a call to
......@@ -329,24 +327,24 @@ class CycleControl(dict):
#
if self['da.crash.recover']:
logging.info("Recovering simulation from data in: %s" % self['dir.da_run'])
self.SetupFileStructure()
self.RecoverRun()
self.RandomSeed('read')
self.setup_file_structure()
self.recover_run()
self.random_seed('read')
#
# case 2: A continuation, this is signaled by rc-item time.restart = True
#
elif self['time.restart']:
logging.info("Restarting filter from previous step")
self.SetupFileStructure()
self.setup_file_structure()
strippedname = os.path.split(self['jobrcfilename'])[-1]
self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
self.RandomSeed('read')
self.random_seed('read')
#
# case 3: A fresh start, this is signaled by rc-item time.restart = False
#
elif not self['time.restart']:
logging.info("First time step in filter sequence")
self.SetupFileStructure()
self.setup_file_structure()
# expand jobrcfilename to include exec dir from now on.
# First strip current leading path from filename
......@@ -357,7 +355,7 @@ class CycleControl(dict):
self.ParseTimes()
self.WriteRC(self['jobrcfilename'])
def SetupFileStructure(self):
def setup_file_structure(self):
"""
Create file structure needed for data assimilation system.
In principle this looks like:
......@@ -409,13 +407,13 @@ class CycleControl(dict):
logging.info('Successfully created the file structure for the assimilation job')
def RecoverRun(self):
def recover_run(self):
"""
Prepare a recovery from a crashed run. This consists of:
- copying all data from the restart/one-ago folder (:meth:`~da.tools.initexit.CycleControl.MoveRestartData`),
- copying all data from the restart/one-ago folder (:meth:`~da.tools.initexit.CycleControl.move_restart_data`),
- replacing all ``rc-file`` items with those from the ``da_runtime.rc`` in the restart/current dir
- resetting the seed of the random number generator to the value it had before the crash (:meth:`~da.tools.initexit.CycleControl.RandomSeed`)
- resetting the seed of the random number generator to the value it had before the crash (:meth:`~da.tools.initexit.CycleControl.random_seed`)
- replacing the output dir name, since it has the sample time in it...
"""
......@@ -459,14 +457,14 @@ class CycleControl(dict):
* Submit the next cycle
"""
self.RandomSeed('write')
self.WriteNewRCfile()
self.MoveRestartData(io_option='store') # Move restart data from current to one-ago
self.CollectRestartData() # Collect restart data for next cycle into a clean restart/current folder
self.CollectOutput() # Collect restart data for next cycle into a clean restart/current folder
self.SubmitNextCycle()
def CollectOutput(self):
self.random_seed('write')
self.write_new_rc_file()
self.move_restart_data(io_option='store') # Move restart data from current to one-ago
self.collect_restart_data() # Collect restart data for next cycle into a clean restart/current folder
self.collect_output() # Collect output of this cycle into the output directory
self.submit_next_cycle()
def collect_output(self):
""" Collect files that are part of the requested output for this cycle. This function allows users to add files
to a list, and then the system will copy these to the current cycle's output directory.
The list of files included is read from the
......@@ -495,7 +493,7 @@ class CycleControl(dict):
def CollectRestartData(self):
def collect_restart_data(self):
""" Collect files needed for the restart of this cycle in case of a crash, or for the continuation of the next cycle.
All files needed are written to the restart/current directory. The list of files included is read from the
attribute "RestartFileList" which is a simple list of files that can be appended by other objects/methods that
......@@ -539,7 +537,7 @@ class CycleControl(dict):
shutil.copy(file, file.replace(os.path.split(file)[0], targetdir))
def MoveRestartData(self, io_option='restore'):
def move_restart_data(self, io_option='restore'):
"""
Store or restore model state to/from a restart directory.
......@@ -587,7 +585,7 @@ class CycleControl(dict):
shutil.copy(file, file.replace(sourcedir, targetdir))
#
def WriteNewRCfile(self):
def write_new_rc_file(self):
""" Write the rc-file for the next DA cycle.
.. note:: The start time for the next cycle is the end time of this one, while
......@@ -627,7 +625,7 @@ class CycleControl(dict):
logging.debug('Wrote expanded rc-file (%s)' % (fname))
def SubmitNextCycle(self):
def submit_next_cycle(self):
"""
Submit the next job of a DA cycle; this consists of
* Changing to the working directory from which the job was started initially
......@@ -651,13 +649,13 @@ class CycleControl(dict):
# Template and commands for job
jobparams = {'jobname':"j.%s" % jobid, 'jobtime':'06:00:00', 'logfile':logfile, 'errfile':logfile}
template = DaPlatForm.GetJobTemplate(jobparams)
template = DaPlatForm.get_job_template(jobparams)
execcommand = os.path.join(self['dir.da_submit'], sys.argv[0])
template += 'python %s rc=%s %s' % (execcommand, self['da.restart.fname'], join(self.opts, ''),)
# write and submit
DaPlatForm.WriteJob(jobfile, template, jobid)
jobid = DaPlatForm.SubmitJob(jobfile, joblog=logfile)
DaPlatForm.write_job(jobfile, template, jobid)
jobid = DaPlatForm.submit_job(jobfile, joblog=logfile)
else:
logging.info('Final date reached, no new cycle started')
......@@ -665,10 +663,10 @@ class CycleControl(dict):
def SubmitSubStep(self, stepname):
"""
Submit the next substep of a DA cycle; this consists of
* getting a job template as returned by :meth:`~da.tools.baseclasses.platform.GetJobTemplate`
* getting a job template as returned by :meth:`~da.tools.baseclasses.platform.get_job_template`
* adding the lines needed to start a next run with a newly created rc-file
* Writing the jobfile as done by :meth:`~da.tools.baseclasses.platform.WriteJob`
* Submitting the jobfile as done by :meth:`~da.tools.baseclasses.platform.WriteJob`
* Writing the jobfile as done by :meth:`~da.tools.baseclasses.platform.write_job`
* Submitting the jobfile as done by :meth:`~da.tools.baseclasses.platform.submit_job`
"""
from string import join
......@@ -676,16 +674,13 @@ class CycleControl(dict):
DaPlatForm = self.DaPlatForm
jobparams = {'jobname':'das.%s' % stepname}
template = DaPlatForm.GetJobTemplate(jobparams)
template = DaPlatForm.get_job_template(jobparams)
template += 'cd %s\n' % os.getcwd()
template += '%s rc=%s process=%s %s' % (sys.argv[0], self['jobrcfilename'], stepname, join(self.opts, ''),)
jobfile = DaPlatForm.WriteJob(self, template, stepname)
DaPlatForm.SubmitJob(jobfile)
jobfile = DaPlatForm.write_job(self, template, stepname)
DaPlatForm.submit_job(jobfile)
def CleanUpCycle(self):
"""
Nothing to do here at the moment
"""
......
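Taken together, the renamed CycleControl methods give the end-of-cycle sequence shown in the Finalize hunk above. A compact sketch of that order, assuming a CycleControl instance named DaCycle:

DaCycle.random_seed('write')                  # was RandomSeed()
DaCycle.write_new_rc_file()                   # was WriteNewRCfile()
DaCycle.move_restart_data(io_option='store')  # was MoveRestartData()
DaCycle.collect_restart_data()                # was CollectRestartData()
DaCycle.collect_output()                      # was CollectOutput()
DaCycle.submit_next_cycle()                   # was SubmitNextCycle()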
......@@ -158,7 +158,7 @@ def SampleOneCycle(DaCycle, Samples, StateVector, ObservationOperator, lag):
import copy
# First set up the information for time start and time end of this sample
DaCycle.SetSampleTimes(lag)
DaCycle.set_sample_times(lag)
startdate = DaCycle['time.sample.start']
enddate = DaCycle['time.sample.end']
......@@ -298,8 +298,8 @@ def RunForecastModel(DaCycle, ObsOperator):
"""
ObsOperator.PrepareRun()
ObsOperator.ValidateInput()
ObsOperator.validate_input()
ObsOperator.Run()
ObsOperator.SaveData()
ObsOperator.save_data()