-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
main.go
191 lines (158 loc) · 5.21 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// Package main shows how to send continuous event messages to the clients through SSE via a broker.
// Read details at: https://www.w3schools.com/htmL/html5_serversentevents.asp and
// https://robots.thoughtbot.com/writing-a-server-sent-events-server-in-go
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/kataras/golog"
"github.com/kataras/iris/v12"
)
// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections.
type Broker struct {
// Events are pushed to this channel by the main events-gathering routine.
Notifier chan []byte
// New client connections.
newClients chan chan []byte
// Closed client connections.
closingClients chan chan []byte
// Client connections registry.
clients map[chan []byte]bool
}
// NewBroker returns a new broker factory.
func NewBroker() *Broker {
b := &Broker{
Notifier: make(chan []byte, 1),
newClients: make(chan chan []byte),
closingClients: make(chan chan []byte),
clients: make(map[chan []byte]bool),
}
// Set it running - listening and broadcasting events.
go b.listen()
return b
}
// Listen on different channels and act accordingly.
func (b *Broker) listen() {
for {
select {
case s := <-b.newClients:
// A new client has connected.
// Register their message channel.
b.clients[s] = true
golog.Infof("Client added. %d registered clients", len(b.clients))
case s := <-b.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(b.clients, s)
golog.Warnf("Removed client. %d registered clients", len(b.clients))
case event := <-b.Notifier:
// We got a new event from the outside!
// Send event to all connected clients.
for clientMessageChan := range b.clients {
clientMessageChan <- event
}
}
}
}
func (b *Broker) ServeHTTP(ctx iris.Context) {
// Make sure that the writer supports flushing.
flusher, ok := ctx.ResponseWriter().Flusher()
if !ok {
ctx.StatusCode(iris.StatusHTTPVersionNotSupported)
ctx.WriteString("Streaming unsupported!")
return
}
// Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
// If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
ctx.ContentType("application/json, text/event-stream")
ctx.Header("Cache-Control", "no-cache")
ctx.Header("Connection", "keep-alive")
// We also add a Cross-origin Resource Sharing header so browsers on different domains can still connect.
ctx.Header("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry.
messageChan := make(chan []byte)
// Signal the broker that we have a new connection.
b.newClients <- messageChan
// Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
ctx.OnClose(func() {
// Remove this client from the map of connected clients
// when this handler exits.
b.closingClients <- messageChan
})
// Block waiting for messages broadcast on this connection's messageChan.
for {
// Write to the ResponseWriter.
// Server Sent Events compatible.
ctx.Writef("data: %s\n\n", <-messageChan)
// or json: data:{obj}.
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
}
}
type event struct {
Timestamp int64 `json:"timestamp"`
Message string `json:"message"`
}
const script = `<script type="text/javascript">
if(typeof(EventSource) !== "undefined") {
console.log("server-sent events supported");
var client = new EventSource("http://localhost:8080/events");
var index = 1;
client.onmessage = function (evt) {
console.log(evt);
// it's not required that you send and receive JSON, you can just output the "evt.data" as well.
dataJSON = JSON.parse(evt.data)
var table = document.getElementById("messagesTable");
var row = table.insertRow(index);
var cellTimestamp = row.insertCell(0);
var cellMessage = row.insertCell(1);
cellTimestamp.innerHTML = dataJSON.timestamp;
cellMessage.innerHTML = dataJSON.message;
index++;
window.scrollTo(0,document.body.scrollHeight);
};
} else {
document.getElementById("header").innerHTML = "<h2>SSE not supported by this client-protocol</h2>";
}
</script>`
func main() {
broker := NewBroker()
go func() {
for {
time.Sleep(2 * time.Second)
now := time.Now()
evt := event{
Timestamp: now.Unix(),
Message: fmt.Sprintf("Hello at %s", now.Format(time.RFC1123)),
}
evtBytes, err := json.Marshal(evt)
if err != nil {
golog.Error(err)
continue
}
broker.Notifier <- evtBytes
}
}()
app := iris.New()
app.Get("/", func(ctx iris.Context) {
ctx.HTML(
`<html><head><title>SSE</title>` + script + `</head>
<body>
<h1 id="header">Waiting for messages...</h1>
<table id="messagesTable" border="1">
<tr>
<th>Timestamp (server)</th>
<th>Message</th>
</tr>
</table>
</body>
</html>`)
})
app.Get("/events", broker.ServeHTTP)
// http://localhost:8080
// http://localhost:8080/events
app.Run(iris.Addr(":8080"), iris.WithoutServerError(iris.ErrServerClosed))
}