Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Broadcast Feature #5

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
abbdbc3
basic structure for RealtimeChannel
IcedTea2K May 13, 2024
ff241df
un-export event constants
IcedTea2K May 15, 2024
5d3880a
add eventType and filter criteria checking
IcedTea2K May 15, 2024
71fe108
allow optional criteria
IcedTea2K May 15, 2024
5ae49b0
set field and construct filters
IcedTea2K May 16, 2024
7450fb3
implement connection message constructing based on eventFilter type
IcedTea2K May 17, 2024
113628c
Merge branch 'socker_connections' into postgres_changes
IcedTea2K May 20, 2024
9dee7c7
implement addBinding with multiple queues, remove realtimeTopic type,…
IcedTea2K May 25, 2024
37982ce
send connection message and process response from server
IcedTea2K Jun 11, 2024
46a6323
route ID to the correct binding
IcedTea2K Jun 11, 2024
44ac1b6
correctly unmarshal postgres changes
IcedTea2K Jun 11, 2024
7a4aab3
able to subscribe to multiple events
IcedTea2K Jun 12, 2024
2b1d69a
match action/event type for postgres event at route
IcedTea2K Jun 13, 2024
9aa6d7b
add postgres event validation after initial connection message
IcedTea2K Jun 14, 2024
8715df0
implement RealtimeChannel.Unsubscribe (with potential race condition
IcedTea2K Jun 14, 2024
4080fb0
add rwMutex to prevent clearing route map during a callback
IcedTea2K Jun 14, 2024
417899c
cleanup messages and add docs
IcedTea2K Jun 14, 2024
6c31f0d
move binding definition into channel
IcedTea2K Jun 14, 2024
e9d3b15
add subscribing to broadcast events
IcedTea2K Jun 17, 2024
a2e11cf
add ref to msg for msg identifications
IcedTea2K Jun 17, 2024
a6b9f32
Merge branch 'postgres_changes' into broadcast
IcedTea2K Jun 18, 2024
b504f82
implement RealtimClient.Send()
IcedTea2K Jun 18, 2024
ad6596d
add context to Send()
IcedTea2K Jun 19, 2024
d9ff027
send POST request to the server if there's no websocket connection
IcedTea2K Jun 19, 2024
ffea8fa
quick code reformatting
IcedTea2K Jun 19, 2024
3937994
prevent overwrite other eventType bindings
IcedTea2K Jun 19, 2024
1f5b15a
add case insensitive checking
IcedTea2K Jun 19, 2024
19f0de1
resolve conflicts
IcedTea2K Jun 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 126 additions & 15 deletions realtime/events.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,131 @@
package realtime;
package realtime

// Channel Events
const JOIN_EVENT = "phx_join"
const REPLY_EVENT = "phx_reply"
import (
"fmt"
"reflect"
"strings"
)

// DB Subscription Events
const POSTGRES_CHANGE_EVENT = "postgres_changes"
// Events that are used to communicate with the server
const (
joinEvent string = "phx_join"
replyEvent string = "phx_reply"
leaveEvent string = "phx_leave"
closeEvent string = "phx_close"

// Broadcast Events
const BROADCAST_EVENT = "broadcast"
// DB Subscription Events
postgresChangesEvent string = "postgres_changes"

// Presence Events
const PRESENCE_STATE_EVENT = "presence_state"
const PRESENCE_DIFF_EVENT ="presence_diff"
// Broadcast Events
broadcastEvent string = "broadcast"

// Other Events
const SYS_EVENT = "system"
const HEARTBEAT_EVENT = "heartbeat"
const ACCESS_TOKEN_EVENT = "access_token"
// Presence Events
presenceStateEvent string = "presence_state"
presenceDiffEvent string = "presence_diff"

// Other Events
systemEvent string = "system"
heartbeatEvent string = "heartbeat"
accessTokennEvent string = "access_token"
)

// Event "type" that the user can specify for channel to listen to
const (
presenceEventType string = "presence"
broadcastEventType string = "broadcast"
postgresChangesEventType string = "postgres_changes"
)

// type eventFilter struct {}
type eventFilter interface {}

type postgresFilter struct {
Event string `supabase:"required" json:"event"`
Schema string `supabase:"required" json:"schema"`
Table string `supabase:"optional" json:"table,omitempty"`
Filter string `supabase:"optional" json:"filter,omitempty"`
}

type broadcastFilter struct {
Event string `supabase:"required"`
}

type presenceFilter struct {
Event string `supabase:"required"`
}

// Custom event that can be sent to the server
type CustomEvent struct {
Event string `json:"event"`
Payload any `json:"payload"`
Type string `json:"type"`
}

// Verify if the given event type is supported
func verifyEventType(eventType string) bool {
switch eventType {
case presenceEventType:
fallthrough
case broadcastEventType:
fallthrough
case postgresChangesEventType:
return true
}

return false
}

// Enforce client's filter object to follow a specific message
// structure of certain events. Check messages.go for more
// information on the struct of each event.
// Only the following events are currently supported:
// - postgres_changes, broadcast, presence
func createEventFilter(eventType string, filter map[string]string) (eventFilter, error) {
var filterType reflect.Type // Type for filter
var filterConValue reflect.Value // Concrete value
var filterPtrValue reflect.Value // Pointer value to the concrete value
var missingFields []string

switch eventType {
case postgresChangesEvent:
filterPtrValue = reflect.ValueOf(&postgresFilter{})
break
case broadcastEvent:
filterPtrValue = reflect.ValueOf(&broadcastFilter{})
break
case presenceEventType:
filterPtrValue = reflect.ValueOf(&presenceFilter{})
default:
return nil, fmt.Errorf("Unsupported event type: %s", eventType)
}

// Get the underlying filter type to identify missing fields
filterConValue = filterPtrValue.Elem()
filterType = filterConValue.Type()
missingFields = make([]string, 0, filterType.NumField())

for i := 0; i < filterType.NumField(); i++ {
currField := filterType.Field(i)
currFieldName := strings.ToLower(currField.Name)
isRequired := currField.Tag.Get("supabase") == "required"

val, ok := filter[currFieldName]
if !ok && isRequired {
missingFields = append(missingFields, currFieldName)
}

// Set field to empty string when value for currFieldName is missing
filterConValue.Field(i).SetString(val)
}

if len(missingFields) != 0 {
return nil, fmt.Errorf("Criteria for %s is missing: %+v", eventType, missingFields)
}

filterFinal, ok := filterConValue.Interface().(eventFilter)
if !ok {
return nil, fmt.Errorf("Unexpected Error: cannot create event filter")
}

return filterFinal, nil
}
174 changes: 152 additions & 22 deletions realtime/messages.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,160 @@
package realtime;
package realtime

type TemplateMsg struct {
Event string `json:"event"`
Topic string `json:"topic"`
Ref string `json:"ref"`
import (
"encoding/json"
"strconv"
"time"
)

// This is a general message strucutre. It follows the message protocol
// of the phoenix server:
/*
{
event: string,
topic: string,
payload: [key: string]: boolean | int | string | any,
ref: string
}
*/
type Msg struct {
Metadata
Payload any `json:"payload"`
}

// Generic message that contains raw payload. It can be used
// as a tagged union, where the event field can be used to
// determine the structure of the payload.
type RawMsg struct {
Metadata
Payload json.RawMessage `json:"payload"`
}

type ConnectionMsg struct {
TemplateMsg
// The other fields besides the payload that make up a message.
// It describes other information about a message such as type of event,
// the topic the message belongs to, and its reference.
type Metadata struct {
Event string `json:"event"`
Topic string `json:"topic"`
Ref string `json:"ref"`
}

// Payload for the conection message for when client first joins the channel.
// More info: https://supabase.com/docs/guides/realtime/protocol#connection
type ConnectionPayload struct {
Config struct {
Broadcast struct {
Self bool `json:"self"`
} `json:"broadcast,omitempty"`

Presence struct {
Key string `json:"key"`
} `json:"presence,omitempty"`

Payload struct {
Data struct {
Schema string `json:"schema"`
Table string `json:"table"`
CommitTime string `json:"commit_timestamp"`
EventType string `json:"eventType"`
New map[string]string `json:"new"`
Old map[string]string `json:"old"`
Errors string `json:"errors"`
} `json:"data"`
} `json:"payload"`
PostgresChanges []postgresFilter `json:"postgres_changes,omitempty"`
} `json:"config"`
}

type HearbeatMsg struct {
TemplateMsg
// Payload of the server's first response of three upon joining channel.
// It contains details about subscribed postgres events.
// More info: https://supabase.com/docs/guides/realtime/protocol#connection
type ReplyPayload struct {
Response struct {
PostgresChanges []struct{
ID int `json:"id"`
postgresFilter
} `json:"postgres_changes"`
} `json:"response"`
Status string `json:"status"`
}

// Payload of the server's second response of three upon joining channel.
// It contains details about the status of subscribing to PostgresSQL.
// More info: https://supabase.com/docs/guides/realtime/protocol#system-messages
type SystemPayload struct {
Channel string `json:"channel"`
Extension string `json:"extension"`
Message string `json:"message"`
Status string `json:"status"`
}

// Payload of the server's third response of three upon joining channel.
// It contains details about the Presence feature of Supabase.
// More info: https://supabase.com/docs/guides/realtime/protocol#state-update
type PresenceStatePayload map[string]struct{
Metas []struct{
Ref string `json:"phx_ref"`
Name string `json:"name"`
T float64 `json:"t"`
} `json:"metas,omitempty"`
}

// Payload of the server's response when there is a postgres_changes event.
// More info: https://supabase.com/docs/guides/realtime/protocol#system-messages
type PostgresCDCPayload struct {
Data struct {
Schema string `json:"schema"`
Table string `json:"table"`
CommitTime string `json:"commit_timestamp"`
Record map[string]any `json:"record"`
Columns []struct{
Name string `json:"name"`
Type string `json:"type"`
} `json:"columns"`
ActionType string `json:"type"`
Old map[string]any `json:"old_record"`
Errors string `json:"errors"`
} `json:"data"`
IDs []int `json:"ids"`
}

// Payload of the server's response when there is a broadcast event.
// More info: https://supabase.com/docs/guides/realtime/protocol#broadcast-message
type BroadcastPayload struct {
Event string `json:"event"`
Payload any `json:"payload"`
Type string `json:"type"`
}

// create a template message
func createMsgMetadata(event string, topic string) *Metadata {
return &Metadata{
Event: event,
Topic: topic,
Ref: "",
}
}

// create a connection message depending on event type
func createConnectionMessage(topic string, bindings []*binding) *Msg {
msg := &Msg{}

// Fill out the message template
msg.Metadata = *createMsgMetadata(joinEvent, topic)
msg.Metadata.Ref = strconv.FormatInt(time.Now().Unix(), 10)

// Fill out the payload
payload := &ConnectionPayload{}
for _, bind := range bindings {
filter := bind.filter
switch filter.(type) {
case postgresFilter:
if payload.Config.PostgresChanges == nil {
payload.Config.PostgresChanges = make([]postgresFilter, 0, 1)
}
payload.Config.PostgresChanges = append(payload.Config.PostgresChanges, filter.(postgresFilter))
break
case broadcastFilter:
payload.Config.Broadcast.Self = true
break
case presenceFilter:
payload.Config.Presence.Key = ""
break
default:
panic("TYPE ASSERTION FAILED: expecting one of postgresFilter, broadcastFilter, or presenceFilter")
}
}

msg.Payload = payload

Payload struct {
} `json:"payload"`
return msg
}
Loading