From 5e30bb31067baf950f60604cedec3c2deccab5f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 17 Jun 2016 18:37:21 -0400 Subject: [PATCH] Use a plain old select loop Use `select.select` to track subproc completion since `select.epoll` isn't available on OSX. Resolves #16 --- pysipp/launch.py | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/pysipp/launch.py b/pysipp/launch.py index a004d87..f8f852d 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -32,12 +32,12 @@ def __init__( self, subprocmod=subprocess, osmod=os, - poller=select.epoll, + select=select.select, ): # these could optionally be rpyc proxy objs self.spm = subprocmod self.osm = osmod - self.poller = poller() + self.select = select # collector thread placeholder self._waiter = None # store proc results @@ -67,11 +67,9 @@ def __call__(self, cmds, block=True, rate=300, **kwargs): stderr=sp.PIPE ) fd = proc.stderr.fileno() - log.debug("registering fd '{}' for pid '{}'".format( - fd, proc.pid)) + log.debug("registering fd '{}' for pid '{}'".format(fd, proc.pid)) fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + # limit launch rate time.sleep(1. / rate) @@ -87,15 +85,39 @@ def _wait(self, fds2procs): signalled = None left = len(fds2procs) collected = 0 + p2fd = {p: p.stderr for p in fds2procs.values()} + stderrs = {p: [] for p in fds2procs.values()} + + # wait on stderr hangup events while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: + try: + fds, _, _ = self.select(list(p2fd.values()), [], []) + except ValueError: + # all fds are now closed + hungup = p2fd.keys() + else: + hungup = [] + for proc in (fds2procs[fd.fileno()] for fd in fds): + data = proc.stderr.read() + if data != '': + stderrs[proc].append(data) # append stderr + continue + + p2fd.pop(proc) + hungup.append(proc) + + if not hungup: + continue + + for proc in hungup: + log.debug("received hangup for pid '{}'".format(proc.pid)) collected += 1 - proc = fds2procs[fd] # attach streams so they can be read more then once log.debug("collecting streams for {}".format(proc)) - proc.streams = Streams(*proc.communicate()) # timeout=2)) + proc.streams = Streams( + stdout=proc.communicate()[0], + stderr=''.join(stderrs[proc]), + ) # timeout=2)) if proc.returncode != 0 and not signalled: # stop all other agents if there is a failure signalled = self.stop()