Skip to content

Commit

Permalink
Merge pull request #16 from moiseshiraldo/15-enable-callbacks-for-celery
Browse files Browse the repository at this point in the history
Enable callbacks for the celery dispatcher #15
  • Loading branch information
ask authored Aug 14, 2019
2 parents 3fec8a0 + 8747b74 commit f967745
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 11 deletions.
2 changes: 1 addition & 1 deletion t/unit/dispatch/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_send(self, patching):
res = Dispatcher().send(
event, payload, user, timeout=3.03, kw=9, context=context)
send_event.s.assert_called_once_with(
event, payload, user.pk, 3.03, context,
event, payload, user.pk, 3.03, context, kw=9
)
send_event.s().apply_async.assert_called_once_with()
assert res is send_event.s().apply_async()
Expand Down
16 changes: 12 additions & 4 deletions t/unit/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
from conftest import DEFAULT_RECIPIENT_VALIDATORS


class PickableMock(Mock):
def __reduce__(self):
return (Mock, ())


def mock_req(event, url, **kwargs):
kwargs.setdefault('on_success', Mock(name='on_success'))
kwargs.setdefault('on_timeout', Mock(name='on_timeout'))
kwargs.setdefault('on_error', Mock(name='on_error'))
kwargs.setdefault('on_success', PickableMock(name='on_success'))
kwargs.setdefault('on_timeout', PickableMock(name='on_timeout'))
kwargs.setdefault('on_error', PickableMock(name='on_error'))
subscriber = Mock(name='subscriber')
subscriber.url = url
subscriber.content_type = MIME_JSON
Expand Down Expand Up @@ -183,6 +188,9 @@ def test_as_dict(self):
'retry_max': self.req.retry_max,
'recipient_validators': DEFAULT_RECIPIENT_VALIDATORS,
'allow_keepalive': self.req.allow_keepalive,
'on_success': self.req.on_success,
'on_error': self.req.on_error,
'on_timeout': self.req.on_timeout,
}

def test_urlident(self):
Expand All @@ -206,7 +214,7 @@ def as_dict(self):
return {'value': 808}
self.req._dispatcher = ''
self.req.subscriber = Subscriber()
r2 = pickle.loads(pickle.dumps(self.req))
r2 = pickle.loads(pickle.dumps(self.req, -1))
assert r2.app is self.app

def test_repr(self):
Expand Down
9 changes: 6 additions & 3 deletions t/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def test_success(self, app_or_default):
id=self.req.id, timeout=self.req.timeout, retry=self.req.retry,
retry_max=self.req.retry_max, retry_delay=self.req.retry_delay,
recipient_validators=DEFAULT_RECIPIENT_VALIDATORS,
allow_keepalive=True,
allow_keepalive=True, on_error=None, on_success=None,
on_timeout=None
)
_Request().dispatch.assert_called_once_with(
session=self.session, propagate=_Request().retry)
Expand All @@ -116,7 +117,8 @@ def test_when_keepalive_disabled(self, app_or_default):
id=self.req.id, timeout=self.req.timeout, retry=self.req.retry,
retry_max=self.req.retry_max, retry_delay=self.req.retry_delay,
recipient_validators=DEFAULT_RECIPIENT_VALIDATORS,
allow_keepalive=False,
allow_keepalive=False, on_error=None, on_success=None,
on_timeout=None
)
_Request().dispatch.assert_called_once_with(
session=self.session, propagate=_Request().retry)
Expand All @@ -133,7 +135,8 @@ def test_success__with_user(self, app_or_default):
id=self.req2.id, timeout=self.req2.timeout, retry=self.req2.retry,
retry_max=self.req2.retry_max, retry_delay=self.req2.retry_delay,
recipient_validators=DEFAULT_RECIPIENT_VALIDATORS,
allow_keepalive=True,
allow_keepalive=True, on_error=None, on_success=None,
on_timeout=None
)
_Request().dispatch.assert_called_once_with(
session=self.session, propagate=_Request().retry)
Expand Down
2 changes: 1 addition & 1 deletion thorn/dispatch/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def send(self, event, payload, sender,
timeout=None, context=None, **kwargs):
return send_event.s(
event, payload,
sender.pk if sender else sender, timeout, context,
sender.pk if sender else sender, timeout, context, **kwargs
).apply_async()

def flush_buffer(self):
Expand Down
3 changes: 3 additions & 0 deletions thorn/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ def as_dict(self):
self._recipient_validators,
),
'allow_keepalive': self.allow_keepalive,
'on_success': self.on_success,
'on_error': self.on_error,
'on_timeout': self.on_timeout,
}

def annotate_headers(self, extra_headers):
Expand Down
4 changes: 2 additions & 2 deletions thorn/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _worker_dispatcher():


@shared_task(ignore_result=True)
def send_event(event, payload, sender, timeout, context={}):
def send_event(event, payload, sender, timeout, context={}, **kwargs):
# type: (str, Dict, Any, float, Dict) -> None
"""Task called by process dispatching the event.
Expand All @@ -26,7 +26,7 @@ def send_event(event, payload, sender, timeout, context={}):
HTTP requests in batches (``dispatch_requests -> dispatch_request``).
"""
_worker_dispatcher().send(
event, payload, sender, timeout=timeout, context=context)
event, payload, sender, timeout=timeout, context=context, **kwargs)


@shared_task(ignore_result=True)
Expand Down

0 comments on commit f967745

Please sign in to comment.