Skip to content
Snippets Groups Projects
Commit 7c216236 authored by Arne Babenhauserheide's avatar Arne Babenhauserheide
Browse files

platform: added hc3

parent 20d22bd7
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# maunaloa.py
"""
Author : peters and babenhauserheide
Revision History:
File created on 06 Sep 2010.
Adapted for HC3 in 2012
"""
import sys
import os
import logging
import subprocess
import time
from da.baseclasses.platform import PlatForm, std_joboptions
class HC3PlatForm(PlatForm):
    """Platform description for the KIT HC3 cluster.

    Provides the job template and the job submission methods, either through a plain ``sh`` call or through
    the ``job_submit`` / ``job_queue`` commands of the cluster.
    """

    def __init__(self):
        self.Identifier = 'KIT HC3'  # the identifier gives the platform name
        self.Version = '0.1'  # the platform version used

    def GetJobTemplate(self, joboptions=None, block=True):
        """
        Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument.
        The job template should return the preamble of a job that can be submitted to a queue on your platform,
        examples of popular queuing systems are:
            - SGE
            - MOAB
            - XGrid
            -
        A list of job options can be passed through a dictionary, which are then filled in on the proper line,
        an example is for instance passing the dictionary {'account':'co2'} which will be placed
        after the ``-A`` flag in a ``qsub`` environment.
        An extra option ``block`` has been added that allows the job template to be configured to block the current
        job until the submitted job in this template has been completed fully.

        :param joboptions: dictionary mapping placeholder names to their replacement strings (default: no options)
        :param block: accepted for interface compatibility; this implementation does not use it
        :rtype: string

        Placeholders are replaced as plain substrings wherever they occur in the template. Values from
        ``joboptions`` are applied first, ``std_joboptions`` then fills in the placeholders that are left.
        """
        if joboptions is None:
            joboptions = {}
        template = (
            "## \n"
            "## This is a set of dummy names, to be replaced by values from the dictionary \n"
            "## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n "
            "## \n"
            " \n"
            "#$ jobname \n"
            "#$ jobaccount \n"
            "#$ jobnodes \n"
            "#$ jobtime \n"
            "#$ jobshell \n"
            "\n"
            "source /home/ws/babenhau/.bashrc\n"
            "# make sure we have all meteo dirs\n"
            "mkdir /work/ws/babenhau/run-TM5/tm5-ctdas/output/ /work/ws/babenhau/run-TM5/tm5-ctdas/restart/\n"
            "# rsync -ruv ~/meteo-era-interim/hdf/ /scratch/babenhau/tmm-buf/ei\n"
            "\n"
        )
        if 'depends' in joboptions:
            template += "#$ -hold_jid depends \n"
        # First replace from passed dictionary, then fill remaining values with std_options
        for options in (joboptions, std_joboptions):
            for key, value in options.items():
                # str.replace already substitutes every occurrence. Looping until the key has vanished would
                # never end when the value contains the key itself (or when the key is empty).
                if key and key in template:
                    template = template.replace(key, value)
        return template

    def _showNewLogOutput(self, logfile, offset):
        """
        Print what has been appended to ``logfile`` since ``offset``.

        :param logfile: a string with the filename of the job log to read
        :param offset: file position up to which the log has already been shown
        :rtype: the file position to continue from on the next call

        Showing the log is best effort: a log which cannot be opened, read or decoded (for example because
        it has not been created yet) is skipped and ``offset`` is returned unchanged.
        """
        try:
            with open(logfile) as f:
                f.seek(offset)
                log = f.read()
                # ask the file for the position instead of counting characters
                offset = f.tell()
        except (IOError, OSError, UnicodeError):
            return offset
        if log:
            print(log)  # single argument in parentheses: same output on Python 2 and 3
        return offset

    def WatchQueuedJob(self, jobid, joblog):
        """
        Watch a job in the hc3 queue till it is finished. Blocks from waiting over running till finished.

        :param jobid: the id of the job; the job counts as active while ``str(jobid)`` occurs in the output of ``job_queue``
        :param joblog: a string with the filename of the job log to show; if empty, ``Job_hc3_<jobid>.out`` is used

        While waiting, new output from the log is printed every 3 seconds.
        """
        jobtag = str(jobid)
        logfile = joblog if joblog else "Job_hc3_" + jobtag + ".out"
        logidx = 0
        # job_queue sometimes needs a second to show the job, so wait *before* the first query.
        time.sleep(1)
        queue = subprocess.check_output(["job_queue"], universal_newlines=True)
        # NOTE(review): this is a substring match, so id 123 also matches a queued job 1234 - confirm
        # the job_queue output format before tightening it.
        while jobtag in queue:
            time.sleep(3)
            # show output from the log
            logidx = self._showNewLogOutput(logfile, logidx)
            queue = subprocess.check_output(["job_queue"], universal_newlines=True)
        # the job may have written more output between the last poll and leaving the queue
        self._showNewLogOutput(logfile, logidx)

    def _jobInfotext(self, jobfile, joblog, jobid, killcmd):
        """
        This method returns an infotext about the job.

        :param jobfile: a string with the filename of the job script
        :param joblog: a string with the filename of the job log
        :param jobid: integer id which is shown as argument of the kill command
        :param killcmd: a string with the command which stops the job
        :rtype: list of strings, one per line of the infotext
        """
        return [
            '\n',
            'Summary:\n',
            '\n',
            'job script : %s\n' % jobfile,
            'job log : %s\n' % joblog,
            '\n',
            'To manage this process:\n',
            '\n',
            ' # kill process:\n',
            ' %s %i\n' % (killcmd, jobid),
            ' \n',
            '\n',
        ]

    def _submitToShell(self, jobfile, joblog, block):
        """
        :param jobfile: a string with the filename of a jobfile to run
        :param joblog: a string with the filename of a logfile; it is only named in the logged summary, the output of the job is not redirected into it
        :param block: Boolean specifying whether to submit and continue (F), or submit and wait (T)
        :rtype: integer

        This method runs a jobfile with ``sh`` and always returns 0. # TODO: check if we really need the job id
        """
        cmd = ["sh", jobfile]
        logging.info("A new task will be started (%s)", cmd)
        process = subprocess.Popen(cmd)
        # Take the pid from the Popen object in both modes, so the kill hint in the summary names
        # a process id and not the exit status of the finished job.
        jobid = process.pid
        if block:
            process.wait()
        # write to log:
        for line in self._jobInfotext(jobfile, joblog, jobid, killcmd="kill"):
            logging.info(line.strip())
        return 0

    @staticmethod
    def QueuePrefix(cputime, walltime, email=None, memorymb=2800, processnumber=1, threadnumber=8, partition=None):
        """
        Create a command list to submit a job.

        :param cputime: the cputime for the task, passed to ``-t`` (documented as hours so far, but this class passes 4320 and 90 - TODO confirm the unit job_submit expects)
        :param walltime: the actual wall time: cputime + io, passed to ``-T``
        :param email: mailaddress to receive notifications
        :param memorymb: the number of megabytes per thread
        :param processnumber: the number of processes to use
        :param threadnumber: the number of threads per process
        :param partition: the partition to use as string: d (develop), f (fat: 12 nodes), m (medium: 32 nodes), t (tiny: 312 nodes).
        :rtype: list of strings; append the jobfile to get the complete command

        This is a static method: it takes no ``self`` and can be called on the class or on an instance.
        """
        cmd = ["job_submit"]
        if email:
            cmd.append("-NsbC:" + email)
        cmd += ["-t", str(cputime), "-T", str(walltime), "-m", str(memorymb)]
        if partition == "d":
            cmd += ["-c", "d"]
        else:
            cmd += ["-c", "p"]
            if partition is not None:
                cmd += ["-d", partition]
        # like every other option, the process/thread count needs its leading dash
        cmd += ["-p", str(processnumber) + "/" + str(threadnumber)]
        return cmd

    def _submitToQueue(self, jobfile, joblog, block):
        """
        :param jobfile: a string with the filename of a jobfile to run
        :param joblog: a string with the filename of a logfile; used to show the job output while waiting and named in the logged summary
        :param block: Boolean specifying whether to submit and continue (F), or submit and wait (T)
        :rtype: integer

        TODO: Make the email address customizeable.

        This method submits a jobfile to the queue with ``job_submit`` and always returns 0. # TODO: check if we really need the job id
        Raises ValueError if no job id can be read from the output of ``job_submit``.
        """
        if block:
            cputime, walltime = 90, 95
        else:
            # nonblocking needs longer time, because it is a controller which spawns blocking jobs!
            cputime, walltime = 4320, 4536
        submit_prefix = self.QueuePrefix(cputime=cputime, walltime=walltime,
                                         email="arne.babenhauserheide@kit.edu",
                                         memorymb=2800,
                                         processnumber=3, threadnumber=8,
                                         partition="t")
        cmd = submit_prefix + [jobfile]
        logging.info("A new task will be started (%s)", cmd)
        if not block:
            # wait > 30, so the cluster does not think that we are a rabid job chain on a rampage.
            time.sleep(32)
        out = subprocess.check_output(cmd, universal_newlines=True)
        # The job id is the first field after "Job ". Splitting on any whitespace also copes with
        # an id which is followed by a newline or by the end of the output.
        marker = "Job "
        fields = out[out.index(marker) + len(marker):].split()
        if not fields:
            raise ValueError("no job id found in the output of job_submit: %r" % out)
        jobid = int(fields[0])
        if block:
            self.WatchQueuedJob(jobid, joblog)
        time.sleep(1)
        # write to log:
        for line in self._jobInfotext(jobfile, joblog, jobid, killcmd="job_cancel"):
            logging.info(line.strip())
        return 0

    def SubmitJob(self, jobfile, joblog=None, block=False, shell=True):
        """
        :param jobfile: a string with the filename of a jobfile to run
        :param joblog: a string with the filename of a logfile to write run output to
        :param block: Boolean specifying whether to submit and continue (F), or submit and wait (T)
        :param shell: Boolean specifying whether to run the jobfile directly with ``sh`` (T), or to submit it to the queue (F)
        :rtype: integer

        TODO: Make the email address customizeable.

        This method submits a jobfile and returns what the chosen submit method returns, which is always 0. # TODO: check if we really need the job id
        """
        submit = self._submitToShell if shell else self._submitToQueue
        return submit(jobfile, joblog, block)
# Running this module as a script does nothing: it only defines the platform class.
if __name__ == "__main__":
    pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment