diff --git a/examples/simple/cons.sh b/examples/simple/cons.sh index 15495f1d..e7af990a 100755 --- a/examples/simple/cons.sh +++ b/examples/simple/cons.sh @@ -7,4 +7,4 @@ PYTHONPATH=".:$PYTHONPATH" export PYTHONPATH WORKER_CLASS=${1:-thread} export WORKER_CLASS -python ../../huey/bin/huey_consumer.py main.huey --workers=4 -k $WORKER_CLASS -C -S +python ../../huey/bin/huey_consumer.py main.huey --workers=4 -k $WORKER_CLASS -S diff --git a/huey/consumer.py b/huey/consumer.py index 6f0e27c1..c731b3b8 100644 --- a/huey/consumer.py +++ b/huey/consumer.py @@ -25,6 +25,9 @@ from huey.utils import time_clock +class ConsumerStopped(Exception): pass + + class BaseProcess(object): process_name = 'BaseProcess' @@ -431,31 +434,14 @@ def run(self): Run the consumer. """ self.start() - timeout = self._stop_flag_timeout health_check_ts = time_clock() while True: try: - self.stop_flag.wait(timeout=timeout) - except KeyboardInterrupt: - self._logger.info('Received SIGINT') - self.stop(graceful=True) - except: - self._logger.exception('Error in consumer.') - self.stop() - else: - if self._received_signal: - self.stop(graceful=self._graceful) - - if self.stop_flag.is_set(): + health_check_ts = self.loop(health_check_ts) + except ConsumerStopped: break - if self._health_check: - now = time_clock() - if now >= health_check_ts + self._health_check_interval: - health_check_ts = now - self.check_worker_health() - self.huey.notify_interrupted_tasks() if self._restart: @@ -465,6 +451,31 @@ def run(self): else: self._logger.info('Consumer exiting.') + def loop(self, health_check_ts=None): + try: + self.stop_flag.wait(timeout=self._stop_flag_timeout) + except KeyboardInterrupt: + self._logger.info('Received SIGINT') + self.stop(graceful=True) + except: + self._logger.exception('Error in consumer.') + self.stop() + else: + if self._received_signal: + self.stop(graceful=self._graceful) + + if self.stop_flag.is_set(): + # Flag to caller that the main consumer loop should shut down. + raise ConsumerStopped + + if self._health_check and health_check_ts: + now = time_clock() + if now >= health_check_ts + self._health_check_interval: + health_check_ts = now + self.check_worker_health() + + return health_check_ts + def check_worker_health(self): """ Check the health of the worker processes. Workers that have died will