From ab4590d0fddc7dd840cecaa746554efa9ed62b39 Mon Sep 17 00:00:00 2001 From: karolina <amvdw95@gmail.com> Date: Tue, 30 Oct 2012 15:32:11 +0000 Subject: [PATCH] function names changes, some minor changes --- da/baseclasses/observationoperator.py | 4 +- da/baseclasses/optimizer.py | 2 - da/baseclasses/platform.py | 12 ++-- da/ct/obs.py | 39 +++++++---- da/ct/obspack.py | 4 ++ da/ct/obspack_geocarbon.py | 4 ++ da/ctgridded/dasystem.py | 1 - da/ctgridded/statevector.py | 8 +-- da/platform/huygens.py | 6 +- da/platform/jet.py | 10 +-- da/platform/maunaloa.py | 2 +- da/tm5/observationoperator.py | 12 ++-- da/tools/initexit.py | 95 +++++++++++++-------------- da/tools/pipeline.py | 6 +- 14 files changed, 109 insertions(+), 96 deletions(-) diff --git a/da/baseclasses/observationoperator.py b/da/baseclasses/observationoperator.py index 2c3e6e17..80cc2c4f 100755 --- a/da/baseclasses/observationoperator.py +++ b/da/baseclasses/observationoperator.py @@ -58,11 +58,11 @@ class ObservationOperator(object): def Initialize(self): """ Perform all steps necessary to start the observation operator through a simple Run() call """ - def ValidateInput(self): + def validate_input(self): """ Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files) are present. """ - def SaveData(self): + def save_data(self): """ Write the data that is needed for a restart or recovery of the Observation Operator to the save directory """ diff --git a/da/baseclasses/optimizer.py b/da/baseclasses/optimizer.py index 71831e5b..e38cbcff 100755 --- a/da/baseclasses/optimizer.py +++ b/da/baseclasses/optimizer.py @@ -42,8 +42,6 @@ class Optimizer(object): self.nobs = dims[3] self.create_matrices() - return None - def create_matrices(self): """ Create Matrix space needed in optimization routine """ import numpy as np diff --git a/da/baseclasses/platform.py b/da/baseclasses/platform.py index ea75ec5d..c504d7e3 100755 --- a/da/baseclasses/platform.py +++ b/da/baseclasses/platform.py @@ -54,7 +54,7 @@ class PlatForm(object): def ReturnQueueType(self): return "foreground" - def GetJobTemplate(self, joboptions={}, block=False): + def get_job_template(self, joboptions={}, block=False): """ Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument. The job template should return the preamble of a job that can be submitted to a queue on your platform, @@ -96,11 +96,11 @@ class PlatForm(object): template = template.replace(k, v) return template - def GetMyID(self): + def get_my_id(self): """ Return the process ID, or job ID of the current process or job""" return os.getpid() - def WriteJob(self, jobfile, template, jobid): + def write_job(self, jobfile, template, jobid): """ This method writes a jobfile to the exec dir and makes it executable (mod 477) """ @@ -113,7 +113,7 @@ class PlatForm(object): os.chmod(jobfile, 477) logging.debug("A job file was created (%s)" % jobfile) - def SubmitJob(self, jobfile, joblog=None, block=False): + def submit_job(self, jobfile, joblog=None, block=False): """ :param jobfile: a string with the filename of a jobfile to run :param joblog: a string with the filename of a logfile to write run output to @@ -137,10 +137,10 @@ class PlatForm(object): logging.info(' kill %i\n' % jobid) - def KillJob(self, jobid): + def kill_job(self, jobid): """ This method kills a running job """ - def StatJob(self, jobid): + def job_stat(self, jobid): """ This method gets the status of a running job """ output = subprocess.Popen(['qstat', jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) return output diff --git a/da/ct/obs.py b/da/ct/obs.py index 2390a85e..2dd01bf7 100755 --- a/da/ct/obs.py +++ b/da/ct/obs.py @@ -70,7 +70,7 @@ class CtObservations(Observation): idates = ncf.GetVariable('date_components') dates = array([dtm.datetime(*d) for d in idates]) - subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0] + subselect = logical_and(dates >= self.startdate, dates <= self.enddate).nonzero()[0] dates = dates.take(subselect, axis=0) @@ -484,13 +484,19 @@ class MixingRatioList(list): return result.squeeze() else: return result + def unflagged(self): - return MixingRatioSample([o for o in self if o.flag == 0]) + l = [o for o in self if o.flag == 0] + print l + return MixingRatioSample(l) + def flagged(self): return MixingRatioSample([o for o in self if o.flag != 0]) + def selectsite(self, site='mlo'): l = [o for o in self if o.code == site] return MixingRatioSample(l) + def selecthours(self, hours=range(1, 24)): l = [o for o in self if o.xdate.hour in hours] return MixingRatioSample(l) @@ -510,12 +516,23 @@ if __name__ == "__main__": obs = CtObservations() - DaCycle = JobStart(['-v'], {'rc':'da.rc'}) - DaCycle['time.sample.start'] = datetime(2000, 1, 1) - DaCycle['time.sample.end'] = datetime(2000, 1, 2) - - obs.Initialize() - obs.Validate() - obs.add_observations() - print(obs.Data.getvalues('obs')) - + #DaCycle = JobStart(['-v'], {'rc':'da.rc'}) + #DaCycle['time.sample.start'] = datetime(2000, 1, 1) + #DaCycle['time.sample.end'] = datetime(2000, 1, 2) + + + #obs.Initialize() + #obs.Validate() + #obs.add_observations() + #print(obs.Data.getvalues('obs')) + obs.Data = MixingRatioList([]) + obs.Data.append(MixingRatioSample("10",datetime(2000,1,1))) + obs.Data.append(MixingRatioSample("20",datetime(2000,1,2))) + obs.Data.append(MixingRatioSample("30",datetime(2000,1,3))) + obs.Data.append(MixingRatioSample("40",datetime(2000,1,4))) + for d in obs.Data: + print d + new = obs.Data.unflagged() + print new + #new = obs.Data.selectsite("10") + #print new \ No newline at end of file diff --git a/da/ct/obspack.py b/da/ct/obspack.py index 3af01656..1c0d535f 100755 --- a/da/ct/obspack.py +++ b/da/ct/obspack.py @@ -501,13 +501,17 @@ class MixingRatioList(list): return result.squeeze() else: return result + def unflagged(self): return MixingRatioSample([o for o in self if o.flag == 0]) + def flagged(self): return MixingRatioSample([o for o in self if o.flag != 0]) + def selectsite(self, site='mlo'): l = [o for o in self if o.code == site] return MixingRatioSample(l) + def selecthours(self, hours=range(1, 24)): l = [o for o in self if o.xdate.hour in hours] return MixingRatioSample(l) diff --git a/da/ct/obspack_geocarbon.py b/da/ct/obspack_geocarbon.py index e7e5d42d..35543bda 100755 --- a/da/ct/obspack_geocarbon.py +++ b/da/ct/obspack_geocarbon.py @@ -514,13 +514,17 @@ class MixingRatioList(list): return result.squeeze() else: return result + def unflagged(self): return MixingRatioSample([o for o in self if o.flag == 0]) + def flagged(self): return MixingRatioSample([o for o in self if o.flag != 0]) + def selectsite(self, site='mlo'): l = [o for o in self if o.code == site] return MixingRatioSample(l) + def selecthours(self, hours=range(1, 24)): l = [o for o in self if o.xdate.hour in hours] return MixingRatioSample(l) diff --git a/da/ctgridded/dasystem.py b/da/ctgridded/dasystem.py index ad75f4f4..e686fc27 100755 --- a/da/ctgridded/dasystem.py +++ b/da/ctgridded/dasystem.py @@ -29,7 +29,6 @@ class CtGriddedDaSystem(DaSystem): """ self.Identifier = 'CarbonTracker Gridded CO2' # the identifier gives the platform name - self.LoadRc(rcfilename) msg = 'Data Assimilation System initialized: %s'%self.Identifier ; logging.debug(msg) diff --git a/da/ctgridded/statevector.py b/da/ctgridded/statevector.py index a12ebed7..d9daaafa 100755 --- a/da/ctgridded/statevector.py +++ b/da/ctgridded/statevector.py @@ -16,8 +16,7 @@ sys.path.append('../../') import logging -import datetime -from da.baseclasses.statevector import EnsembleMember, StateVector +from da.baseclasses.statevector import StateVector import numpy as np identifier = 'CarbonTracker Gridded Statevector ' @@ -202,13 +201,10 @@ class CtGriddedStateVector(StateVector): if __name__ == "__main__": - import os - import sys + from da.tools.initexit import StartLogger , ParseOptions - from da.tools.pipeline import JobStart from da.tools.initexit import CycleControl from da.ctgridded.dasystem import CtGriddedDaSystem - import numpy as np sys.path.append(os.getcwd()) sys.path.append('../../') diff --git a/da/platform/huygens.py b/da/platform/huygens.py index 6a89f52e..72bbdc4a 100755 --- a/da/platform/huygens.py +++ b/da/platform/huygens.py @@ -52,7 +52,7 @@ class HuygensPlatForm(PlatForm): - def GetJobTemplate(self,joboptions={},block=False): + def get_job_template(self,joboptions={},block=False): """ Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument. The job template should return the preamble of a job that can be submitted to a queue on your platform, @@ -136,7 +136,7 @@ class HuygensPlatForm(PlatForm): # return os.getpid() # -# def SubmitJob(self,jobfile,joblog=None,block=False): +# def submit_job(self,jobfile,joblog=None,block=False): # """ This method submits a jobfile to the queue, and returns the queue ID """ # # @@ -154,7 +154,7 @@ class HuygensPlatForm(PlatForm): - def SubmitJob(self,jobfile,joblog=None,block=False): + def submit_job(self,jobfile,joblog=None,block=False): """ This method submits a jobfile to the queue, and returns the queue ID """ diff --git a/da/platform/jet.py b/da/platform/jet.py index 53ae881e..2214e3a4 100755 --- a/da/platform/jet.py +++ b/da/platform/jet.py @@ -27,7 +27,7 @@ class JetPlatForm(PlatForm): msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2) - def GetJobTemplate(self,joboptions={},block=False): + def get_job_template(self,joboptions={},block=False): """ Return the job template for a given computing system, and fill it with options from the dictionary provided as argument""" template = """#$ -N jobname \n"""+ \ @@ -59,13 +59,13 @@ class JetPlatForm(PlatForm): return template - def GetMyID(self): + def get_my_id(self): try: return os.environ['JOB_ID'] except: return os.getpid() - def SubmitJob(self,jobfile,joblog=None,block=False): + def submit_job(self,jobfile,joblog=None,block=False): """ This method submits a jobfile to the queue, and returns the queue ID """ @@ -77,14 +77,14 @@ class JetPlatForm(PlatForm): return retcode - def KillJob(self,jobid): + def kill_job(self,jobid): """ This method kills a running job """ output = subprocess.Popen(['qdel',jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) return output - def StatJob(self,jobid): + def job_stat(self,jobid): """ This method gets the status of a running job """ import subprocess diff --git a/da/platform/maunaloa.py b/da/platform/maunaloa.py index 6f0ccaee..0973cf1c 100755 --- a/da/platform/maunaloa.py +++ b/da/platform/maunaloa.py @@ -30,7 +30,7 @@ class MaunaloaPlatForm(PlatForm): return "foreground" - def GetJobTemplate(self,joboptions={},block=False): + def get_job_template(self,joboptions={},block=False): """ Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument. The job template should return the preamble of a job that can be submitted to a queue on your platform, diff --git a/da/tm5/observationoperator.py b/da/tm5/observationoperator.py index 88e424b6..ba69688b 100755 --- a/da/tm5/observationoperator.py +++ b/da/tm5/observationoperator.py @@ -357,7 +357,7 @@ class TM5ObservationOperator(ObservationOperator): logging.debug("Modified tm5_runtime.rc written (%s)" % tm5rcfilename) - def ValidateInput(self): + def validate_input(self): """ Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present """ @@ -518,14 +518,14 @@ class TM5ObservationOperator(ObservationOperator): jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid) logfile = jobfile.replace('.jb', '.log') - template = DaPlatForm.GetJobTemplate(jobparams, block=True) + template = DaPlatForm.get_job_template(jobparams, block=True) template += 'cd %s\n' % targetdir template += 'mpirun -np %d %s ./tm5.x\n' % (int(nthreads), mpi_shell_filename,) - DaPlatForm.WriteJob(jobfile, template, jobid) + DaPlatForm.write_job(jobfile, template, jobid) logging.info('Submitting job at %s' % datetime.datetime.now()) - code = DaPlatForm.SubmitJob(jobfile, joblog=jobfile) + code = DaPlatForm.submit_job(jobfile, joblog=jobfile) logging.info('Resuming job at %s' % datetime.datetime.now()) if not os.path.exists(okfile): @@ -577,7 +577,7 @@ class TM5ObservationOperator(ObservationOperator): return code - def SaveData(self): + def save_data(self): """ Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names that is used by the DaCycle object to collect restart data for the filter. @@ -680,7 +680,7 @@ if __name__ == "__main__": tm.Setup() tm.Initialize() tm.Run() - tm.SaveData() + tm.save_data() diff --git a/da/tools/initexit.py b/da/tools/initexit.py index ff2b68f1..602d3560 100755 --- a/da/tools/initexit.py +++ b/da/tools/initexit.py @@ -40,8 +40,8 @@ like this::: The most important method of the CycleControl object are listed below: .. autoclass:: da.tools.initexit.CycleControl - :members: Initialize, Finalize, CollectRestartData, MoveRestartData, - SubmitNextCycle, CleanUpCycle, SetupFileStructure, RecoverRun, RandomSeed + :members: Initialize, Finalize, collect_restart_data, move_restart_data, + submit_next_cycle, setup_file_structure, recover_run, random_seed Two important attributes of the CycleControl object are: (1) DaSystem, an instance of a :ref:`dasystem` @@ -130,8 +130,6 @@ class CycleControl(dict): logging.info('DA Cycle rc-file (%s) loaded successfully' % self.RcFileName) - return True - def ValidateRC(self): """ @@ -203,7 +201,7 @@ class CycleControl(dict): logging.info("===============================================================") - def SetSampleTimes(self, lag): + def set_sample_times(self, lag): """ Set the times over which a sampling interval will loop, depending on the lag. Note that lag falls in the interval [0,nlag-1] @@ -217,10 +215,10 @@ class CycleControl(dict): # Now advance depending on lag for l in range(lag): - self.AdvanceSampleTimes() + self.advance_sample_times() - def AdvanceSampleTimes(self): + def advance_sample_times(self): """ Advance sampling start and end time by one cycle interval """ @@ -257,7 +255,7 @@ class CycleControl(dict): self['time.end'] = enddate - def RandomSeed(self, action='read'): + def random_seed(self, action='read'): """ Get the randomseed and save it, or read the random seed and set it. The seed is currently stored in a python :mod:`pickle` file, residing in the ``exec`` directory @@ -305,19 +303,19 @@ class CycleControl(dict): # the ``time.restart : True`` option in the da.rc file The latter is automatically set if the filter submits the next cycle at the end of the current one, - through method :meth:`~da.tools.initexit.CycleControl.SubmitNextCycle`. + through method :meth:`~da.tools.initexit.CycleControl.submit_next_cycle`. The specific call tree under each scenario is: 1. *Fresh Start* - * dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()` <- Create directory tree + * :meth:`~da.tools.initexit.CycleControl.setup_file_structure()` <- Create directory tree 2. *Restart* - * dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()` - * dummy = :meth:`~da.tools.initexit.CycleControl.RandomSeed` <- Read the random seed from file + * :meth:`~da.tools.initexit.CycleControl.setup_file_structure()` + * :meth:`~da.tools.initexit.CycleControl.random_seed` <- Read the random seed from file 3. *Recover* - * dummy = :meth:`~da.tools.initexit.CycleControl.SetupFileStructure()` - * dummy = :meth:`~da.tools.initexit.CycleControl.RecoverRun()` <- Recover files from restart/one-ago dir, reset ``time.start`` - * dummy = :meth:`~da.tools.initexit.CycleControl.RandomSeed` + * :meth:`~da.tools.initexit.CycleControl.setup_file_structure()` + * :meth:`~da.tools.initexit.CycleControl.recover_run()` <- Recover files from restart/one-ago dir, reset ``time.start`` + * :meth:`~da.tools.initexit.CycleControl.random_seed` And is always followed by a call to @@ -329,24 +327,24 @@ class CycleControl(dict): # if self['da.crash.recover']: logging.info("Recovering simulation from data in: %s" % self['dir.da_run']) - self.SetupFileStructure() - self.RecoverRun() - self.RandomSeed('read') + self.setup_file_structure() + self.recover_run() + self.random_seed('read') # # case 2: A continuation, this is signaled by rc-item time.restart = True # elif self['time.restart']: logging.info("Restarting filter from previous step") - self.SetupFileStructure() + self.setup_file_structure() strippedname = os.path.split(self['jobrcfilename'])[-1] self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname) - self.RandomSeed('read') + self.random_seed('read') # # case 3: A fresh start, this is signaled by rc-item time.restart = False # elif not self['time.restart']: logging.info("First time step in filter sequence") - self.SetupFileStructure() + self.setup_file_structure() # expand jobrcfilename to include exec dir from now on. # First strip current leading path from filename @@ -357,7 +355,7 @@ class CycleControl(dict): self.ParseTimes() self.WriteRC(self['jobrcfilename']) - def SetupFileStructure(self): + def setup_file_structure(self): """ Create file structure needed for data assimilation system. In principle this looks like: @@ -409,13 +407,13 @@ class CycleControl(dict): logging.info('Succesfully created the file structure for the assimilation job') - def RecoverRun(self): + def recover_run(self): """ Prepare a recovery from a crashed run. This consists of: - - copying all data from the restart/one-ago folder (:meth:`~da.tools.initexit.CycleControl.MoveRestartData`), + - copying all data from the restart/one-ago folder (:meth:`~da.tools.initexit.CycleControl.move_restart_data`), - replacing all ``rc-file`` items with those from the ``da_runtime.rc`` in the restart/current dir - - resetting the seed of the random number generator to the value it had before the crash (:meth:`~da.tools.initexit.CycleControl.RandomSeed`) + - resetting the seed of the random number generator to the value it had before the crash (:meth:`~da.tools.initexit.CycleControl.random_seed`) - replacing the output dir name, since it has the sample time in it... """ @@ -459,14 +457,14 @@ class CycleControl(dict): * Submit the next cycle """ - self.RandomSeed('write') - self.WriteNewRCfile() - self.MoveRestartData(io_option='store') # Move restart data from current to one-ago - self.CollectRestartData() # Collect restart data for next cycle into a clean restart/current folder - self.CollectOutput() # Collect restart data for next cycle into a clean restart/current folder - self.SubmitNextCycle() - - def CollectOutput(self): + self.random_seed('write') + self.write_new_rc_file() + self.move_restart_data(io_option='store') # Move restart data from current to one-ago + self.collect_restart_data() # Collect restart data for next cycle into a clean restart/current folder + self.collect_output() # Collect restart data for next cycle into a clean restart/current folder + self.submit_next_cycle() + + def collect_output(self): """ Collect files that are part of the requested output for this cycle. This function allows users to add files to a list, and then the system will copy these to the current cycle's output directory. The list of files included is read from the @@ -495,7 +493,7 @@ class CycleControl(dict): - def CollectRestartData(self): + def collect_restart_data(self): """ Collect files needed for the restart of this cycle in case of a crash, or for the continuation of the next cycle. All files needed are written to the restart/current directory. The list of files included is read from the attribute "RestartFileList" which is a simple list of files that can be appended by other objects/methods that @@ -539,7 +537,7 @@ class CycleControl(dict): shutil.copy(file, file.replace(os.path.split(file)[0], targetdir)) - def MoveRestartData(self, io_option='restore'): + def move_restart_data(self, io_option='restore'): """ Store or restore model state to/from a restart directory. @@ -587,7 +585,7 @@ class CycleControl(dict): shutil.copy(file, file.replace(sourcedir, targetdir)) # - def WriteNewRCfile(self): + def write_new_rc_file(self): """ Write the rc-file for the next DA cycle. .. note:: The start time for the next cycle is the end time of this one, while @@ -627,7 +625,7 @@ class CycleControl(dict): logging.debug('Wrote expanded rc-file (%s)' % (fname)) - def SubmitNextCycle(self): + def submit_next_cycle(self): """ Submit the next job of a DA cycle, this consists of * Changing to the working directory from which the job was started initially @@ -651,13 +649,13 @@ class CycleControl(dict): # Template and commands for job jobparams = {'jobname':"j.%s" % jobid, 'jobtime':'06:00:00', 'logfile':logfile, 'errfile':logfile} - template = DaPlatForm.GetJobTemplate(jobparams) + template = DaPlatForm.get_job_template(jobparams) execcommand = os.path.join(self['dir.da_submit'], sys.argv[0]) template += 'python %s rc=%s %s' % (execcommand, self['da.restart.fname'], join(self.opts, ''),) # write and submit - DaPlatForm.WriteJob(jobfile, template, jobid) - jobid = DaPlatForm.SubmitJob(jobfile, joblog=logfile) + DaPlatForm.write_job(jobfile, template, jobid) + jobid = DaPlatForm.submit_job(jobfile, joblog=logfile) else: logging.info('Final date reached, no new cycle started') @@ -665,10 +663,10 @@ class CycleControl(dict): def SubmitSubStep(self, stepname): """ Submit the next substep of a DA cycle, this consists of - * getting a job template as returned by :meth:`~da.tools.baseclasses.platform.GetJobTemplate` + * getting a job template as returned by :meth:`~da.tools.baseclasses.platform.get_job_template` * adding the lines needed to start a next run with a newly created rc-file - * Writing the jobfile as done by :meth:`~da.tools.baseclasses.platform.WriteJob` - * Submitting the jobfile as done by :meth:`~da.tools.baseclasses.platform.WriteJob` + * Writing the jobfile as done by :meth:`~da.tools.baseclasses.platform.write_job` + * Submitting the jobfile as done by :meth:`~da.tools.baseclasses.platform.write_job` """ from string import join @@ -676,16 +674,13 @@ class CycleControl(dict): DaPlatForm = self.DaPlatForm jobparams = {'jobname':'das.%s' % stepname} - template = DaPlatForm.GetJobTemplate(jobparams) + template = DaPlatForm.get_job_template(jobparams) template += 'cd %s\n' % os.getcwd() template += '%s rc=%s process=%s %s' % (sys.argv[0], self['jobrcfilename'], stepname, join(self.opts, ''),) - jobfile = DaPlatForm.WriteJob(self, template, stepname) - DaPlatForm.SubmitJob(jobfile) + jobfile = DaPlatForm.write_job(self, template, stepname) + DaPlatForm.submit_job(jobfile) + - def CleanUpCycle(self): - """ - Nothing to do for now anymore - """ diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 97cf6d68..395acafd 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -158,7 +158,7 @@ def SampleOneCycle(DaCycle, Samples, StateVector, ObservationOperator, lag): import copy # First set up the information for time start and time end of this sample - DaCycle.SetSampleTimes(lag) + DaCycle.set_sample_times(lag) startdate = DaCycle['time.sample.start'] enddate = DaCycle['time.sample.end'] @@ -298,8 +298,8 @@ def RunForecastModel(DaCycle, ObsOperator): """ ObsOperator.PrepareRun() - ObsOperator.ValidateInput() + ObsOperator.validate_input() ObsOperator.Run() - ObsOperator.SaveData() + ObsOperator.save_data() -- GitLab