diff --git a/go.mod b/go.mod index 564fcfe..e17a339 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/supabase-community/realtime-go -go 1.17 +go 1.22 + +require nhooyr.io/websocket v1.8.11 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..429e98e --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/realtime/events.go b/realtime/events.go new file mode 100644 index 0000000..5ef5018 --- /dev/null +++ b/realtime/events.go @@ -0,0 +1,20 @@ +package realtime; + +// Channel Events +const JOIN_EVENT = "phx_join" +const REPLY_EVENT = "phx_reply" + +// DB Subscription Events +const POSTGRES_CHANGE_EVENT = "postgres_changes" + +// Broadcast Events +const BROADCAST_EVENT = "broadcast" + +// Presence Events +const PRESENCE_STATE_EVENT = "presence_state" +const PRESENCE_DIFF_EVENT ="presence_diff" + +// Other Events +const SYS_EVENT = "system" +const HEARTBEAT_EVENT = "heartbeat" +const ACCESS_TOKEN_EVENT = "access_token" diff --git a/realtime/messages.go b/realtime/messages.go new file mode 100644 index 0000000..21b9872 --- /dev/null +++ b/realtime/messages.go @@ -0,0 +1,30 @@ +package realtime; + +type TemplateMsg struct { + Event string `json:"event"` + Topic string `json:"topic"` + Ref string `json:"ref"` +} + +type ConnectionMsg struct { + TemplateMsg + + Payload struct { + Data struct { + Schema string `json:"schema"` + Table string `json:"table"` + CommitTime string `json:"commit_timestamp"` + EventType string `json:"eventType"` + New map[string]string `json:"new"` + Old map[string]string `json:"old"` + Errors string `json:"errors"` + } `json:"data"` + } `json:"payload"` +} + +type HearbeatMsg struct { + TemplateMsg + + Payload struct { + } `json:"payload"` +} diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go new file mode 100644 index 0000000..8a3603e --- /dev/null +++ b/realtime/realtime_client.go @@ -0,0 +1,213 @@ +package realtime + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "sync" + "time" + + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +type RealtimeClient struct { + Url string + ApiKey string + + mu sync.Mutex + conn *websocket.Conn + closed chan struct{} + logger *log.Logger + dialTimeout time.Duration + reconnectInterval time.Duration + heartbeatDuration time.Duration + heartbeatInterval time.Duration +} + +// Create a new RealtimeClient with user's speicfications +func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { + realtimeUrl := fmt.Sprintf( + "wss://%s.supabase.co/realtime/v1/websocket?apikey=%s&log_level=info&vsn=1.0.0", + projectRef, + apiKey, + ) + newLogger := log.Default() + + return &RealtimeClient{ + Url: realtimeUrl, + ApiKey: apiKey, + logger: newLogger, + dialTimeout: 10 * time.Second, + heartbeatDuration: 5 * time.Second, + heartbeatInterval: 20 * time.Second, + reconnectInterval: 500 * time.Millisecond, + } +} + +// Connect the client with the realtime server +func (client *RealtimeClient) Connect() error { + if client.isClientAlive() { + return nil + } + + // Attempt to dial the server + err := client.dialServer() + if err != nil { + return fmt.Errorf("Cannot connect to the server: %w", err) + } + + // client is only alive after the connection has been made + client.mu.Lock() + client.closed = make(chan struct{}) + client.mu.Unlock() + + go client.startHeartbeats() + + return nil +} + +// Disconnect the client from the realtime server +func (client *RealtimeClient) Disconnect() error { + client.mu.Lock() + defer client.mu.Unlock() + + if !client.isClientAlive() { + return nil + } + + err := client.conn.Close(websocket.StatusNormalClosure, "Closing the connection") + if err != nil { + if !client.isConnectionAlive(err) { + client.logger.Println("Connection has already been terminated") + close(client.closed) + } else { + return fmt.Errorf("Failed to close the connection: %w", err) + } + } else { + close(client.closed) + } + + return nil +} + +// Start sending heartbeats to the server to maintain connection +func (client *RealtimeClient) startHeartbeats() { + for client.isClientAlive() { + err := client.sendHeartbeat() + + if err != nil { + if client.isConnectionAlive(err) { + client.logger.Println(err) + } else { + client.logger.Println("Error: lost connection with the server") + client.logger.Println("Attempting to to send hearbeat again") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // there should never be an error returned, since it'll keep trying + _ = client.reconnect(ctx) + } + } + + // in case where the client needs to reconnect with the server, + // the interval between heartbeats be however long it takes to + // reconnect plus the number of heartbeatInterval has gone by + time.Sleep(client.heartbeatInterval) + } +} + +// Send the heartbeat to the realtime server +func (client *RealtimeClient) sendHeartbeat() error { + msg := HearbeatMsg{ + TemplateMsg: TemplateMsg{ + Event: HEARTBEAT_EVENT, + Topic: "phoenix", + Ref: "", + }, + Payload: struct{}{}, + } + + ctx, cancel := context.WithTimeout(context.Background(), client.heartbeatDuration) + defer cancel() + + client.logger.Print("Sending heartbeat") + + err := wsjson.Write(ctx, client.conn, msg) + if err != nil { + return fmt.Errorf("Failed to send hearbeat in %f seconds: %w", client.heartbeatDuration.Seconds(), err) + } + + return nil +} + +// Dial the server with a certain timeout in seconds +func (client *RealtimeClient) dialServer() error { + client.mu.Lock() + defer client.mu.Unlock() + + if client.isClientAlive() { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), client.dialTimeout) + defer cancel() + + conn, _, err := websocket.Dial(ctx, client.Url, nil) + if err != nil { + return fmt.Errorf("Failed to dial the server: %w", err) + } + + client.conn = conn + + return nil +} + +// Keep trying to reconnect every 0.5 seconds until ctx is done/invalidated +func (client *RealtimeClient) reconnect(ctx context.Context) error { + for client.isClientAlive() { + client.logger.Println("Attempt to reconnect to the server") + + select { + case <-ctx.Done(): + return fmt.Errorf("Failed to reconnect to the server within time limit") + default: + err := client.dialServer() + if err == nil { + return nil + } + + client.logger.Printf("Failed to reconnect to the server: %s", err) + time.Sleep(client.reconnectInterval) + } + } + + return nil +} + +// Check if the realtime client has been killed +func (client *RealtimeClient) isClientAlive() bool { + if client.closed == nil { + return false + } + + select { + case <-client.closed: + return false + default: + break + } + + return true +} + +// The underlying package of websocket returns an error if the connection is +// terminated on the server side. Therefore, the state of the connection can +// be achieved by investigating the error +// Constraints: err must be returned from interacting with the connection +func (client *RealtimeClient) isConnectionAlive(err error) bool { + return !errors.Is(err, io.EOF) +}