Skip to content

Commit

Permalink
feat: websocket support
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Gleich <[email protected]>
  • Loading branch information
gleich committed Dec 22, 2024
1 parent 85cabc1 commit 72a744b
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 13 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/caarlos0/env/v11 v11.2.2
github.com/gleich/lumber/v3 v3.0.2
github.com/go-chi/chi/v5 v5.1.0
github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/minio/minio-go/v7 v7.0.81
github.com/prometheus/client_golang v1.20.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
Expand Down
38 changes: 25 additions & 13 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ import (
"github.com/gleich/lcp-v2/internal/metrics"
"github.com/gleich/lcp-v2/internal/secrets"
"github.com/gleich/lumber/v3"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Cache[T any] struct {
name string
mutex sync.RWMutex
data T
updated time.Time
updateCounter prometheus.Counter
requestCounter prometheus.Counter
filePath string
name string
dataMutex sync.RWMutex
data T
updated time.Time
updateCounter prometheus.Counter
requestCounter prometheus.Counter
filePath string
wsConnPool map[*websocket.Conn]bool
wsConnPoolMutex sync.Mutex
wsUpgrader websocket.Upgrader
}

func NewCache[T any](name string, data T) *Cache[T] {
Expand All @@ -39,7 +43,9 @@ func NewCache[T any](name string, data T) *Cache[T] {
Name: fmt.Sprintf("cache_%s_requests", name),
Help: fmt.Sprintf(`The total number of times the cache "%s" has been requested`, name),
}),
filePath: filepath.Join(secrets.SECRETS.CacheFolder, fmt.Sprintf("%s.json", name)),
filePath: filepath.Join(secrets.SECRETS.CacheFolder, fmt.Sprintf("%s.json", name)),
wsConnPool: make(map[*websocket.Conn]bool),
wsUpgrader: websocket.Upgrader{},
}
cache.loadFromFile()
cache.Update(data)
Expand All @@ -58,10 +64,10 @@ func (c *Cache[T]) ServeHTTP() http.HandlerFunc {
w.WriteHeader(http.StatusUnauthorized)
return
}
c.mutex.RLock()
c.dataMutex.RLock()
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(cacheData[T]{Data: c.data, Updated: c.updated})
c.mutex.RUnlock()
c.dataMutex.RUnlock()
c.requestCounter.Inc()
if err != nil {
lumber.Error(err, "failed to write data")
Expand All @@ -74,7 +80,7 @@ func (c *Cache[T]) ServeHTTP() http.HandlerFunc {
// Update the given cache
func (c *Cache[T]) Update(data T) {
var updated bool
c.mutex.Lock()
c.dataMutex.Lock()
old, err := json.Marshal(c.data)
if err != nil {
lumber.Error(err, "failed to json marshal old data")
Expand All @@ -90,12 +96,18 @@ func (c *Cache[T]) Update(data T) {
c.updated = time.Now()
updated = true
}
c.mutex.Unlock()
c.dataMutex.Unlock()

if updated {
c.updateCounter.Inc()
metrics.CacheUpdates.Inc()
c.persistToFile()
lumber.Done(strings.ToUpper(c.name), "cache updated")
connectionsUpdated := c.broadcastUpdate()
lumber.Done(
strings.ToUpper(c.name),
"cache updated;",
"broadcasted to", connectionsUpdated, "websocket connections",
)
}
}

Expand Down
62 changes: 62 additions & 0 deletions internal/cache/websockets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cache

import (
"net/http"

"github.com/gleich/lumber/v3"
"github.com/gorilla/websocket"
)

// Handle websocket connections
func (c *Cache[T]) ServeWS() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := c.wsUpgrader.Upgrade(w, r, nil)
if err != nil {
lumber.Error(err, "failed to upgrade connection to websocket")
return
}
c.wsConnPoolMutex.Lock()
c.wsConnPool[conn] = true
c.wsConnPoolMutex.Unlock()

// sending initial data
c.dataMutex.RLock()
err = conn.WriteJSON(c.data)
c.dataMutex.RUnlock()
if err != nil {
lumber.Error(err, "failed to write initial cache data for", c.name)
c.removeConnection(conn)
return
}

// spawning goroutine to handle connection
go func() {
defer c.removeConnection(conn)
}()
}
}

func (c *Cache[T]) broadcastUpdate() int {
c.dataMutex.RLock()
d := c.data
c.dataMutex.RUnlock()

updatedConnections := 0
for conn := range c.wsConnPool {
err := conn.WriteJSON(d)
if err != nil {
lumber.Error(err, "failed to broadcast update to client")
c.removeConnection(conn)
} else {
updatedConnections++
}
}
return updatedConnections
}

func (c *Cache[T]) removeConnection(conn *websocket.Conn) {
c.wsConnPoolMutex.Lock()
delete(c.wsConnPool, conn)
c.wsConnPoolMutex.Unlock()
conn.Close()
}

0 comments on commit 72a744b

Please sign in to comment.