Skip to content

Commit

Permalink
Use separate struct to keep track of Delta Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ffilippopoulos committed Dec 19, 2024
1 parent ae68210 commit 239338f
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 30 deletions.
28 changes: 27 additions & 1 deletion xds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ func cluster(clusterName string, policy clusterv3.Cluster_LbPolicy) *clusterv3.C
Name: clusterName,
ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS},
LbPolicy: policy,
Http2ProtocolOptions: &corev3.Http2ProtocolOptions{}, // Set so that Envoy will assume that the upstream supports HTTP/2
EdsClusterConfig: &clusterv3.Cluster_EdsClusterConfig{
EdsConfig: &corev3.ConfigSource{
ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
Expand All @@ -238,6 +237,33 @@ func cluster(clusterName string, policy clusterv3.Cluster_LbPolicy) *clusterv3.C
}
}

// patchClusterDeltaEDS patches a cluster's EDS config to configure the clients
// to use Delta streams. It is meant to be used only for envoy clients where we
// configure an xds server as "xds_cluster" via injected config.
func patchClusterDeltaEDS(cluster *clusterv3.Cluster) *clusterv3.Cluster {
cluster.Http2ProtocolOptions = &corev3.Http2ProtocolOptions{}
cluster.EdsClusterConfig = &clusterv3.Cluster_EdsClusterConfig{
EdsConfig: &corev3.ConfigSource{
ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{
ApiConfigSource: &corev3.ApiConfigSource{
ApiType: corev3.ApiConfigSource_DELTA_GRPC,
TransportApiVersion: corev3.ApiVersion_V3,
GrpcServices: []*corev3.GrpcService{
{
TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{
ClusterName: "xds_cluster",
},
},
},
},
},
},
},
}
return cluster
}

