Skip to content

Commit

Permalink
Thread safety done right (#290)
Browse files Browse the repository at this point in the history
* Multithreading support for time() decorators

Fixes #287

Signed-off-by: Zaar Hai <[email protected]>

* Installing futures package for Python 2.x

Required for multithreading tests.

Signed-off-by: Zaar Hai <[email protected]>

* Py2.x unittest compatability

Signed-off-by: Zaar Hai <[email protected]>

* Fixing text environment for Py27 multithread

Signed-off-by: Zaar Hai <[email protected]>

* pypy needs futures for testing as well

It's a 2.7 version of Python lang.

Signed-off-by: Zaar Hai <[email protected]>

* Ensuring that different instances of timer do not interfere

Signed-off-by: Zaar Hai <[email protected]>

* Python2.6 compliance

Signed-off-by: Zaar Hai <[email protected]>

Python2.6 compliance take 2

Signed-off-by: Zaar Hai <[email protected]>

* Using new object instead of thread local storage

It makes time() decorated functions reentrant.
It's a bit slower than TLS version, but for only
0.12us (micro) per iteration - it worth the
reentrancy and it's also IMHO a more clear
and pythonic.

Signed-off-by: Zaar Hai <[email protected]>
  • Loading branch information
haizaar authored and brian-brazil committed Jul 24, 2018
1 parent f2436ca commit 889a6fb
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 46 deletions.
56 changes: 13 additions & 43 deletions prometheus_client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ def time(self):
Can be used as a function decorator or context manager.
'''
return _GaugeTimer(self)
return _Timer(self.set)

def set_function(self, f):
'''Call the provided function to return the Gauge value.
Expand Down Expand Up @@ -829,7 +829,7 @@ def time(self):
Can be used as a function decorator or context manager.
'''
return _SummaryTimer(self)
return _Timer(self.observe)

def _samples(self):
return (
Expand Down Expand Up @@ -919,7 +919,7 @@ def time(self):
Can be used as a function decorator or context manager.
'''
return _HistogramTimer(self)
return _Timer(self.observe)

def _samples(self):
samples = []
Expand All @@ -932,24 +932,6 @@ def _samples(self):
return tuple(samples)


class _HistogramTimer(object):
def __init__(self, histogram):
self._histogram = histogram

def __enter__(self):
self._start = default_timer()

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._histogram.observe(max(default_timer() - self._start, 0))

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)


class _ExceptionCounter(object):
def __init__(self, counter, exception):
self._counter = counter
Expand Down Expand Up @@ -986,37 +968,25 @@ def wrapped(func, *args, **kwargs):
return decorate(f, wrapped)


class _SummaryTimer(object):
def __init__(self, summary):
self._summary = summary

def __enter__(self):
self._start = default_timer()
class _Timer(object):
def __init__(self, callback):
self._callback = callback

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._summary.observe(max(default_timer() - self._start, 0))

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)


class _GaugeTimer(object):
def __init__(self, gauge):
self._gauge = gauge
def _new_timer(self):
return self.__class__(self._callback)

def __enter__(self):
self._start = default_timer()

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._gauge.set(max(default_timer() - self._start, 0))
duration = max(default_timer() - self._start, 0)
self._callback(duration)

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
# Obtaining new instance of timer every time
# ensures thread safety and reentrancy.
with self._new_timer():
return func(*args, **kwargs)
return decorate(f, wrapped)
98 changes: 97 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import inspect
import time
import unittest
from concurrent.futures import ThreadPoolExecutor

try:
import unittest2 as unittest
except ImportError:
import unittest


from prometheus_client.core import (
CollectorRegistry,
Expand Down Expand Up @@ -124,6 +130,26 @@ def f():
f()
self.assertNotEqual(0, self.registry.get_sample_value('g'))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
workers = 2
pool = ThreadPoolExecutor(max_workers=workers)

@self.gauge.time()
def f(duration):
time.sleep(duration)

expected_duration = 1
pool.submit(f, expected_duration)
time.sleep(0.7 * expected_duration)
pool.submit(f, expected_duration * 2)
time.sleep(expected_duration)

rounding_coefficient = 0.9
adjusted_expected_duration = expected_duration * rounding_coefficient
self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g'))
pool.shutdown(wait=True)

def test_time_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
with self.gauge.time():
Expand Down Expand Up @@ -155,6 +181,55 @@ def f():
f()
self.assertEqual(1, self.registry.get_sample_value('s_count'))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
summary2 = Summary('s2', 'help', registry=self.registry)

workers = 3
duration = 0.1
pool = ThreadPoolExecutor(max_workers=workers)

@self.summary.time()
def f():
time.sleep(duration / 2)
# Testing that different instances of timer do not interfere
summary2.time()(lambda : time.sleep(duration / 2))()

jobs = workers * 3
for i in range(jobs):
pool.submit(f)
pool.shutdown(wait=True)

self.assertEqual(jobs, self.registry.get_sample_value('s_count'))

rounding_coefficient = 0.9
total_expected_duration = jobs * duration * rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))
self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum'))

def test_function_decorator_reentrancy(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))

iterations = 2
sleep = 0.1

@self.summary.time()
def f(i=1):
time.sleep(sleep)
if i == iterations:
return
f(i+1)

f()

self.assertEqual(iterations, self.registry.get_sample_value('s_count'))

# Arithmetic series with d == a_1
total_expected_duration = sleep * (iterations**2 + iterations) / 2
rounding_coefficient = 0.9
total_expected_duration *= rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))

def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
with self.summary.time():
Expand Down Expand Up @@ -234,6 +309,27 @@ def f():
self.assertEqual(1, self.registry.get_sample_value('h_count'))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
workers = 3
duration = 0.1
pool = ThreadPoolExecutor(max_workers=workers)

@self.histogram.time()
def f():
time.sleep(duration)

jobs = workers * 3
for i in range(jobs):
pool.submit(f)
pool.shutdown(wait=True)

self.assertEqual(jobs, self.registry.get_sample_value('h_count'))

rounding_coefficient = 0.9
total_expected_duration = jobs * duration * rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum'))

def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
Expand Down
17 changes: 15 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ deps =

[testenv:py26]
; Last pytest and py version supported on py26 .
deps =
deps =
unittest2
py==1.4.31
pytest==2.9.2
coverage
futures

[testenv:py27]
deps =
{[base]deps}
futures

[testenv:pypy]
deps =
{[base]deps}
futures

[testenv]
deps =
Expand All @@ -24,7 +35,9 @@ commands = coverage run --parallel -m pytest {posargs}

; Ensure test suite passes if no optional dependencies are present.
[testenv:py27-nooptionals]
deps = {[base]deps}
deps =
{[base]deps}
futures
commands = coverage run --parallel -m pytest {posargs}

[testenv:py36-nooptionals]
Expand Down

0 comments on commit 889a6fb

Please sign in to comment.