From c543b94af0fb7c6259095eabfad6fd6ba24ecd7a Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 10 May 2024 01:08:07 -0400 Subject: [PATCH 1/8] add Connection and Heartbeat message structs and a few common event types --- realtime/constants.go | 20 ++++++++++++++++++++ realtime/messages.go | 26 ++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 realtime/constants.go create mode 100644 realtime/messages.go diff --git a/realtime/constants.go b/realtime/constants.go new file mode 100644 index 0000000..5ef5018 --- /dev/null +++ b/realtime/constants.go @@ -0,0 +1,20 @@ +package realtime; + +// Channel Events +const JOIN_EVENT = "phx_join" +const REPLY_EVENT = "phx_reply" + +// DB Subscription Events +const POSTGRES_CHANGE_EVENT = "postgres_changes" + +// Broadcast Events +const BROADCAST_EVENT = "broadcast" + +// Presence Events +const PRESENCE_STATE_EVENT = "presence_state" +const PRESENCE_DIFF_EVENT ="presence_diff" + +// Other Events +const SYS_EVENT = "system" +const HEARTBEAT_EVENT = "heartbeat" +const ACCESS_TOKEN_EVENT = "access_token" diff --git a/realtime/messages.go b/realtime/messages.go new file mode 100644 index 0000000..897f2e8 --- /dev/null +++ b/realtime/messages.go @@ -0,0 +1,26 @@ +package realtime; + +type ConnectionMsg struct { + Event string `json:"event"` + Topic string `json:"topic"` + Payload struct { + Data struct { + Schema string `json:"schema"` + Table string `json:"table"` + CommitTime string `json:"commit_timestamp"` + EventType string `json:"eventType"` + New map[string]string `json:"new"` + Old map[string]string `json:"old"` + Errors string `json:"errors"` + } `json:"data"` + } `json:"payload"` + Ref string `json:"ref"` +} + +type HearbeatMsg struct { + Event string `json:"event"` + Topic string `json:"topic"` + Payload struct { + } `json:"payload"` + Ref string `json:"ref"` +} From f5e22a9d3034d3e248ff43f90d72fab8d80433e1 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 10 May 2024 18:50:33 -0400 Subject: [PATCH 2/8] connect to websocket with WIP heartbeat --- go.mod | 2 + go.sum | 2 + realtime/realtime_client.go | 101 ++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+) create mode 100644 go.sum create mode 100644 realtime/realtime_client.go diff --git a/go.mod b/go.mod index 564fcfe..2e58db8 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/supabase-community/realtime-go go 1.17 + +require nhooyr.io/websocket v1.8.11 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..429e98e --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go new file mode 100644 index 0000000..2ba0377 --- /dev/null +++ b/realtime/realtime_client.go @@ -0,0 +1,101 @@ +package realtime + +import ( + "context" + "fmt" + "time" + + "nhooyr.io/websocket" +) + +type RealtimeClient struct { + Url string + ApiKey string + + conn *websocket.Conn + closed chan struct{} + dialTimeout time.Duration + heartbeatDuration time.Duration +} + +// Create a new RealtimeClient with user's speicfications +func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { + realtimeUrl := fmt.Sprintf( + "wss://%s.supabase.co/realtime/v1/websocket?apikey=%s&log_level=info&vsn=1.0.0", + projectRef, + apiKey, + ) + + return &RealtimeClient{ + Url: realtimeUrl, + ApiKey: apiKey, + dialTimeout: 10, + heartbeatDuration: 20, + } +} + +// Connect the client with the realtie server +func (client *RealtimeClient) Connect() error { + if client.closed != nil { + return nil + } + + client.closed = make(chan struct{}) + + // Attempt to dial the server + err := client.dialServer() + if err != nil { + return fmt.Errorf("Cannot connect to the server: %w", err) + } + + // Start sending heartbeat to keep the connection alive + go client.startHeartbeats() + + return nil +} + +// Disconnect the client from the realtime server +func (client *RealtimeClient) Disconnect() error { + err := client.conn.CloseNow() + + return err +} + +// Start sending heartbeats to the server to maintain connection +func (client *RealtimeClient) startHeartbeats() { + isBeating := true + for isBeating { + select { + case <-client.closed: + isBeating = false + break + + default: + client.sendHeartbeat() + time.Sleep(client.heartbeatDuration * time.Second) + } + } +} + +func (client *RealtimeClient) sendHeartbeat() { + // Send the heartbeat +} + +// Dial the server with a certain timeout in seconds +func (client *RealtimeClient) dialServer() error { + if client.conn != nil { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout * time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, client.Url, nil) + if err != nil { + return err + } + + client.conn = conn + + return nil +} From 1174882b0a10f398ac7e31d1b433f5582980462b Mon Sep 17 00:00:00 2001 From: Minh Au Date: Sat, 11 May 2024 19:33:53 -0400 Subject: [PATCH 3/8] send heartbeat every 20 seconds --- realtime/messages.go | 14 +++++++----- realtime/realtime_client.go | 43 +++++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 897f2e8..21b9872 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -1,8 +1,14 @@ package realtime; -type ConnectionMsg struct { +type TemplateMsg struct { Event string `json:"event"` Topic string `json:"topic"` + Ref string `json:"ref"` +} + +type ConnectionMsg struct { + TemplateMsg + Payload struct { Data struct { Schema string `json:"schema"` @@ -14,13 +20,11 @@ type ConnectionMsg struct { Errors string `json:"errors"` } `json:"data"` } `json:"payload"` - Ref string `json:"ref"` } type HearbeatMsg struct { - Event string `json:"event"` - Topic string `json:"topic"` + TemplateMsg + Payload struct { } `json:"payload"` - Ref string `json:"ref"` } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 2ba0377..29c91d7 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -6,6 +6,7 @@ import ( "time" "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" ) type RealtimeClient struct { @@ -39,7 +40,7 @@ func (client *RealtimeClient) Connect() error { if client.closed != nil { return nil } - + client.closed = make(chan struct{}) // Attempt to dial the server @@ -48,27 +49,29 @@ func (client *RealtimeClient) Connect() error { return fmt.Errorf("Cannot connect to the server: %w", err) } - // Start sending heartbeat to keep the connection alive - go client.startHeartbeats() - return nil } // Disconnect the client from the realtime server func (client *RealtimeClient) Disconnect() error { - err := client.conn.CloseNow() + if client.closed == nil { + return nil + } + + close(client.closed) + err := client.conn.Close(websocket.StatusNormalClosure, "Closing the connection") return err } // Start sending heartbeats to the server to maintain connection func (client *RealtimeClient) startHeartbeats() { - isBeating := true - for isBeating { +Loop: + for { select { case <-client.closed: - isBeating = false - break + client.closed = nil + break Loop default: client.sendHeartbeat() @@ -79,6 +82,24 @@ func (client *RealtimeClient) startHeartbeats() { func (client *RealtimeClient) sendHeartbeat() { // Send the heartbeat + msg := HearbeatMsg{ + TemplateMsg: TemplateMsg{ + Event: HEARTBEAT_EVENT, + Topic: "phoenix", + Ref: "", + }, + Payload: struct{}{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := wsjson.Write(ctx, client.conn, msg) + if err != nil { + fmt.Println(websocket.CloseStatus(err)) + } + + fmt.Println(msg) } // Dial the server with a certain timeout in seconds @@ -87,7 +108,7 @@ func (client *RealtimeClient) dialServer() error { return nil } - ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout * time.Second) defer cancel() conn, _, err := websocket.Dial(ctx, client.Url, nil) @@ -95,7 +116,7 @@ func (client *RealtimeClient) dialServer() error { return err } - client.conn = conn + client.conn = conn return nil } From 0e9f86218bbff3b527b32fba510d2b09544b2523 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Sat, 11 May 2024 22:34:54 -0400 Subject: [PATCH 4/8] retry sending heartbeats on failure along with being more thread safe --- realtime/realtime_client.go | 122 ++++++++++++++++++++++++++++-------- 1 file changed, 97 insertions(+), 25 deletions(-) diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 29c91d7..410d9ee 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -3,6 +3,8 @@ package realtime import ( "context" "fmt" + "log" + "sync" "time" "nhooyr.io/websocket" @@ -13,10 +15,15 @@ type RealtimeClient struct { Url string ApiKey string + mu sync.Mutex conn *websocket.Conn closed chan struct{} + logger *log.Logger dialTimeout time.Duration + reconnectInterval time.Duration + reconnectDuration time.Duration heartbeatDuration time.Duration + heartbeatInterval time.Duration } // Create a new RealtimeClient with user's speicfications @@ -26,22 +33,25 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { projectRef, apiKey, ) + newLogger := log.Default() return &RealtimeClient{ Url: realtimeUrl, ApiKey: apiKey, - dialTimeout: 10, - heartbeatDuration: 20, + logger: newLogger, + dialTimeout: 10 * time.Second, + heartbeatDuration: 5 * time.Second, + heartbeatInterval: 20 * time.Second, + reconnectDuration: 60 * time.Second, + reconnectInterval: 500 * time.Millisecond, } } -// Connect the client with the realtie server +// Connect the client with the realtime server func (client *RealtimeClient) Connect() error { - if client.closed != nil { + if client.isAlive() { return nil } - - client.closed = make(chan struct{}) // Attempt to dial the server err := client.dialServer() @@ -49,39 +59,58 @@ func (client *RealtimeClient) Connect() error { return fmt.Errorf("Cannot connect to the server: %w", err) } + // client is only alive after the connection has been made + client.mu.Lock() + client.closed = make(chan struct{}) + client.mu.Unlock() + + go client.startHeartbeats() + return nil } // Disconnect the client from the realtime server func (client *RealtimeClient) Disconnect() error { - if client.closed == nil { + client.mu.Lock() + defer client.mu.Unlock() + + if !client.isAlive() { return nil } - close(client.closed) err := client.conn.Close(websocket.StatusNormalClosure, "Closing the connection") + if err != nil { + client.logger.Println("Failed to close the connection") + client.logger.Printf("%v", err) + } else { + close(client.closed) + } - return err + return fmt.Errorf("Failed to close the connection") } // Start sending heartbeats to the server to maintain connection func (client *RealtimeClient) startHeartbeats() { -Loop: - for { - select { - case <-client.closed: - client.closed = nil - break Loop + for client.isAlive() { + err1 := client.sendHeartbeat() - default: - client.sendHeartbeat() - time.Sleep(client.heartbeatDuration * time.Second) + if err1 != nil { + client.logger.Println("Attempting to to send hearbeat again") + + ctx, cancel := context.WithTimeout(context.Background(), client.reconnectDuration) + defer cancel() + + err2 := client.reConnect(ctx) + if err2 != nil { + client.logger.Printf("Error: %v", err2) + } } + time.Sleep(client.heartbeatInterval) } } -func (client *RealtimeClient) sendHeartbeat() { - // Send the heartbeat +// Send the heartbeat to the realtime server +func (client *RealtimeClient) sendHeartbeat() error { msg := HearbeatMsg{ TemplateMsg: TemplateMsg{ Event: HEARTBEAT_EVENT, @@ -91,24 +120,32 @@ func (client *RealtimeClient) sendHeartbeat() { Payload: struct{}{}, } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), client.heartbeatDuration) defer cancel() + client.logger.Print(msg) + err := wsjson.Write(ctx, client.conn, msg) if err != nil { - fmt.Println(websocket.CloseStatus(err)) + client.logger.Printf("Error: %v", err) + client.logger.Printf("Failed to send heartbeat in %f seconds", client.heartbeatDuration.Seconds()) + + return err } - fmt.Println(msg) + return nil } // Dial the server with a certain timeout in seconds func (client *RealtimeClient) dialServer() error { - if client.conn != nil { + client.mu.Lock() + defer client.mu.Unlock() + + if client.isAlive() { return nil } - ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout) defer cancel() conn, _, err := websocket.Dial(ctx, client.Url, nil) @@ -120,3 +157,38 @@ func (client *RealtimeClient) dialServer() error { return nil } + +// Keep trying to reconnect every 0.5 seconds until ctx is done/invalidated +func (client *RealtimeClient) reConnect(ctx context.Context) error { + for client.isAlive() { + select { + case <-ctx.Done(): + return fmt.Errorf("Failed to reconnect to the server") + default: + err := client.dialServer() + if err == nil { + return nil + } + + time.Sleep(client.reconnectInterval) + } + } + + return nil +} + +// Check if there's a connection with the realtime server +func (client *RealtimeClient) isAlive() bool { + if client.closed == nil { + return false + } + + select { + case <-client.closed: + return false + default: + break + } + + return true +} From 9064bc77a489cf6fb70823c2446769d68fbe016f Mon Sep 17 00:00:00 2001 From: Minh Au Date: Sun, 12 May 2024 00:38:00 -0400 Subject: [PATCH 5/8] improve error handler with proper logging vs error returns --- realtime/realtime_client.go | 75 ++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 410d9ee..c06fcf0 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -2,7 +2,9 @@ package realtime import ( "context" + "errors" "fmt" + "io" "log" "sync" "time" @@ -49,7 +51,7 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { // Connect the client with the realtime server func (client *RealtimeClient) Connect() error { - if client.isAlive() { + if client.isClientAlive() { return nil } @@ -74,37 +76,48 @@ func (client *RealtimeClient) Disconnect() error { client.mu.Lock() defer client.mu.Unlock() - if !client.isAlive() { + if !client.isClientAlive() { return nil } err := client.conn.Close(websocket.StatusNormalClosure, "Closing the connection") if err != nil { - client.logger.Println("Failed to close the connection") - client.logger.Printf("%v", err) + if !client.isConnectionAlive(err) { + client.logger.Println("Connection has already been terminated") + close(client.closed) + } else { + return fmt.Errorf("Failed to close the connection: %w", err) + } } else { close(client.closed) } - - return fmt.Errorf("Failed to close the connection") + + return nil } // Start sending heartbeats to the server to maintain connection func (client *RealtimeClient) startHeartbeats() { - for client.isAlive() { - err1 := client.sendHeartbeat() + for client.isClientAlive() { + err := client.sendHeartbeat() - if err1 != nil { - client.logger.Println("Attempting to to send hearbeat again") + if err != nil { + if client.isConnectionAlive(err) { + client.logger.Println(err) + } else { + client.logger.Println("Error: lost connection with the server") + client.logger.Println("Attempting to to send hearbeat again") - ctx, cancel := context.WithTimeout(context.Background(), client.reconnectDuration) - defer cancel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - err2 := client.reConnect(ctx) - if err2 != nil { - client.logger.Printf("Error: %v", err2) + // there should never be an error returned, since it'll keep trying + _ = client.reconnect(ctx) } } + + // in case where the client needs to reconnect with the server, + // the interval between heartbeats be however long it takes to + // reconnect plus the number of heartbeatInterval has gone by time.Sleep(client.heartbeatInterval) } } @@ -123,14 +136,11 @@ func (client *RealtimeClient) sendHeartbeat() error { ctx, cancel := context.WithTimeout(context.Background(), client.heartbeatDuration) defer cancel() - client.logger.Print(msg) + client.logger.Print("Sending heartbeat") err := wsjson.Write(ctx, client.conn, msg) if err != nil { - client.logger.Printf("Error: %v", err) - client.logger.Printf("Failed to send heartbeat in %f seconds", client.heartbeatDuration.Seconds()) - - return err + return fmt.Errorf("Failed to send hearbeat in %f seconds: %w", client.heartbeatDuration.Seconds(), err) } return nil @@ -141,7 +151,7 @@ func (client *RealtimeClient) dialServer() error { client.mu.Lock() defer client.mu.Unlock() - if client.isAlive() { + if client.isClientAlive() { return nil } @@ -150,7 +160,7 @@ func (client *RealtimeClient) dialServer() error { conn, _, err := websocket.Dial(ctx, client.Url, nil) if err != nil { - return err + return fmt.Errorf("Failed to dial the server: %w", err) } client.conn = conn @@ -159,17 +169,20 @@ func (client *RealtimeClient) dialServer() error { } // Keep trying to reconnect every 0.5 seconds until ctx is done/invalidated -func (client *RealtimeClient) reConnect(ctx context.Context) error { - for client.isAlive() { +func (client *RealtimeClient) reconnect(ctx context.Context) error { + for client.isClientAlive() { + client.logger.Println("Attempt to reconnect to the server") + select { case <-ctx.Done(): - return fmt.Errorf("Failed to reconnect to the server") + return fmt.Errorf("Failed to reconnect to the server within time limit") default: err := client.dialServer() if err == nil { return nil } + client.logger.Printf("Failed to reconnect to the server: %s", err) time.Sleep(client.reconnectInterval) } } @@ -177,8 +190,8 @@ func (client *RealtimeClient) reConnect(ctx context.Context) error { return nil } -// Check if there's a connection with the realtime server -func (client *RealtimeClient) isAlive() bool { +// Check if the realtime client has been killed +func (client *RealtimeClient) isClientAlive() bool { if client.closed == nil { return false } @@ -192,3 +205,11 @@ func (client *RealtimeClient) isAlive() bool { return true } + +// The underlying package of websocket returns an error if the connection is +// terminated on the server side. Therefore, the state of the connection can +// be achieved by investigating the error +// Constraints: err must be returned from interacting with the connection +func (client *RealtimeClient) isConnectionAlive(err error) bool { + return !errors.Is(err, io.EOF) +} From 407ac7328c705dfd485aebfe45b500ea3c001579 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Sun, 12 May 2024 00:49:43 -0400 Subject: [PATCH 6/8] remove unused fields --- realtime/realtime_client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index c06fcf0..8a3603e 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -23,7 +23,6 @@ type RealtimeClient struct { logger *log.Logger dialTimeout time.Duration reconnectInterval time.Duration - reconnectDuration time.Duration heartbeatDuration time.Duration heartbeatInterval time.Duration } @@ -44,7 +43,6 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { dialTimeout: 10 * time.Second, heartbeatDuration: 5 * time.Second, heartbeatInterval: 20 * time.Second, - reconnectDuration: 60 * time.Second, reconnectInterval: 500 * time.Millisecond, } } From ae1506ccbd3bb7b68500b27cc87c594f29f2a03c Mon Sep 17 00:00:00 2001 From: Minh Au Date: Sun, 12 May 2024 01:16:02 -0400 Subject: [PATCH 7/8] quick file renaming constants.go to events.go --- realtime/{constants.go => events.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename realtime/{constants.go => events.go} (100%) diff --git a/realtime/constants.go b/realtime/events.go similarity index 100% rename from realtime/constants.go rename to realtime/events.go From d9e57a6e6283c357fe663fb9942cdf50998ea294 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 20 May 2024 17:14:28 -0400 Subject: [PATCH 8/8] update go version --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 2e58db8..e17a339 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/supabase-community/realtime-go -go 1.17 +go 1.22 require nhooyr.io/websocket v1.8.11