Skip to content

Commit

Permalink
Add unsubscription handler to the Turbo Streams broker (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanjli authored Apr 22, 2022
1 parent d22687b commit 868e95f
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ You'll need to set some environment variables to tell Fluitans how to assign nam
- AUTHN_ADMIN_PW_HASH, which should be set to the password hash generated by running Fluitans with a password set as AUTHN_ADMIN_PW.
- TURBOSTREAMS_HASH_KEY, which should be set to an HMAC key generated by running Fluitans without the TURBOSTREAMS_HASH_KEY set.

For example, you could generate the password and session key using:
For example, you could generate the password and session key and Turbo Streams hash key using:
```
AUTHN_ADMIN_PW='mypassword' make run
```
Expand Down
2 changes: 2 additions & 0 deletions internal/app/fluitans/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ func (h *Handlers) Register(er godest.EchoRouter, tsr turbostreams.Router, em go
controllers.New(h.r, ztcc, ztc).Register(er, ss)
networks.New(h.r, h.globals.TSBroker.Hub(), dc, ztc, ztcc).Register(er, tsr, ss)
dns.New(h.r, dc, ztc, ztcc).Register(er, tsr, ss)

tsr.UNSUB("/*", turbostreams.EmptyHandler)
}
2 changes: 1 addition & 1 deletion internal/clients/ztcontrollers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (c *Client) FindControllerByAddress(ctx context.Context, address string) (*
func (c *Client) getAddressFromCache(controller Controller) (string, bool) {
address, cacheHit, err := c.Cache.GetAddressByServer(controller.Server)
if err != nil && err != context.Canceled && errors.Unwrap(err) != context.Canceled {
// Log the error but return as a cache miss so we can manually query the RRsets
// Log the error but return as a cache miss so we can manually query the controller
c.Logger.Error(errors.Wrapf(
err, "couldn't get the cache entry for the Zerotier address for %s", controller.Server,
))
Expand Down
31 changes: 18 additions & 13 deletions pkg/godest/turbostreams/actioncable-channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ import (
const ChannelName = "Turbo::StreamsChannel"

type (
SubHandler func(ctx stdContext.Context, streamName string) error
MsgHandler func(
SubHandler func(ctx stdContext.Context, streamName string) error
UnsubHandler func(ctx stdContext.Context, streamName string)
MsgHandler func(
ctx stdContext.Context, streamName string, messages []Message,
) (result string, err error)
)

type Channel struct {
identifier string
streamName string
h *MessagesHub
handleSub SubHandler
handleMsg MsgHandler
identifier string
streamName string
h *MessagesHub
handleSub SubHandler
handleUnsub UnsubHandler
handleMsg MsgHandler
}

func parseStreamName(identifier string) (string, error) {
Expand All @@ -37,7 +39,8 @@ func parseStreamName(identifier string) (string, error) {
}

func NewChannel(
identifier string, h *MessagesHub, handleSub SubHandler, handleMsg MsgHandler,
identifier string, h *MessagesHub,
handleSub SubHandler, handleUnsub UnsubHandler, handleMsg MsgHandler,
checkers ...actioncable.IdentifierChecker,
) (*Channel, error) {
name, err := parseStreamName(identifier)
Expand All @@ -50,11 +53,12 @@ func NewChannel(
}
}
return &Channel{
identifier: identifier,
streamName: name,
h: h,
handleSub: handleSub,
handleMsg: handleMsg,
identifier: identifier,
streamName: name,
h: h,
handleSub: handleSub,
handleUnsub: handleUnsub,
handleMsg: handleMsg,
}, nil
}

Expand Down Expand Up @@ -92,6 +96,7 @@ func (c *Channel) Subscribe(
}
cancel()
unsub()
c.handleUnsub(ctx, c.streamName)
sub.Close()
}()
return cancel, nil
Expand Down
16 changes: 9 additions & 7 deletions pkg/godest/turbostreams/broker-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
// Methods

const (
MethodPub = "PUB"
MethodSub = "SUB"
MethodMsg = "MSG"
MethodPub = "PUB"
MethodSub = "SUB"
MethodUnsub = "UNSUB"
MethodMsg = "MSG"
)

// Handlers
Expand All @@ -28,13 +29,14 @@ func EmptyHandler(c Context) error {
}

type methodHandler struct {
pub HandlerFunc
sub HandlerFunc
msg HandlerFunc
pub HandlerFunc
sub HandlerFunc
unsub HandlerFunc
msg HandlerFunc
}

func (m *methodHandler) isHandler() bool {
return m.pub != nil || m.sub != nil || m.msg != nil
return m.pub != nil || m.sub != nil || m.unsub != nil || m.msg != nil
}

func handlerName(h HandlerFunc) string {
Expand Down
4 changes: 4 additions & 0 deletions pkg/godest/turbostreams/broker-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func (n *node) addHandler(method string, h HandlerFunc) {
n.methodHandler.pub = h
case MethodSub:
n.methodHandler.sub = h
case MethodUnsub:
n.methodHandler.unsub = h
case MethodMsg:
n.methodHandler.msg = h
}
Expand All @@ -107,6 +109,8 @@ func (n *node) findHandler(method string) HandlerFunc {
return n.methodHandler.pub
case MethodSub:
return n.methodHandler.sub
case MethodUnsub:
return n.methodHandler.unsub
case MethodMsg:
return n.methodHandler.msg
}
Expand Down
20 changes: 19 additions & 1 deletion pkg/godest/turbostreams/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Logger interface {
type Router interface {
PUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route
SUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route
UNSUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route
MSG(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route
}

Expand Down Expand Up @@ -91,6 +92,10 @@ func (b *Broker) SUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route {
return b.Add(MethodSub, topic, h, m...)
}

func (b *Broker) UNSUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route {
return b.Add(MethodUnsub, topic, h, m...)
}

func (b *Broker) MSG(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route {
return b.Add(MethodMsg, topic, h, m...)
}
Expand All @@ -102,7 +107,8 @@ func (b *Broker) ChannelFactory(
) actioncable.ChannelFactory {
return func(identifier string) (actioncable.Channel, error) {
return NewChannel(
identifier, b.hub, b.subHandler(sessionID), b.msgHandler(sessionID), checkers...,
identifier, b.hub,
b.subHandler(sessionID), b.unsubHandler(sessionID), b.msgHandler(sessionID), checkers...,
)
}
}
Expand Down Expand Up @@ -130,6 +136,18 @@ func (b *Broker) subHandler(sessionID string) SubHandler {
}
}

func (b *Broker) unsubHandler(sessionID string) UnsubHandler {
return func(ctx stdContext.Context, topic string) {
c := b.newContext(ctx, topic)
c.sessionID = sessionID
b.router.Find(MethodUnsub, topic, c)
err := errors.Wrapf(c.handler(c), "turbo streams not unsubscribable on topic %s", topic)
if err != nil {
b.logger.Error(err)
}
}
}

func (b *Broker) msgHandler(sessionID string) MsgHandler {
return func(ctx stdContext.Context, topic string, messages []Message) (result string, err error) {
c := b.newContext(ctx, topic)
Expand Down
2 changes: 1 addition & 1 deletion web/templates/auth/login.page.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<p>Security Warning</p>
</div>
<div class="message-body">
This server was started with authentication disabled, so everyone has can sign in as
This server was started with authentication disabled, so everyone can sign in as
admin with a blank username and password!
</div>
</div>
Expand Down

0 comments on commit 868e95f

Please sign in to comment.