diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 0f1d0144..a1c72333 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -765,7 +765,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _GaugeTimer(self) + return _Timer(self.set) def set_function(self, f): '''Call the provided function to return the Gauge value. @@ -829,7 +829,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _SummaryTimer(self) + return _Timer(self.observe) def _samples(self): return ( @@ -919,7 +919,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _HistogramTimer(self) + return _Timer(self.observe) def _samples(self): samples = [] @@ -932,24 +932,6 @@ def _samples(self): return tuple(samples) -class _HistogramTimer(object): - def __init__(self, histogram): - self._histogram = histogram - - def __enter__(self): - self._start = default_timer() - - def __exit__(self, typ, value, traceback): - # Time can go backwards. - self._histogram.observe(max(default_timer() - self._start, 0)) - - def __call__(self, f): - def wrapped(func, *args, **kwargs): - with self: - return func(*args, **kwargs) - return decorate(f, wrapped) - - class _ExceptionCounter(object): def __init__(self, counter, exception): self._counter = counter @@ -986,37 +968,25 @@ def wrapped(func, *args, **kwargs): return decorate(f, wrapped) -class _SummaryTimer(object): - def __init__(self, summary): - self._summary = summary - - def __enter__(self): - self._start = default_timer() +class _Timer(object): + def __init__(self, callback): + self._callback = callback - def __exit__(self, typ, value, traceback): - # Time can go backwards. - self._summary.observe(max(default_timer() - self._start, 0)) - - def __call__(self, f): - def wrapped(func, *args, **kwargs): - with self: - return func(*args, **kwargs) - return decorate(f, wrapped) - - -class _GaugeTimer(object): - def __init__(self, gauge): - self._gauge = gauge + def _new_timer(self): + return self.__class__(self._callback) def __enter__(self): self._start = default_timer() def __exit__(self, typ, value, traceback): # Time can go backwards. - self._gauge.set(max(default_timer() - self._start, 0)) + duration = max(default_timer() - self._start, 0) + self._callback(duration) def __call__(self, f): def wrapped(func, *args, **kwargs): - with self: + # Obtaining new instance of timer every time + # ensures thread safety and reentrancy. + with self._new_timer(): return func(*args, **kwargs) return decorate(f, wrapped) diff --git a/tests/test_core.py b/tests/test_core.py index c949b623..f85a3e1b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2,7 +2,13 @@ import inspect import time -import unittest +from concurrent.futures import ThreadPoolExecutor + +try: + import unittest2 as unittest +except ImportError: + import unittest + from prometheus_client.core import ( CollectorRegistry, @@ -124,6 +130,26 @@ def f(): f() self.assertNotEqual(0, self.registry.get_sample_value('g')) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + workers = 2 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.gauge.time() + def f(duration): + time.sleep(duration) + + expected_duration = 1 + pool.submit(f, expected_duration) + time.sleep(0.7 * expected_duration) + pool.submit(f, expected_duration * 2) + time.sleep(expected_duration) + + rounding_coefficient = 0.9 + adjusted_expected_duration = expected_duration * rounding_coefficient + self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g')) + pool.shutdown(wait=True) + def test_time_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('g')) with self.gauge.time(): @@ -155,6 +181,55 @@ def f(): f() self.assertEqual(1, self.registry.get_sample_value('s_count')) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + summary2 = Summary('s2', 'help', registry=self.registry) + + workers = 3 + duration = 0.1 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.summary.time() + def f(): + time.sleep(duration / 2) + # Testing that different instances of timer do not interfere + summary2.time()(lambda : time.sleep(duration / 2))() + + jobs = workers * 3 + for i in range(jobs): + pool.submit(f) + pool.shutdown(wait=True) + + self.assertEqual(jobs, self.registry.get_sample_value('s_count')) + + rounding_coefficient = 0.9 + total_expected_duration = jobs * duration * rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum')) + + def test_function_decorator_reentrancy(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + + iterations = 2 + sleep = 0.1 + + @self.summary.time() + def f(i=1): + time.sleep(sleep) + if i == iterations: + return + f(i+1) + + f() + + self.assertEqual(iterations, self.registry.get_sample_value('s_count')) + + # Arithmetic series with d == a_1 + total_expected_duration = sleep * (iterations**2 + iterations) / 2 + rounding_coefficient = 0.9 + total_expected_duration *= rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) with self.summary.time(): @@ -234,6 +309,27 @@ def f(): self.assertEqual(1, self.registry.get_sample_value('h_count')) self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('h_count')) + workers = 3 + duration = 0.1 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.histogram.time() + def f(): + time.sleep(duration) + + jobs = workers * 3 + for i in range(jobs): + pool.submit(f) + pool.shutdown(wait=True) + + self.assertEqual(jobs, self.registry.get_sample_value('h_count')) + + rounding_coefficient = 0.9 + total_expected_duration = jobs * duration * rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('h_count')) self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) diff --git a/tox.ini b/tox.ini index c9c1138f..2b0d0bbb 100644 --- a/tox.ini +++ b/tox.ini @@ -9,11 +9,22 @@ deps = [testenv:py26] ; Last pytest and py version supported on py26 . -deps = +deps = unittest2 py==1.4.31 pytest==2.9.2 coverage + futures + +[testenv:py27] +deps = + {[base]deps} + futures + +[testenv:pypy] +deps = + {[base]deps} + futures [testenv] deps = @@ -24,7 +35,9 @@ commands = coverage run --parallel -m pytest {posargs} ; Ensure test suite passes if no optional dependencies are present. [testenv:py27-nooptionals] -deps = {[base]deps} +deps = + {[base]deps} + futures commands = coverage run --parallel -m pytest {posargs} [testenv:py36-nooptionals]