// servicesToResources will return a set of listener, routeConfiguration and
// cluster for each service port
func servicesToResources(serviceStore XdsServiceStore, authority string) ([]types.Resource, []types.Resource, []types.Resource, error) {
Expand Down
110 changes: 81 additions & 29 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ const (
type Stream struct {
peerAddress string
requestRateLimit *rate.Limiter
isDelta bool
}

type Snapshotter struct {
authority string // Authority name of the server for federated requests
servePort uint
servicesCache cache.SnapshotCache // default service snapshot cache for empty node ID (all watched resources snapshot)
serviceSnapVersion int32 // Service snap version for empty node ID snapshot
endpointsCache cache.SnapshotCache // default endpoints snapshot cache for empty node ID (all watched resources snapshot)
endpointsSnapVersion int32 // Endpoints snap version for empty node ID snapshot
servicesCache cache.SnapshotCache
serviceSnapVersion int32 // Service snap version for empty node ID snapshot
endpointsCache cache.SnapshotCache
endpointsSnapVersion int32 // Endpoints snap version for empty node ID snapshot
deltaCDSCache *cache.LinearCache
deltaEDSCache *cache.LinearCache
muxCache cache.MuxCache
requestRateLimit *rate.Limiter // maximum number of requests allowed to server
streamRequestPerSecond float64 // maximum number of requests per stream per second
Expand Down Expand Up @@ -131,27 +132,66 @@ func mapTypeURL(typeURL string) string {
}
}

func mapDeltaTypeURL(typeURL string) string {
switch typeURL {
case resource.ClusterType:
return "deltaClusters"
case resource.EndpointType:
return "deltaEndpoints"
default:
return ""
}
}

func (s *Snapshotter) getCacheForType(typeURL string) cache.SnapshotCache {
switch typeURL {
case resource.ListenerType, resource.RouteType, resource.ClusterType:
return s.servicesCache
case resource.EndpointType:
return s.endpointsCache
default:
return nil
}
}

func (s *Snapshotter) getDeltaCacheForType(typeURL string) *cache.LinearCache {
switch typeURL {
case resource.EndpointType:
return s.deltaEDSCache
case resource.ClusterType:
return s.deltaCDSCache
default:
return nil
}
}

// NewSnapshotter needs a grpc server port and the allowed requests limits per server and stream per second
func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimit float64, localhostEndpoints bool) *Snapshotter {
servicesCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger)
endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger) // This could be a linear cache? https://pkg.go.dev/github.com/envoyproxy/go-control-plane/pkg/cache/v3#LinearCache
endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger)
deltaCDSCache := cache.NewLinearCache(resource.ClusterType, cache.WithLogger(log.EnvoyLogger))
deltaEDSCache := cache.NewLinearCache(resource.EndpointType, cache.WithLogger(log.EnvoyLogger))
muxCache := cache.MuxCache{
Classify: func(r *cache.Request) string {
return mapTypeURL(r.TypeUrl)
},
ClassifyDelta: func(r *cache.DeltaRequest) string {
return mapTypeURL(r.TypeUrl)
return mapDeltaTypeURL(r.TypeUrl)
},
Caches: map[string]cache.Cache{
"services": servicesCache,
"endpoints": endpointsCache,
"services": servicesCache,
"endpoints": endpointsCache,
"deltaClusters": deltaCDSCache,
"deltaEndpoints": deltaEDSCache,
},
}
return &Snapshotter{
authority: authority,
servePort: port,
servicesCache: servicesCache,
endpointsCache: endpointsCache,
deltaCDSCache: deltaCDSCache,
deltaEDSCache: deltaEDSCache,
muxCache: muxCache,
requestRateLimit: rate.NewLimiter(rate.Limit(requestLimit), 1),
streamRequestPerSecond: streamRequestLimit,
Expand Down Expand Up @@ -213,6 +253,9 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
if err != nil {
return fmt.Errorf("Failed to set services snapshot %v", err)
}
// Sync linear caches
s.setDeltaCache(resource.ClusterType)

s.nodes.Range(func(nID, n interface{}) bool {
nodeID := nID.(string)
node := n.(*Node)
Expand Down Expand Up @@ -265,6 +308,9 @@ func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
if err != nil {
return fmt.Errorf("Failed to set endpoints snapshot %v", err)
}
// Sync linear caches
s.setDeltaCache(resource.EndpointType)

s.nodes.Range(func(nID, n interface{}) bool {
nodeID := nID.(string)
node := n.(*Node)
Expand All @@ -287,6 +333,32 @@ func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
return nil
}

// setDeltaCache gets resources of a type from the full Sotw snapshot and sets
// them in the respective linear cache. This way we can make sure that linear
// caches are synced with the existing resources, but we will be triggering
// downstream notifications more often
func (s *Snapshotter) setDeltaCache(typeUrl string) error {
// Load all the known resources from the full State of the world Snapshot
allResourcesSnap, err := s.getCacheForType(typeUrl).GetSnapshot(EmptyNodeID)
if err != nil {
return fmt.Errorf("Cannot sync linear cache for type: %s, failed to get sotw snap: %v", typeUrl, err)
}
allResourcesOfType := allResourcesSnap.GetResources(typeUrl)
var setResources map[string]types.Resource
for name, res := range allResourcesOfType {
if typeUrl == resource.ClusterType {
cluster, err := UnmarshalResourceToCluster(res)
if err != nil {
return fmt.Errorf("Cannot unmarshal resource to cluster for patching: %v", err)
}
res = patchClusterDeltaEDS(cluster)
}
setResources[name] = res
}
s.getDeltaCacheForType(typeUrl).SetResources(setResources)
return nil
}

func (s *Snapshotter) OnStreamOpen(ctx context.Context, id int64, typ string) error {
var peerAddr string
if peerInfo, ok := peer.FromContext(ctx); ok {
Expand All @@ -296,7 +368,6 @@ func (s *Snapshotter) OnStreamOpen(ctx context.Context, id int64, typ string) er
s.streams.Store(id, &Stream{
peerAddress: peerAddr,
requestRateLimit: rate.NewLimiter(rate.Limit(s.streamRequestPerSecond), 1),
isDelta: false,
})
metricOnStreamOpenInc()
return nil
Expand Down Expand Up @@ -365,9 +436,6 @@ func (s *Snapshotter) OnFetchRequest(ctx context.Context, req *discovery.Discove

func (s *Snapshotter) OnDeltaStreamClosed(id int64, node *core.Node) {
log.Logger.Info("OnDeltaStreamClosed", "id", id, "node", node)
s.streams.Delete(id)
s.deleteNodeStream(node.GetId(), id)
metricOnStreamClosedInc()
}

func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ string) error {
Expand All @@ -376,35 +444,19 @@ func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ strin
peerAddr = peerInfo.Addr.String()
}
log.Logger.Info("OnDeltaStreamOpen", "peer address", peerAddr, "id", id, "type", typ)
s.streams.Store(id, &Stream{
peerAddress: peerAddr,
requestRateLimit: rate.NewLimiter(rate.Limit(s.streamRequestPerSecond), 1),
isDelta: true,
})
metricOnStreamOpenInc()
return nil
}

func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscoveryRequest) error {
st, _ := s.streams.Load(id)
stream := st.(*Stream)
log.Logger.Info("OnStreamDeltaRequest",
"id", id,
"peer", stream.peerAddress,
"received", r.GetTypeUrl(),
"node", r.GetNode().GetId(),
"locality", r.GetNode().GetLocality(),
"subscribes", strings.Join(r.GetResourceNamesSubscribe(), ", "),
"unsubscribes", strings.Join(r.GetResourceNamesUnsubscribe(), ", "),
"response_nonce", r.GetResponseNonce(),
)

s.addOrUpdateNode(r.GetNode().GetId(), stream.peerAddress, id)
if s.needToUpdateDeltaSubscriptions(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()) {
if err := s.updateDeltaStreamNodeResources(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNamesSubscribe(), r.GetResourceNamesUnsubscribe()); err != nil {
return err
}
}
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions xds/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,13 @@ func resourcesMatch(a, b []string) bool {
}
return true
}

func subscriptionKeys(s map[string]struct{}) []string {
keys := make([]string, len(s))
i := 0
for k := range s {
keys[i] = k
i++
}
return keys
}

0 comments on commit 239338f

Please sign in to comment.