From 102617812cfc92150ba3373fc3c4c2cdc15c609c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 29 Apr 2020 20:15:12 -0600 Subject: [PATCH] AsyncTransport: eagerly process queue on stop() Sometimes, you don't want to just rely on the `atexit` handler, but manually shut down the `AsyncTransport` in your application and be sure all the logs are flushed from it. Calling `.flush(); .stop()` is not ideal, because `flush` could block for an unnecessarily long time. This PR: * sets the stop event in `_Worker.stop`, so the queue begins processing immediately * removes `_Worker._export_pending_data`, which became redundant, and just uses `stop` as the atexit handler * Exposes a public `stop` method on `AsyncTransport` * Driveby: clarify `grace_period` and `wait_period` units in docstring --- opencensus/common/transports/async_.py | 40 ++++++++++++++-------- tests/unit/common/transports/test_async.py | 22 ++++-------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/opencensus/common/transports/async_.py b/opencensus/common/transports/async_.py index c5fe6a395..072775431 100644 --- a/opencensus/common/transports/async_.py +++ b/opencensus/common/transports/async_.py @@ -145,16 +145,17 @@ def start(self): # auto-collection. execution_context.set_is_exporter(True) self._thread.start() - atexit.register(self._export_pending_data) + atexit.register(self.stop) def stop(self): """Signals the background thread to stop. This does not terminate the background thread. It simply queues the - stop signal. If the main process exits before the background thread + stop signal, and tells the background thread to immediately consume any + remaining items. If the main process exits before the background thread processes the stop signal, it will be terminated without finishing - work. The ``grace_period`` parameter will give the background - thread some time to finish processing before this function returns. + work. The ``grace_period`` parameter will give the background thread + some time to finish processing before this function returns. :rtype: bool :returns: True if the thread terminated. False if the thread is still @@ -165,6 +166,8 @@ def stop(self): with self._lock: self._queue.put_nowait(_WORKER_TERMINATOR) + # Stop blocking between export batches + self._event.set() self._thread.join(timeout=self._grace_period) success = not self.is_alive @@ -172,14 +175,6 @@ def stop(self): return success - def _export_pending_data(self): - """Callback that attempts to send pending data before termination.""" - if not self.is_alive: - return - # Stop blocking between export batches - self._event.set() - self.stop() - def enqueue(self, data): """Queues data to be written by the background thread.""" self._queue.put_nowait(data) @@ -198,7 +193,7 @@ class AsyncTransport(base.Transport): :type grace_period: float :param grace_period: The amount of time to wait for pending data to - be submitted when the process is shutting down. + be submitted when the process is shutting down (sec). :type max_batch_size: int :param max_batch_size: The maximum number of items to send at a time @@ -206,7 +201,7 @@ class AsyncTransport(base.Transport): :type wait_period: int :param wait_period: The amount of time to wait before sending the next - batch of data. + batch of data (sec). """ def __init__(self, exporter, @@ -227,5 +222,20 @@ def export(self, data): self.worker.enqueue(data) def flush(self): - """Submit any pending traces/stats.""" + "Submit any pending traces/stats, blocking up to `wait_period` secs." self.worker.flush() + + def stop(self): + """ + Submit any pending traces/stats and shut down the transport. + + Blocks for up to `grace_period` secs. Unlike :meth:`~.flush`, any + pending traces are immediately processed, instead of waiting for the + wait period to end. Once called, any subsequent calls to + :meth:`~.export` will be silently dropped. + + :rtype: bool + :returns: True if the thread terminated. False if the thread is still + running. + """ + self.worker.stop() diff --git a/tests/unit/common/transports/test_async.py b/tests/unit/common/transports/test_async.py index ce8e3ab0b..7ecd85458 100644 --- a/tests/unit/common/transports/test_async.py +++ b/tests/unit/common/transports/test_async.py @@ -56,7 +56,7 @@ def test_start(self): self.assertEqual(worker._thread._target, worker._thread_main) self.assertEqual( worker._thread._name, async_._WORKER_THREAD_NAME) - mock_atexit.assert_called_once_with(worker._export_pending_data) + mock_atexit.assert_called_once_with(worker.stop) cur_thread = worker._thread self._start_worker(worker) @@ -70,6 +70,7 @@ def test_stop(self): worker.stop() + self.assertTrue(worker._event.is_set()) self.assertEqual(worker._queue.qsize(), 1) self.assertEqual( worker._queue.get(), async_._WORKER_TERMINATOR) @@ -79,35 +80,24 @@ def test_stop(self): # If thread not alive, do not stop twice. worker.stop() - def test__export_pending_data(self): - exporter = mock.Mock() - worker = async_._Worker(exporter) - - self._start_worker(worker) - worker._export_pending_data() - - self.assertFalse(worker.is_alive) - - worker._export_pending_data() - - def test__export_pending_data_non_empty_queue(self): + def test_stop_non_empty_queue(self): exporter = mock.Mock() worker = async_._Worker(exporter) self._start_worker(worker) worker.enqueue(mock.Mock()) - worker._export_pending_data() + worker.stop() self.assertFalse(worker.is_alive) - def test__export_pending_data_did_not_join(self): + def test_stop_did_not_join(self): exporter = mock.Mock() worker = async_._Worker(exporter) self._start_worker(worker) worker._thread._terminate_on_join = False worker.enqueue(mock.Mock()) - worker._export_pending_data() + worker.stop() self.assertFalse(worker.is_alive)