Skip to content

Commit

Permalink
Merge pull request #3 from IcedTea2K/socker_connections
Browse files Browse the repository at this point in the history
Establish & keep alive websocket connections with realtime server
  • Loading branch information
tranhoangvuit authored May 28, 2024
2 parents a5f6ddb + d9e57a6 commit 3ed63a4
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 1 deletion.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/supabase-community/realtime-go

go 1.17
go 1.22

require nhooyr.io/websocket v1.8.11
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
20 changes: 20 additions & 0 deletions realtime/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package realtime;

// Channel Events
const JOIN_EVENT = "phx_join"
const REPLY_EVENT = "phx_reply"

// DB Subscription Events
const POSTGRES_CHANGE_EVENT = "postgres_changes"

// Broadcast Events
const BROADCAST_EVENT = "broadcast"

// Presence Events
const PRESENCE_STATE_EVENT = "presence_state"
const PRESENCE_DIFF_EVENT ="presence_diff"

// Other Events
const SYS_EVENT = "system"
const HEARTBEAT_EVENT = "heartbeat"
const ACCESS_TOKEN_EVENT = "access_token"
30 changes: 30 additions & 0 deletions realtime/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package realtime;

type TemplateMsg struct {
Event string `json:"event"`
Topic string `json:"topic"`
Ref string `json:"ref"`
}

type ConnectionMsg struct {
TemplateMsg

Payload struct {
Data struct {
Schema string `json:"schema"`
Table string `json:"table"`
CommitTime string `json:"commit_timestamp"`
EventType string `json:"eventType"`
New map[string]string `json:"new"`
Old map[string]string `json:"old"`
Errors string `json:"errors"`
} `json:"data"`
} `json:"payload"`
}

type HearbeatMsg struct {
TemplateMsg

Payload struct {
} `json:"payload"`
}
213 changes: 213 additions & 0 deletions realtime/realtime_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package realtime

import (
"context"
"errors"
"fmt"
"io"
"log"
"sync"
"time"

"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)

type RealtimeClient struct {
Url string
ApiKey string

mu sync.Mutex
conn *websocket.Conn
closed chan struct{}
logger *log.Logger
dialTimeout time.Duration
reconnectInterval time.Duration
heartbeatDuration time.Duration
heartbeatInterval time.Duration
}

// Create a new RealtimeClient with user's speicfications
func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient {
realtimeUrl := fmt.Sprintf(
"wss://%s.supabase.co/realtime/v1/websocket?apikey=%s&log_level=info&vsn=1.0.0",
projectRef,
apiKey,
)
newLogger := log.Default()

return &RealtimeClient{
Url: realtimeUrl,
ApiKey: apiKey,
logger: newLogger,
dialTimeout: 10 * time.Second,
heartbeatDuration: 5 * time.Second,
heartbeatInterval: 20 * time.Second,
reconnectInterval: 500 * time.Millisecond,
}
}

// Connect the client with the realtime server
func (client *RealtimeClient) Connect() error {
if client.isClientAlive() {
return nil
}

// Attempt to dial the server
err := client.dialServer()
if err != nil {
return fmt.Errorf("Cannot connect to the server: %w", err)
}

// client is only alive after the connection has been made
client.mu.Lock()
client.closed = make(chan struct{})
client.mu.Unlock()

go client.startHeartbeats()

return nil
}

// Disconnect the client from the realtime server
func (client *RealtimeClient) Disconnect() error {
client.mu.Lock()
defer client.mu.Unlock()

if !client.isClientAlive() {
return nil
}

err := client.conn.Close(websocket.StatusNormalClosure, "Closing the connection")
if err != nil {
if !client.isConnectionAlive(err) {
client.logger.Println("Connection has already been terminated")
close(client.closed)
} else {
return fmt.Errorf("Failed to close the connection: %w", err)
}
} else {
close(client.closed)
}

return nil
}

// Start sending heartbeats to the server to maintain connection
func (client *RealtimeClient) startHeartbeats() {
for client.isClientAlive() {
err := client.sendHeartbeat()

if err != nil {
if client.isConnectionAlive(err) {
client.logger.Println(err)
} else {
client.logger.Println("Error: lost connection with the server")
client.logger.Println("Attempting to to send hearbeat again")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// there should never be an error returned, since it'll keep trying
_ = client.reconnect(ctx)
}
}

// in case where the client needs to reconnect with the server,
// the interval between heartbeats be however long it takes to
// reconnect plus the number of heartbeatInterval has gone by
time.Sleep(client.heartbeatInterval)
}
}

// Send the heartbeat to the realtime server
func (client *RealtimeClient) sendHeartbeat() error {
msg := HearbeatMsg{
TemplateMsg: TemplateMsg{
Event: HEARTBEAT_EVENT,
Topic: "phoenix",
Ref: "",
},
Payload: struct{}{},
}

ctx, cancel := context.WithTimeout(context.Background(), client.heartbeatDuration)
defer cancel()

client.logger.Print("Sending heartbeat")

err := wsjson.Write(ctx, client.conn, msg)
if err != nil {
return fmt.Errorf("Failed to send hearbeat in %f seconds: %w", client.heartbeatDuration.Seconds(), err)
}

return nil
}

// Dial the server with a certain timeout in seconds
func (client *RealtimeClient) dialServer() error {
client.mu.Lock()
defer client.mu.Unlock()

if client.isClientAlive() {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout)
defer cancel()

conn, _, err := websocket.Dial(ctx, client.Url, nil)
if err != nil {
return fmt.Errorf("Failed to dial the server: %w", err)
}

client.conn = conn

return nil
}

// Keep trying to reconnect every 0.5 seconds until ctx is done/invalidated
func (client *RealtimeClient) reconnect(ctx context.Context) error {
for client.isClientAlive() {
client.logger.Println("Attempt to reconnect to the server")

select {
case <-ctx.Done():
return fmt.Errorf("Failed to reconnect to the server within time limit")
default:
err := client.dialServer()
if err == nil {
return nil
}

client.logger.Printf("Failed to reconnect to the server: %s", err)
time.Sleep(client.reconnectInterval)
}
}

return nil
}

// Check if the realtime client has been killed
func (client *RealtimeClient) isClientAlive() bool {
if client.closed == nil {
return false
}

select {
case <-client.closed:
return false
default:
break
}

return true
}

// The underlying package of websocket returns an error if the connection is
// terminated on the server side. Therefore, the state of the connection can
// be achieved by investigating the error
// Constraints: err must be returned from interacting with the connection
func (client *RealtimeClient) isConnectionAlive(err error) bool {
return !errors.Is(err, io.EOF)
}

0 comments on commit 3ed63a4

Please sign in to comment.