diff --git a/README.md b/README.md index 30e7909..307157a 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ You'll need to set some environment variables to tell Fluitans how to assign nam - AUTHN_ADMIN_PW_HASH, which should be set to the password hash generated by running Fluitans with a password set as AUTHN_ADMIN_PW. - TURBOSTREAMS_HASH_KEY, which should be set to an HMAC key generated by running Fluitans without the TURBOSTREAMS_HASH_KEY set. -For example, you could generate the password and session key using: +For example, you could generate the password and session key and Turbo Streams hash key using: ``` AUTHN_ADMIN_PW='mypassword' make run ``` diff --git a/internal/app/fluitans/routes/routes.go b/internal/app/fluitans/routes/routes.go index a5f2aec..386d981 100644 --- a/internal/app/fluitans/routes/routes.go +++ b/internal/app/fluitans/routes/routes.go @@ -43,4 +43,6 @@ func (h *Handlers) Register(er godest.EchoRouter, tsr turbostreams.Router, em go controllers.New(h.r, ztcc, ztc).Register(er, ss) networks.New(h.r, h.globals.TSBroker.Hub(), dc, ztc, ztcc).Register(er, tsr, ss) dns.New(h.r, dc, ztc, ztcc).Register(er, tsr, ss) + + tsr.UNSUB("/*", turbostreams.EmptyHandler) } diff --git a/internal/clients/ztcontrollers/controllers.go b/internal/clients/ztcontrollers/controllers.go index e51a5fd..0a6a15f 100644 --- a/internal/clients/ztcontrollers/controllers.go +++ b/internal/clients/ztcontrollers/controllers.go @@ -154,7 +154,7 @@ func (c *Client) FindControllerByAddress(ctx context.Context, address string) (* func (c *Client) getAddressFromCache(controller Controller) (string, bool) { address, cacheHit, err := c.Cache.GetAddressByServer(controller.Server) if err != nil && err != context.Canceled && errors.Unwrap(err) != context.Canceled { - // Log the error but return as a cache miss so we can manually query the RRsets + // Log the error but return as a cache miss so we can manually query the controller c.Logger.Error(errors.Wrapf( err, "couldn't get the cache entry for the Zerotier address for %s", controller.Server, )) diff --git a/pkg/godest/turbostreams/actioncable-channel.go b/pkg/godest/turbostreams/actioncable-channel.go index d7f0fbd..0d75f61 100644 --- a/pkg/godest/turbostreams/actioncable-channel.go +++ b/pkg/godest/turbostreams/actioncable-channel.go @@ -12,18 +12,20 @@ import ( const ChannelName = "Turbo::StreamsChannel" type ( - SubHandler func(ctx stdContext.Context, streamName string) error - MsgHandler func( + SubHandler func(ctx stdContext.Context, streamName string) error + UnsubHandler func(ctx stdContext.Context, streamName string) + MsgHandler func( ctx stdContext.Context, streamName string, messages []Message, ) (result string, err error) ) type Channel struct { - identifier string - streamName string - h *MessagesHub - handleSub SubHandler - handleMsg MsgHandler + identifier string + streamName string + h *MessagesHub + handleSub SubHandler + handleUnsub UnsubHandler + handleMsg MsgHandler } func parseStreamName(identifier string) (string, error) { @@ -37,7 +39,8 @@ func parseStreamName(identifier string) (string, error) { } func NewChannel( - identifier string, h *MessagesHub, handleSub SubHandler, handleMsg MsgHandler, + identifier string, h *MessagesHub, + handleSub SubHandler, handleUnsub UnsubHandler, handleMsg MsgHandler, checkers ...actioncable.IdentifierChecker, ) (*Channel, error) { name, err := parseStreamName(identifier) @@ -50,11 +53,12 @@ func NewChannel( } } return &Channel{ - identifier: identifier, - streamName: name, - h: h, - handleSub: handleSub, - handleMsg: handleMsg, + identifier: identifier, + streamName: name, + h: h, + handleSub: handleSub, + handleUnsub: handleUnsub, + handleMsg: handleMsg, }, nil } @@ -92,6 +96,7 @@ func (c *Channel) Subscribe( } cancel() unsub() + c.handleUnsub(ctx, c.streamName) sub.Close() }() return cancel, nil diff --git a/pkg/godest/turbostreams/broker-handler.go b/pkg/godest/turbostreams/broker-handler.go index f87696e..0dd8a61 100644 --- a/pkg/godest/turbostreams/broker-handler.go +++ b/pkg/godest/turbostreams/broker-handler.go @@ -10,9 +10,10 @@ import ( // Methods const ( - MethodPub = "PUB" - MethodSub = "SUB" - MethodMsg = "MSG" + MethodPub = "PUB" + MethodSub = "SUB" + MethodUnsub = "UNSUB" + MethodMsg = "MSG" ) // Handlers @@ -28,13 +29,14 @@ func EmptyHandler(c Context) error { } type methodHandler struct { - pub HandlerFunc - sub HandlerFunc - msg HandlerFunc + pub HandlerFunc + sub HandlerFunc + unsub HandlerFunc + msg HandlerFunc } func (m *methodHandler) isHandler() bool { - return m.pub != nil || m.sub != nil || m.msg != nil + return m.pub != nil || m.sub != nil || m.unsub != nil || m.msg != nil } func handlerName(h HandlerFunc) string { diff --git a/pkg/godest/turbostreams/broker-router.go b/pkg/godest/turbostreams/broker-router.go index 655dce8..bd9034d 100644 --- a/pkg/godest/turbostreams/broker-router.go +++ b/pkg/godest/turbostreams/broker-router.go @@ -88,6 +88,8 @@ func (n *node) addHandler(method string, h HandlerFunc) { n.methodHandler.pub = h case MethodSub: n.methodHandler.sub = h + case MethodUnsub: + n.methodHandler.unsub = h case MethodMsg: n.methodHandler.msg = h } @@ -107,6 +109,8 @@ func (n *node) findHandler(method string) HandlerFunc { return n.methodHandler.pub case MethodSub: return n.methodHandler.sub + case MethodUnsub: + return n.methodHandler.unsub case MethodMsg: return n.methodHandler.msg } diff --git a/pkg/godest/turbostreams/broker.go b/pkg/godest/turbostreams/broker.go index c936aa8..68eabce 100644 --- a/pkg/godest/turbostreams/broker.go +++ b/pkg/godest/turbostreams/broker.go @@ -31,6 +31,7 @@ type Logger interface { type Router interface { PUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route SUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route + UNSUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route MSG(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route } @@ -91,6 +92,10 @@ func (b *Broker) SUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route { return b.Add(MethodSub, topic, h, m...) } +func (b *Broker) UNSUB(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route { + return b.Add(MethodUnsub, topic, h, m...) +} + func (b *Broker) MSG(topic string, h HandlerFunc, m ...MiddlewareFunc) *Route { return b.Add(MethodMsg, topic, h, m...) } @@ -102,7 +107,8 @@ func (b *Broker) ChannelFactory( ) actioncable.ChannelFactory { return func(identifier string) (actioncable.Channel, error) { return NewChannel( - identifier, b.hub, b.subHandler(sessionID), b.msgHandler(sessionID), checkers..., + identifier, b.hub, + b.subHandler(sessionID), b.unsubHandler(sessionID), b.msgHandler(sessionID), checkers..., ) } } @@ -130,6 +136,18 @@ func (b *Broker) subHandler(sessionID string) SubHandler { } } +func (b *Broker) unsubHandler(sessionID string) UnsubHandler { + return func(ctx stdContext.Context, topic string) { + c := b.newContext(ctx, topic) + c.sessionID = sessionID + b.router.Find(MethodUnsub, topic, c) + err := errors.Wrapf(c.handler(c), "turbo streams not unsubscribable on topic %s", topic) + if err != nil { + b.logger.Error(err) + } + } +} + func (b *Broker) msgHandler(sessionID string) MsgHandler { return func(ctx stdContext.Context, topic string, messages []Message) (result string, err error) { c := b.newContext(ctx, topic) diff --git a/web/templates/auth/login.page.tmpl b/web/templates/auth/login.page.tmpl index aceb9f1..e07c921 100644 --- a/web/templates/auth/login.page.tmpl +++ b/web/templates/auth/login.page.tmpl @@ -27,7 +27,7 @@
Security Warning