220 lines
8.6 KiB
Python
220 lines
8.6 KiB
Python
import fcntl, os, select, time
|
|
from subprocess import Popen, PIPE
|
|
|
|
# Run a series of subprocesses. Try to keep up to a certain number going in
|
|
# parallel at any given time. Enforce time limits.
|
|
#
|
|
# This is implemented using non-blocking I/O, and so is Unix-specific.
|
|
#
|
|
# We assume that, if a task closes its standard error, then it's safe to
|
|
# wait for it to terminate. So an ill-behaved task that closes its standard
# error and then hangs will hang us, as well. However, as it takes special
# effort to close one's standard error, this seems unlikely to be a
# problem in practice.
|
|
class TaskPool(object):
    """Run a series of subprocess tasks, keeping up to job_limit going in
    parallel at any given time, and enforcing a per-task time limit.

    Implemented with non-blocking I/O and select, and so Unix-specific.
    """

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            # Popen object for the running process; set by start().
            self.pipe = None
            # NOTE(review): start_time is initialized but never assigned by
            # TaskPool; kept for backward compatibility with subclasses.
            self.start_time = None
            # Absolute time.time() at which the task should be killed;
            # set by start().
            self.deadline = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        # |tasks| may be any iterable of Task instances.
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        # Per-task time limit, in seconds.
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Return the next task that has not yet been executed, or None if all
    # tasks have been handed out.
    def get_next_pending(self):
        # The next() builtin (Python 2.6+) works on Python 3 as well; the
        # iterator's .next() method does not exist there.
        return next(self.pending, None)

    # Run every task to completion, dispatching output and completion
    # notifications to the tasks' handler methods. Returns None.
    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                # Start new tasks until we reach the job limit or run out
                # of pending tasks.
                while len(running) < self.job_limit and self.next_pending is not None:
                    t = self.next_pending
                    p = Popen(t.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    t.start(p, time.time() + self.timeout)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                       + [t.pipe.stderr for t in running])
                (readable, w, x) = select.select(stdouts_and_stderrs, [], [], secs_to_next_deadline)

                # Refresh |now|: 'select' may have slept until a deadline
                # passed, and the timeout check below should see the
                # post-sleep time, not the pre-sleep time.
                now = time.time()

                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        # Test truthiness rather than comparing against "":
                        # on Python 3 these pipes yield bytes, and b"" != ""
                        # is true, which would make the EOF checks misfire.
                        if output:
                            try:
                                t.onStdout(output)
                            except self.TerminateTask:
                                # Note: the exception class must be looked up
                                # via self (or TaskPool); a bare TerminateTask
                                # is not in scope inside a method body.
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if output:
                            try:
                                t.onStderr(output)
                            except self.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                # A task may have raised TerminateTask and then finished in
                # the same pass; it has already been reaped above, so don't
                # try to terminate or remove it a second time.
                for t in terminate - finished:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None
|
|
|
|
def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # multiprocessing is available on Python 2.6 and later, and its
    # cpu_count() is the most portable answer.
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # Fall back to sysconf on POSIX systems.
    try:
        count = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if count > 0:
            return count

    # Fall back to the environment on Windows.
    try:
        count = int(os.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if count > 0:
            return count

    # Nothing worked; assume a single processor.
    return 1
|
|
|
|
if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        # Collected task values, in completion order. (Named 'order' rather
        # than 'sorted' to avoid shadowing the builtin.)
        order = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)

            def cmd(self):
                # Sleep for n seconds; completion order is then the sorted
                # order of the values.
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]

            # print(x) with a single argument behaves identically as a
            # Python 2 print statement and a Python 3 function call, so
            # this demo runs under either interpreter.
            def onStdout(self, text):
                print('%d stdout: %r' % (self.n, text))

            def onStderr(self, text):
                print('%d stderr: %r' % (self.n, text))

            def onFinished(self, returncode):
                print('%d (rc=%d)' % (self.n, returncode))
                order.append(self.n)

            def onTimeout(self):
                print('%d timed out' % (self.n,))

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return order

    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))
|