-
-
Notifications
You must be signed in to change notification settings - Fork 46
/
flask_sse.py
178 lines (159 loc) · 6.07 KB
/
flask_sse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# coding=utf-8
from __future__ import unicode_literals
from collections import OrderedDict
from flask import Blueprint, request, current_app, json, stream_with_context
from redis import StrictRedis
from redis.exceptions import ConnectionError
import six
# Version string of this module.
__version__ = '1.0.0'
@six.python_2_unicode_compatible
class Message(object):
    """
    Data that is published as a server-sent event.

    Only ``data`` is mandatory. The optional fields (``type``, ``id`` and
    ``retry``) are omitted from every serialization when they are ``None``
    or an empty string; any other value, including ``0``, is kept.
    """

    def __init__(self, data, type=None, id=None, retry=None):
        """
        Create a server-sent event.

        :param data: The event data. If it is not a string, it will be
            serialized to JSON using the Flask application's
            :class:`~flask.json.JSONEncoder`.
        :param type: An optional event type.
        :param id: An optional event ID.
        :param retry: An optional integer, to specify the reconnect time for
            disconnected clients of this stream.
        """
        self.data = data
        self.type = type
        self.id = id
        self.retry = retry

    @staticmethod
    def _is_set(value):
        """
        Tell whether an optional field was supplied.

        A plain truthiness test would also discard ``0``, which is a
        legitimate event ID and a legitimate reconnect time, so only
        ``None`` and the empty string are treated as "not supplied".
        """
        return value is not None and value != ""

    def to_dict(self):
        """
        Serialize this object to a minimal dictionary, for storing in Redis.

        :returns: A dict that always has a ``"data"`` key, plus one key for
            each optional field that was supplied.
        """
        # data is required, all others are optional
        d = {"data": self.data}
        for key in ("type", "id", "retry"):
            value = getattr(self, key)
            if self._is_set(value):
                d[key] = value
        return d

    def __str__(self):
        """
        Serialize this object to a string, according to the `server-sent events
        specification <https://www.w3.org/TR/eventsource/>`_.

        The ``event`` line (if any) comes first, followed by one ``data``
        line per line of payload, then the ``id`` and ``retry`` lines. The
        result is terminated by a blank line.
        """
        if isinstance(self.data, six.string_types):
            data = self.data
        else:
            data = json.dumps(self.data)
        # Every line of a multi-line payload needs its own "data:" prefix.
        lines = ["data:{value}".format(value=line) for line in data.splitlines()]
        if self._is_set(self.type):
            lines.insert(0, "event:{value}".format(value=self.type))
        if self._is_set(self.id):
            lines.append("id:{value}".format(value=self.id))
        if self._is_set(self.retry):
            lines.append("retry:{value}".format(value=self.retry))
        return "\n".join(lines) + "\n\n"

    def __repr__(self):
        """
        Return a constructor-like representation that lists only the
        optional fields that were supplied.
        """
        kwargs = OrderedDict(
            (name, getattr(self, name))
            for name in ("type", "id", "retry")
            if self._is_set(getattr(self, name))
        )
        kwargs_repr = "".join(
            ", {key}={value!r}".format(key=key, value=value)
            for key, value in kwargs.items()
        )
        return "{classname}({data!r}{kwargs})".format(
            classname=self.__class__.__name__,
            data=self.data,
            kwargs=kwargs_repr,
        )

    def __eq__(self, other):
        """
        Two messages are equal when ``other`` is an instance of this
        message's class and all four fields compare equal.
        """
        return (
            isinstance(other, self.__class__) and
            self.data == other.data and
            self.type == other.type and
            self.id == other.id and
            self.retry == other.retry
        )

    def __ne__(self, other):
        """
        Inverse of :meth:`__eq__`.

        Python 2 does not derive ``!=`` from ``__eq__``, so it has to be
        defined explicitly to keep the two operators consistent there.
        """
        return not self.__eq__(other)
class ServerSentEventsBlueprint(Blueprint):
    """
    A :class:`flask.Blueprint` subclass that can publish server-sent events
    to Redis, subscribe to them, and stream them to HTTP clients.
    """

    @property
    def redis(self):
        """
        A :class:`redis.StrictRedis` instance for the current application.

        The connection URL is read from the ``SSE_REDIS_URL`` config value,
        falling back to ``REDIS_URL``. A new client object is built on every
        access.

        :raises KeyError: If neither config value holds a non-empty URL.
        """
        config = current_app.config
        url = config.get("SSE_REDIS_URL") or config.get("REDIS_URL")
        if url:
            return StrictRedis.from_url(url)
        raise KeyError("Must set a redis connection URL in app config.")

    def publish(self, data, type=None, id=None, retry=None, channel='sse'):
        """
        Publish data as a server-sent event.

        :param data: The event data. If it is not a string, it will be
            serialized to JSON using the Flask application's
            :class:`~flask.json.JSONEncoder`.
        :param type: An optional event type.
        :param id: An optional event ID.
        :param retry: An optional integer, to specify the reconnect time for
            disconnected clients of this stream.
        :param channel: If you want to direct different events to different
            clients, you may specify a channel for this event to go to.
            Only clients listening to the same channel will receive this
            event. Defaults to "sse".
        :returns: Whatever the Redis client's ``publish`` call returns.
        """
        event = Message(data, type=type, id=id, retry=retry)
        # Serialize before touching Redis, so a payload that cannot be
        # encoded fails before any connection is looked up.
        serialized = json.dumps(event.to_dict())
        client = self.redis
        return client.publish(channel=channel, message=serialized)

    def messages(self, channel='sse'):
        """
        A generator of :class:`~flask_sse.Message` objects from the given
        channel.

        :param channel: Name of the Redis pub/sub channel to subscribe to.
        """
        subscription = self.redis.pubsub()
        subscription.subscribe(channel)
        try:
            for entry in subscription.listen():
                # Entries of any type other than "message" are ignored.
                if entry['type'] != 'message':
                    continue
                fields = json.loads(entry['data'])
                yield Message(**fields)
        finally:
            # Best-effort cleanup: a connection error while unsubscribing
            # is deliberately ignored.
            try:
                subscription.unsubscribe(channel)
            except ConnectionError:
                pass

    def stream(self):
        """
        A view function that streams server-sent events. Ignores any
        :mailheader:`Last-Event-ID` headers in the HTTP request.

        Use a "channel" query parameter to stream events from a different
        channel than the default channel (which is "sse").
        """
        channel = request.args.get('channel') or 'sse'

        @stream_with_context
        def event_stream():
            for event in self.messages(channel=channel):
                yield str(event)

        response_class = current_app.response_class
        return response_class(event_stream(), mimetype='text/event-stream')
# Ready-made blueprint instance, created with the blueprint name "sse".
sse = ServerSentEventsBlueprint('sse', __name__)
"""
An instance of :class:`~flask_sse.ServerSentEventsBlueprint`
that hooks up the :meth:`~flask_sse.ServerSentEventsBlueprint.stream`
method as a view function at the root of the blueprint. If you don't
want to customize this blueprint at all, you can simply import and
use this instance in your application.
"""
# The empty rule places the streaming view at the blueprint's own root;
# the endpoint is registered under the name "stream".
sse.add_url_rule(rule="", endpoint="stream", view_func=sse.stream)