diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 110b7a4..eacbeb0 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -13,6 +13,7 @@ on: env: REGISTRY: quay.io IMAGE_NAME: ${{ github.repository }} + IMAGE_NAME_ENVOY_CONFIGURER: ${{ github.repository }}/envoy-sidecar/configurer jobs: docker: @@ -44,3 +45,16 @@ jobs: push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + - name: Extract envoy/configurer metadata (tags, labels) for Docker + id: meta-envoy-configurer + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME_ENVOY_CONFIGURER }} + - name: Build and push Docker image for envoy configurer + if: github.actor != 'dependabot[bot]' + uses: docker/build-push-action@v5 + with: + context: ./envoy-sidecar/configurer + push: true + tags: ${{ steps.meta-envoy-configurer.outputs.tags }} + labels: ${{ steps.meta-envoy-configurer.outputs.labels }} diff --git a/controller/controller_test.go b/controller/controller_test.go index 71a3cfe..316ab56 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -39,7 +39,7 @@ func TestReconcileServices_LabelledService(t *testing.T) { "./test-resources/labelled_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -60,7 +60,7 @@ func TestReconcileServices_LabelledService(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify the default round robin policy is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -82,7 +82,7 @@ func TestReconcileServices_LabelledServiceLbPolicy(t *testing.T) { "./test-resources/labelled_service_ring_hash_balancer.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -103,7 +103,7 @@ func TestReconcileServices_LabelledServiceLbPolicy(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify the correct lb policy (ring hash) is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -119,7 +119,7 @@ func TestReconcileServices_LabelledServiceInvalidLbPolicy(t *testing.T) { "./test-resources/labelled_service_invalid_balancer.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -140,7 +140,7 @@ 
func TestReconcileServices_LabelledServiceInvalidLbPolicy(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify the default round robin policy is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -156,7 +156,7 @@ func TestReconcileServices_XdsService(t *testing.T) { "./test-resources/xds_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -177,7 +177,7 @@ func TestReconcileServices_XdsService(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify the default round robin policy is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -199,7 +199,7 @@ func TestReconcileServices_XdsServiceNotExistent(t *testing.T) { "./test-resources/xds_service_not_existent.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -233,7 +233,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) { "./test-resources/xds_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -254,7 +254,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify we will have one Endpoint resource in the snapshot snap, err = snapshotter.EndpointsSnapshot(testNodeID) if err != nil { @@ -284,7 +284,7 @@ func TestReconcileLocalEndpointSlice_SnapOnUpdate(t *testing.T) { "./test-resources/xds_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -304,7 +304,7 @@ func TestReconcileLocalEndpointSlice_SnapOnUpdate(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) 
// Includes all_kube_routes snap, err = snapshotter.EndpointsSnapshot(testNodeID) if err != nil { t.Fatal(err) @@ -318,7 +318,7 @@ func TestReconcileLocalEndpointSlice_NotFound(t *testing.T) { "./test-resources/endpointslice.yaml", ) client.EndpointSliceApiError(kubeerror.NewNotFound(schema.GroupResource{Resource: "endpointslice"}, "foo")) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -345,7 +345,7 @@ func TestReconcileLocalEndpointSlice_NonXdsService(t *testing.T) { client := kube.NewClientMock( "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -373,7 +373,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints(t *testing.T) { remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -394,7 +394,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify we will have 1 Endpoint resource in the snapshot containing // addresses from both local(2) and remote(2). 4 lbEndpoint addresses in // total. Also verify that all priorities are set to 0. @@ -436,7 +436,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints_NoRemoteEndpoints(t *te remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -457,7 +457,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints_NoRemoteEndpoints(t *te } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify we will have 1 Endpoint resource in the snapshot containing // only local client addresses. 
snap, err = snapshotter.EndpointsSnapshot(testNodeID) @@ -487,7 +487,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpoints(t *testing.T) { remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -508,7 +508,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpoints(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify we will have 1 Endpoint resource in the snapshot containing // only remote addresses (2). snap, err = snapshotter.EndpointsSnapshot(testNodeID) @@ -539,7 +539,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpointsAndLocalPriority(t *test remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -560,7 +560,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpointsAndLocalPriority(t *test } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify we will have 1 Endpoint resource in the snapshot containing // addresses for local endpoints with priority 0 and for remote ones // with priority 1. @@ -601,7 +601,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpointsAndLocalPriority(t * remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -622,7 +622,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpointsAndLocalPriority(t * } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes // Verify we will have 1 Endpoint resource in the snapshot containing // addresses for remote endpoints with priority 0, regardless of // PriorityStrategy set to local-first. 
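The new `// Includes all_kube_routes` assertions count one extra RouteConfiguration that the xds/service.go changes further down append to every services snapshot. A minimal sketch of its shape, using go-control-plane types; the service name, domain, and cluster below are hypothetical placeholders, since the exact formats come from naming helpers (makeVirtualHostName, makeGlobalServiceDomain) not shown in this diff:

package main

import (
	"fmt"

	routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

func main() {
	// One catch-all virtual host per service port, with every path prefix
	// routed to that port's cluster. "foo.bar:8080" / "foo.bar.8080" are
	// made-up stand-ins for the service.namespace:port naming convention.
	rc := &routev3.RouteConfiguration{
		Name: "all_kube_routes",
		VirtualHosts: []*routev3.VirtualHost{{
			Name:    "foo.bar:8080",
			Domains: []string{"foo.bar:8080"},
			Routes: []*routev3.Route{{
				Match: &routev3.RouteMatch{
					PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: ""},
				},
				Action: &routev3.Route_Route{
					Route: &routev3.RouteAction{
						ClusterSpecifier: &routev3.RouteAction_Cluster{Cluster: "foo.bar.8080"},
					},
				},
			}},
		}},
	}
	fmt.Println(rc.String())
}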
@@ -656,7 +656,7 @@ func TestReconcileLocalEndpointSlices_XdsServiceWithEmptyLocalEndpoints(t *testi
 	remoteClient := kube.NewClientMock(
 		"./test-resources/endpointslice-remote.yaml",
 	)
-	snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0))
+	snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false)
 	controller := NewController(
 		localClient,
 		[]kube.Client{remoteClient},
@@ -700,7 +700,7 @@ func TestReconcileServices_XdsServiceWithRingHash(t *testing.T) {
 		"./test-resources/xds_service_ring_hash_balancing.yaml",
 		"./test-resources/endpointslice.yaml",
 	)
-	snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0))
+	snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false)
 	controller := NewController(
 		client,
 		[]kube.Client{},
diff --git a/envoy-sidecar/README.md b/envoy-sidecar/README.md
new file mode 100644
index 0000000..931d725
--- /dev/null
+++ b/envoy-sidecar/README.md
@@ -0,0 +1,7 @@
+# Tooling to deploy envoy as a sidecar and use it with the xDS server
+
+This is in a very experimental stage and definitely not production ready.
+
+It includes a small app, run as an initContainer, that produces envoy config
+from a map of xDS services and local ports, plus a kustomize base to deploy a
+Kyverno mutating rule injecting the initContainer and the envoy sidecar.
diff --git a/envoy-sidecar/configurer/.gitignore b/envoy-sidecar/configurer/.gitignore
new file mode 100644
index 0000000..0beebb8
--- /dev/null
+++ b/envoy-sidecar/configurer/.gitignore
@@ -0,0 +1 @@
+configurer
diff --git a/envoy-sidecar/configurer/Dockerfile b/envoy-sidecar/configurer/Dockerfile
new file mode 100644
index 0000000..5ecf0ec
--- /dev/null
+++ b/envoy-sidecar/configurer/Dockerfile
@@ -0,0 +1,13 @@
+FROM golang:1.21-alpine AS build
+WORKDIR /go/src/github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer
+COPY . /go/src/github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer
+ENV CGO_ENABLED=0
+RUN \
+  apk --no-cache add git upx \
+  && go build -ldflags='-s -w' -o /semaphore-xds-envoy-configurer . \
+  && upx /semaphore-xds-envoy-configurer
+
+FROM alpine:3.18
+COPY --from=build /semaphore-xds-envoy-configurer /semaphore-xds-envoy-configurer
+COPY --from=build /go/src/github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer/templates /templates
+ENTRYPOINT [ "/semaphore-xds-envoy-configurer" ]
diff --git a/envoy-sidecar/configurer/README.md b/envoy-sidecar/configurer/README.md
new file mode 100644
index 0000000..ab26174
--- /dev/null
+++ b/envoy-sidecar/configurer/README.md
@@ -0,0 +1,12 @@
+# semaphore-xds-envoy-configurer
+
+Expects an environment variable named `ENVOY_SIDECAR_TARGETS` containing a
+comma separated list of xDS listener addresses, plus the xDS server address
+and port. For example:
+
+`ENVOY_SIDECAR_TARGETS="<service>.<namespace>:<port>,<service>.<namespace>:<port>"`
+`XDS_SERVER_ADDRESS="semaphore-xds.sys-semaphore.svc.cluster.local"`
+`XDS_SERVER_PORT="18000"`
+
+It generates envoy config that points listeners on the specified local ports
+to the respective xDS dynamic resources, so that envoy can proxy traffic to
+the configured services.
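For illustration, a runnable sketch of the naming convention described above and implemented by extractConfigFromTargets below; the target value is a made-up placeholder:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Each target keeps its service.namespace:port form as the listener and
	// routeConfig name; the derived cluster name swaps ':' for '.'. Local
	// listener ports are assigned sequentially after the main listener port
	// (18001). "echo-server.labs:8080" is a hypothetical example target.
	targets := "echo-server.labs:8080"
	port := 18001
	for _, target := range strings.Split(targets, ",") {
		port++
		cluster := strings.Join(strings.Split(target, ":"), ".")
		fmt.Printf("listener_%d on 127.0.0.1:%d -> rds %q -> cluster %q\n",
			port, port, target, cluster)
	}
}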
diff --git a/envoy-sidecar/configurer/configure.go b/envoy-sidecar/configurer/configure.go
new file mode 100644
index 0000000..8521a21
--- /dev/null
+++ b/envoy-sidecar/configurer/configure.go
@@ -0,0 +1,190 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"path"
+	"strings"
+	"text/template"
+)
+
+const (
+	EnvoyMainListenPort = 18001
+)
+
+type MainListener struct {
+	Port    int
+	Domains []RouteDomain
+}
+
+type RouteDomain struct {
+	Cluster string
+	Domain  string
+	Name    string
+	Port    int
+}
+type TargetListener struct {
+	Name            string
+	Port            int
+	RouteConfigName string
+}
+
+type StaticCluster struct {
+	Name string
+	Port int
+}
+
+type DynamicCluster struct {
+	Name string
+}
+
+type Clusters struct {
+	Dynamic []DynamicCluster
+	Static  []StaticCluster
+}
+
+type XdsCluster struct {
+	XdsServerAddress string
+	XdsServerPort    string
+}
+
+type EnvoyConfig struct {
+	NodeID       string
+	ClusterID    string
+	MainListener string
+	Listeners    string
+	Clusters     string
+	XdsCluster   string
+}
+
+func makeEnvoyConfig(nodeID, envoySidecarTargets, XdsServerAddress, XdsServerPort string) (string, error) {
+	main, listeners, clusters := extractConfigFromTargets(envoySidecarTargets)
+	// Generate main listener config
+	mainListenerTmplPath := "./templates/main-listener.tmpl"
+	mainListenerTmplBase := path.Base(mainListenerTmplPath)
+	tmpl, err := template.New(mainListenerTmplBase).ParseFiles(mainListenerTmplPath)
+	if err != nil {
+		return "", err
+	}
+	var renderedMainListener bytes.Buffer
+	err = tmpl.Execute(&renderedMainListener, main)
+	if err != nil {
+		return "", err
+	}
+	// Generate listeners config
+	listenersTmplPath := "./templates/target-listeners.tmpl"
+	listenersTmplBase := path.Base(listenersTmplPath)
+	tmpl, err = template.New(listenersTmplBase).ParseFiles(listenersTmplPath)
+	if err != nil {
+		return "", err
+	}
+	var renderedListeners bytes.Buffer
+	err = tmpl.Execute(&renderedListeners, listeners)
+	if err != nil {
+		return "", err
+	}
+	// Generate clusters config
+	clustersTmplPath := "./templates/clusters.tmpl"
+	clustersTmplBase := path.Base(clustersTmplPath)
+	tmpl, err = template.New(clustersTmplBase).ParseFiles(clustersTmplPath)
+	if err != nil {
+		return "", err
+	}
+	var renderedClusters bytes.Buffer
+	err = tmpl.Execute(&renderedClusters, clusters)
+	if err != nil {
+		return "", err
+	}
+	// Generate xDS cluster config
+	xdsCluster := XdsCluster{
+		XdsServerAddress: XdsServerAddress,
+		XdsServerPort:    XdsServerPort,
+	}
+	XdsClusterTmplPath := "./templates/xds-cluster.tmpl"
+	XdsClusterTmplBase := path.Base(XdsClusterTmplPath)
+	tmpl, err = template.New(XdsClusterTmplBase).ParseFiles(XdsClusterTmplPath)
+	if err != nil {
+		return "", err
+	}
+	var renderedXdsCluster bytes.Buffer
+	err = tmpl.Execute(&renderedXdsCluster, xdsCluster)
+	if err != nil {
+		return "", err
+	}
+
+	// Generate the Envoy config
+	envoyConfig := EnvoyConfig{
+		NodeID:       nodeID,
+		ClusterID:    nodeID, // required by envoy; reuse the node id as a dummy value
+		MainListener: renderedMainListener.String(),
+		Listeners:    renderedListeners.String(),
+		Clusters:     renderedClusters.String(),
+		XdsCluster:   renderedXdsCluster.String(),
+	}
+	envoyConfigTmplPath := "./templates/envoy-config.tmpl"
+	envoyConfigTmplBase := path.Base(envoyConfigTmplPath)
+	tmpl, err = template.New(envoyConfigTmplBase).ParseFiles(envoyConfigTmplPath)
+	if err != nil {
+		return "", err
+	}
+	var renderedEnvoyConfig bytes.Buffer
+	err = tmpl.Execute(&renderedEnvoyConfig, envoyConfig)
+	if err != nil {
+		return "", err
+	}
+	return renderedEnvoyConfig.String(), nil
+}
+
+// Upstream listeners are expected as a comma separated list in the form:
+// <service>.<namespace>:<port>,<service>.<namespace>:<port>,...
+// From this we should extract the xds addresses as listener and route config
+// names.
+// Each xds address is expected in the form that semaphore-xds uses for
+// listener names, service.namespace:port, and should be copied as-is to the
+// listener and routeConfig names. Cluster names should be derived from the
+// above and follow the form service.namespace.port, to comply with the xds
+// naming limitations and how semaphore-xds configures cluster names.
+func extractConfigFromTargets(envoySidecarTargets string) (MainListener, []TargetListener, Clusters) {
+	port := EnvoyMainListenPort
+	main := MainListener{
+		Port: port,
+	}
+	domains := []RouteDomain{}
+	listeners := []TargetListener{}
+	staticClusters := []StaticCluster{}
+	for _, target := range strings.Split(envoySidecarTargets, ",") {
+		port++
+		lName := fmt.Sprintf("listener_%d", port)
+		listeners = append(listeners, TargetListener{
+			Name:            lName,
+			Port:            port,
+			RouteConfigName: target,
+		})
+		rName := fmt.Sprintf("route_%d", port)
+		cName := fmt.Sprintf("localhost_%d", port)
+		domains = append(domains, RouteDomain{
+			Cluster: cName,
+			Domain:  target,
+			Name:    rName,
+			Port:    port,
+		})
+		staticClusters = append(staticClusters, StaticCluster{
+			Name: cName,
+			Port: port,
+		})
+	}
+	main.Domains = domains
+
+	dynamicClusters := []DynamicCluster{}
+	for _, l := range listeners {
+		clusterName := strings.Join(strings.Split(l.RouteConfigName, ":"), ".")
+		dynamicClusters = append(dynamicClusters, DynamicCluster{
+			Name: clusterName,
+		})
+	}
+	clusters := Clusters{
+		Dynamic: dynamicClusters,
+		Static:  staticClusters,
+	}
+	return main, listeners, clusters
+}
diff --git a/envoy-sidecar/configurer/go.mod b/envoy-sidecar/configurer/go.mod
new file mode 100644
index 0000000..070db01
--- /dev/null
+++ b/envoy-sidecar/configurer/go.mod
@@ -0,0 +1,3 @@
+module github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer
+
+go 1.21.3
diff --git a/envoy-sidecar/configurer/main.go b/envoy-sidecar/configurer/main.go
new file mode 100644
index 0000000..194cc1d
--- /dev/null
+++ b/envoy-sidecar/configurer/main.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+)
+
+var (
+	flagEnvoyNodeId         = flag.String("envoy-node-id", getEnv("ENVOY_NODE_ID", ""), "Node id to configure for envoy sidecar")
+	flagEnvoySidecarTargets = flag.String("envoy-sidecar-targets", getEnv("ENVOY_SIDECAR_TARGETS", ""), "Comma separated list of listeners to get xDS config for: <service>.<namespace>:<port>,<service>.<namespace>:<port>")
+	flagXdsServerAddress    = flag.String("xds-server-address", getEnv("XDS_SERVER_ADDRESS", ""), "The address of the xds server for envoy sidecar to fetch config")
+	flagXdsServerPort       = flag.String("xds-server-port", getEnv("XDS_SERVER_PORT", ""), "The port of the xds server for envoy sidecar to fetch config")
+)
+
+func usage() {
+	flag.Usage()
+	os.Exit(1)
+}
+
+func getEnv(key, defaultValue string) string {
+	value := os.Getenv(key)
+	if len(value) == 0 {
+		return defaultValue
+	}
+	return value
+}
+
+func main() {
+	flag.Parse()
+
+	if *flagEnvoyNodeId == "" || *flagEnvoySidecarTargets == "" || *flagXdsServerAddress == "" || *flagXdsServerPort == "" {
+		usage()
+	}
+	c, err := makeEnvoyConfig(*flagEnvoyNodeId, *flagEnvoySidecarTargets, *flagXdsServerAddress, *flagXdsServerPort)
+	if err != nil {
+		fmt.Print(err)
+		os.Exit(1)
+	}
+	fmt.Print(c)
+}
diff --git a/envoy-sidecar/configurer/templates/clusters.tmpl b/envoy-sidecar/configurer/templates/clusters.tmpl
new file mode 100644
index 0000000..4172e60 --- /dev/null +++ b/envoy-sidecar/configurer/templates/clusters.tmpl @@ -0,0 +1,29 @@ +{{- range .Dynamic }} + - name: {{ .Name }} + http2_protocol_options: {} + type: EDS + eds_cluster_config: + eds_config: + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + - envoy_grpc: + cluster_name: xds_cluster +{{- end }} +{{- range .Static }} + - name: {{ .Name }} + connect_timeout: 5s + type: STATIC + load_assignment: + cluster_name: {{ .Name }} + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: {{ .Port }} + http2_protocol_options: {} +{{- end }} diff --git a/envoy-sidecar/configurer/templates/envoy-config.tmpl b/envoy-sidecar/configurer/templates/envoy-config.tmpl new file mode 100644 index 0000000..bea29c2 --- /dev/null +++ b/envoy-sidecar/configurer/templates/envoy-config.tmpl @@ -0,0 +1,13 @@ +admin: + address: + socket_address: { address: 127.0.0.1, port_value: 9901 } +node: + id: {{ .NodeID }} + cluster: {{ .ClusterID }} +static_resources: + listeners: +{{ .MainListener }} +{{ .Listeners }} + clusters: +{{- .Clusters }} +{{- .XdsCluster }} diff --git a/envoy-sidecar/configurer/templates/main-listener.tmpl b/envoy-sidecar/configurer/templates/main-listener.tmpl new file mode 100644 index 0000000..84bd490 --- /dev/null +++ b/envoy-sidecar/configurer/templates/main-listener.tmpl @@ -0,0 +1,25 @@ + - name: listener_main + address: + socket_address: { address: 127.0.0.1, port_value: {{ .Port }} } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + route_config: + virtual_hosts: + {{- range .Domains }} + - name: {{ .Name }} + domains: ["{{ .Domain }}"] + routes: + - match: + prefix: "" + route: + cluster: {{ .Cluster }} + {{- end }} + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + dynamic_stats: true diff --git a/envoy-sidecar/configurer/templates/target-listeners.tmpl b/envoy-sidecar/configurer/templates/target-listeners.tmpl new file mode 100644 index 0000000..c187ff0 --- /dev/null +++ b/envoy-sidecar/configurer/templates/target-listeners.tmpl @@ -0,0 +1,27 @@ +{{- range . 
}}
+  - name: {{ .Name }}
+    address:
+      socket_address: { address: 127.0.0.1, port_value: {{ .Port }} }
+    filter_chains:
+    - filters:
+      - name: envoy.filters.network.http_connection_manager
+        typed_config:
+          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
+          stat_prefix: ingress_http
+          codec_type: AUTO
+          rds:
+            route_config_name: {{ .RouteConfigName }}
+            config_source:
+              resource_api_version: V3
+              api_config_source:
+                api_type: GRPC
+                transport_api_version: V3
+                grpc_services:
+                - envoy_grpc:
+                    cluster_name: xds_cluster
+          http_filters:
+          - name: envoy.filters.http.router
+            typed_config:
+              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
+              dynamic_stats: true
+  {{- end }}
diff --git a/envoy-sidecar/configurer/templates/xds-cluster.tmpl b/envoy-sidecar/configurer/templates/xds-cluster.tmpl
new file mode 100644
index 0000000..20f0f08
--- /dev/null
+++ b/envoy-sidecar/configurer/templates/xds-cluster.tmpl
@@ -0,0 +1,25 @@
+  - name: xds_cluster
+    connect_timeout: 0.25s
+    type: STRICT_DNS
+    lb_policy: ROUND_ROBIN
+    typed_extension_protocol_options:
+      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
+        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
+        explicit_http_config:
+          http2_protocol_options:
+            connection_keepalive:
+              interval: 30s
+              timeout: 5s
+    upstream_connection_options:
+      # configure a TCP keep-alive to detect and reconnect to the xDS
+      # server in the event of a TCP socket half open connection
+      tcp_keepalive: {}
+    load_assignment:
+      cluster_name: xds_cluster
+      endpoints:
+      - lb_endpoints:
+        - endpoint:
+            address:
+              socket_address:
+                address: {{ .XdsServerAddress }}
+                port_value: {{ .XdsServerPort }}
diff --git a/envoy-sidecar/deploy/kustomize/kyverno/mutate/kustomization.yaml b/envoy-sidecar/deploy/kustomize/kyverno/mutate/kustomization.yaml
new file mode 100644
index 0000000..ec3e774
--- /dev/null
+++ b/envoy-sidecar/deploy/kustomize/kyverno/mutate/kustomization.yaml
@@ -0,0 +1,4 @@
+apiVersion: kustomize.config.k8s.io/v1beta1
+kind: Kustomization
+resources:
+- mutate-semaphore-xds-envoy-sidecar.yaml
diff --git a/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml b/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml
new file mode 100644
index 0000000..fadaa73
--- /dev/null
+++ b/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml
@@ -0,0 +1,88 @@
+apiVersion: kyverno.io/v1
+kind: ClusterPolicy
+metadata:
+  name: mutate-semaphore-xds-envoy-sidecar
+  annotations:
+    policies.kyverno.io/title: Mutate Semaphore-xDS Envoy Sidecar
+    policies.kyverno.io/category: xDS
+    policies.kyverno.io/subject: Pod
+    policies.kyverno.io/description: >-
+      This policy ensures that pods labelled with
+      xds.semaphore.uw.systems/envoy-sidecar: "true" are injected with the init
+      container and the sidecar needed to use envoy to proxy grpc connections
+      to targets specified by the xds.semaphore.uw.systems/envoy-sidecar-targets
+      annotation.
+spec: + background: false + mutateExistingOnPolicyUpdate: false + rules: + - name: xds-clients-inject-envoy-sidecar + match: + resources: + kinds: + - Pod + operations: + - CREATE + selector: + matchLabels: + xds.semaphore.uw.systems/envoy-sidecar: "true" + mutate: + patchStrategicMerge: + spec: + initContainers: + - name: envoy-configurer + image: quay.io/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer:envoy-sidecar-localhost + imagePullPolicy: Always + env: + - name: ENVOY_NODE_ID + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: ENVOY_SIDECAR_TARGETS + valueFrom: + fieldRef: + fieldPath: metadata.annotations['xds.semaphore.uw.systems/envoy-sidecar-targets'] + - name: XDS_SERVER_ADDRESS + value: semaphore-xds.sys-semaphore.svc.cluster.local + - name: XDS_SERVER_PORT + value: "18000" + command: + - /bin/sh + - -c + - | + /semaphore-xds-envoy-configurer > /etc/envoy/config.yaml + cat /etc/envoy/config.yaml + volumeMounts: + - name: envoy-config + mountPath: /etc/envoy + containers: + - name: envoy-sidecar + image: envoyproxy/envoy:v1.29.1 + imagePullPolicy: Always + args: + - -c /etc/envoy/config.yaml + volumeMounts: + - name: envoy-config + mountPath: /etc/envoy + - (name): "*" + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: GRPC_XDS_BOOTSTRAP_CONFIG + value: >- + { + "xds_servers": [{ + "server_uri": "semaphore-xds-localhost.sys-semaphore.svc.cluster.local:18000", + "channel_creds": [{"type": "insecure"}], + "server_features": ["xds_v3"]} + ], + "node":{ + "id":"{{request.object.metadata.namespace}}/\$(POD_NAME)", + "locality":{} + } + } + volumes: + - name: envoy-config + emptyDir: {} diff --git a/example/client/go.mod b/example/client/go.mod index b8f6c7a..81c3bb1 100644 --- a/example/client/go.mod +++ b/example/client/go.mod @@ -1,29 +1,29 @@ module github.com/utilitywarehouse/semaphore-xds/example/client -go 1.21 +go 1.22.7 + +toolchain go1.23.3 require ( - github.com/golang/protobuf v1.5.3 - google.golang.org/grpc v1.58.3 + github.com/golang/protobuf v1.5.4 + google.golang.org/grpc v1.68.0 ) require ( - cloud.google.com/go/compute v1.23.1 // indirect - cloud.google.com/go/compute/metadata v0.2.3 // indirect + cel.dev/expr v0.19.0 // indirect + cloud.google.com/go/compute/metadata v0.5.2 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect - github.com/cncf/xds/go v0.0.0-20231016030527-8bd2eac9fb4a // indirect - github.com/envoyproxy/go-control-plane v0.11.1 // indirect - github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/oauth2 v0.13.0 // indirect - golang.org/x/sync v0.4.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect - google.golang.org/protobuf v1.33.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect + github.com/envoyproxy/go-control-plane v0.13.1 // indirect + github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect + github.com/planetscale/vtprotobuf 
v0.6.1-0.20240319094008-0393e58bdf10 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/oauth2 v0.24.0 // indirect + golang.org/x/sync v0.9.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/protobuf v1.35.2 // indirect ) diff --git a/example/client/go.sum b/example/client/go.sum index 1e7ef27..ff3b222 100644 --- a/example/client/go.sum +++ b/example/client/go.sum @@ -1,112 +1,40 @@ -cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go/compute v1.23.1 h1:V97tBoDaZHb6leicZ1G6DLK2BAaZLJ/7+9BB/En3hR0= -cloud.google.com/go/compute v1.23.1/go.mod h1:CqB3xpmPKKt3OJpW2ndFIXnA9A4xAy/F3Xp1ixncW78= -cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= -cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +cel.dev/expr v0.19.0 h1:lXuo+nDhpyJSpWxpPVi5cPUwzKb+dsdOiw6IreM5yt0= +cel.dev/expr v0.19.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= +cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= +cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe h1:QQ3GSy+MqSHxm/d8nCtnAiZdYFd45cYZPs8vOOIYKfk= -github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= -github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20231016030527-8bd2eac9fb4a h1:SZL0tarhuhoN0kvo5pfO4i6vxYghwzXUo9w0WHIjI4k= -github.com/cncf/xds/go v0.0.0-20231016030527-8bd2eac9fb4a/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.11.1 h1:wSUXTlLfiAQRWs2F+p+EKOY9rUyis1MyGqJ2DIk5HpM= -github.com/envoyproxy/go-control-plane v0.11.1/go.mod h1:uhMcXKCQMEJHiAb0w+YGefQLaTEw+YhGluxZkrTmD0g= -github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= -github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= -golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync 
v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= -google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA= -google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI= 
-google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b h1:CIC2YMXmIhYw6evmhPxBKJ4fmLbOFtXQN/GV3XOZR8k= -google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:IBQ646DjkDkvUIsVq/cc03FUFQ9wbZu7yE396YcL870= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= -google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= -google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8ETbOasdwEV+avkR75ZzsVV9WI= +github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/envoyproxy/go-control-plane v0.13.1 h1:vPfJZCkob6yTMEgS+0TwfTUfbHjfy/6vOJ8hUWX/uXE= +github.com/envoyproxy/go-control-plane v0.13.1/go.mod h1:X45hY0mufo6Fd0KW3rqsGvQMw58jvjymeCzBU3mWyHw= +github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 
+golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E= +google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= +google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/example/client/main.go b/example/client/main.go index bd63a77..633a06c 100644 --- a/example/client/main.go +++ b/example/client/main.go @@ -3,8 +3,8 @@ package main import ( "context" "flag" - "fmt" "log" + "strings" "time" echo "github.com/utilitywarehouse/semaphore-xds/example/client/echo" @@ -14,17 +14,25 @@ import ( ) var ( - flagGrpcServerAddress = flag.String("grpc-server-address", "", "Echo server address") + flagGrpcServerAddresses = flag.String("grpc-server-addresses", "", "Comma separated echo server address") ) func main() { flag.Parse() - if *flagGrpcServerAddress == "" { - log.Fatal("Must provide a grpc server address") + if *flagGrpcServerAddresses == "" { + log.Fatal("Must provide at least one grpc server address") } - log.Println("Looking up service %s", *flagGrpcServerAddress) + servers := strings.Split(*flagGrpcServerAddresses, ",") - address := fmt.Sprintf(*flagGrpcServerAddress) + for _, server := range servers { + go callEchoServer(server) + } + for { // so bad.. 
+	}
+}
+
+func callEchoServer(address string) {
+	log.Printf("Looking up service %s", address)
 	conn, err := grpc.Dial(address, grpc.WithInsecure())
 	if err != nil {
 		log.Fatalf("Could not connect %v", err)
diff --git a/main.go b/main.go
index bda1258..d9b6119 100644
--- a/main.go
+++ b/main.go
@@ -24,6 +24,7 @@ var (
 	flagNamespace                = flag.String("namespace", getEnv("SXDS_NAMESPACE", ""), "The namespace in which to watch for kubernetes resources")
 	flagLabelSelector            = flag.String("label-selector", getEnv("SXDS_LABEL_SELECTOR", "xds.semaphore.uw.systems/enabled=true"), "Label selector for watched kubernetes resources")
 	flagLbPolicyLabel            = flag.String("lb-policy-selector", getEnv("SXDS_LB_POLICY_SELECTOR", "xds.semaphore.uw.systems/lb-policy"), "Label to allow user to configure the lb policy for a Service clusters")
+	flagLocalhostEndpoints       = flag.Bool("localhost-endpoints", false, "If enabled the server will create configuration with dummy endpoints on 127.0.0.1:18001 for all requested listeners and clusters")
 	flagServerListenPort         = flag.Uint("server-listen-port", 18000, "xDS server listen port")
 	flagMaxRequestsPerSecond     = flag.Float64("max-requests-per-second", 500.0, "maximum allowed requests to the server per second")
 	flagMaxPeerRequestsPerSecond = flag.Float64("max-peer-requests-per-second", 50.0, "maximum allowed requests from a peer per second")
@@ -53,24 +54,27 @@ func main() {
 	controller.LbPolicyLabel = *flagLbPolicyLabel
 
 	localClient, remoteClients := createClientsFromConfig(*flagClustersConfigPath)
-	snapshotter := xds.NewSnapshotter(*flagAuthorityName, *flagServerListenPort, *flagMaxRequestsPerSecond, *flagMaxPeerRequestsPerSecond)
+	snapshotter := xds.NewSnapshotter(*flagAuthorityName, *flagServerListenPort, *flagMaxRequestsPerSecond, *flagMaxPeerRequestsPerSecond, *flagLocalhostEndpoints)
 	xds.InitSnapMetricsCollector(snapshotter)
-	go serveMetrics(fmt.Sprintf(":%s", *flagMetricsListenPort))
+	if !*flagLocalhostEndpoints {
+		go serveMetrics(fmt.Sprintf(":%s", *flagMetricsListenPort))
+	}
 	go servePprof(*flagPprofListenAddress)
 
-	controller := controller.NewController(
-		localClient,
-		remoteClients,
-		*flagNamespace,
-		*flagLabelSelector,
-		snapshotter,
-		0,
-	)
-	controller.Run()
+	if !*flagLocalhostEndpoints {
+		controller := controller.NewController(
+			localClient,
+			remoteClients,
+			*flagNamespace,
+			*flagLabelSelector,
+			snapshotter,
+			0,
+		)
+		controller.Run()
+		defer controller.Stop()
+	}
 	snapshotter.ListenAndServe()
-
-	controller.Stop()
 }
 
 func serveMetrics(address string) {
diff --git a/xds/endpoints.go b/xds/endpoints.go
index b6b4953..2eac12f 100644
--- a/xds/endpoints.go
+++ b/xds/endpoints.go
@@ -230,3 +230,9 @@ func endpointSlicesToClusterLoadAssignments(endpointStore XdsEndpointStore, auth
 	}
 	return eds, nil
 }
+
+func localhostClusterLoadAssignment(clusterName string) *endpointv3.ClusterLoadAssignment {
+	endpoints := []*endpointv3.LbEndpoint{lbEndpoint("127.0.0.1", int32(18001), true)}
+	localityEndpoint := []*endpointv3.LocalityLbEndpoints{localityEndpoints(endpoints, "localhost", "localhost", uint32(0))}
+	return clusterLoadAssignment(clusterName, localityEndpoint)
+}
diff --git a/xds/service.go b/xds/service.go
index 9f549d4..60bb0c9 100644
--- a/xds/service.go
+++ b/xds/service.go
@@ -93,6 +93,10 @@ func makeRouteConfig(name, namespace, authority string, port int32, retry *route
 		virtualHostName = makeXdstpVirtualHostName(name, namespace, authority, port)
 		domains = append(domains, virtualHostName)
 	}
+	return routeConfig(routeName, clusterName, virtualHostName, domains, retry, hashPolicies)
+}
+
+func routeConfig(routeName, clusterName, virtualHostName string, domains []string, retry *routev3.RetryPolicy, hashPolicies []*routev3.RouteAction_HashPolicy) *routev3.RouteConfiguration {
 	return &routev3.RouteConfiguration{
 		Name: routeName,
 		VirtualHosts: []*routev3.VirtualHost{
@@ -120,6 +124,47 @@ func makeRouteConfig(name, namespace, authority string, port int32, retry *route
 	}
 }
 
+// makeAllKubeServicesRouteConfig will return all the available routes in a
+// Kubernetes cluster. It is meant to be combined with envoy clients that will
+// also configure on-demand CDS and EDS for lazy resource discovery.
+func makeAllKubeServicesRouteConfig(serviceStore XdsServiceStore) *routev3.RouteConfiguration {
+	vh := []*routev3.VirtualHost{}
+	for _, s := range serviceStore.All() {
+		for _, port := range s.Service.Spec.Ports {
+			clusterName := makeClusterName(s.Service.Name, s.Service.Namespace, port.Port)
+			virtualHostName := makeVirtualHostName(s.Service.Name, s.Service.Namespace, port.Port)
+			domains := []string{makeGlobalServiceDomain(s.Service.Name, s.Service.Namespace, port.Port)}
+			vh = append(vh, &routev3.VirtualHost{
+				Name:    virtualHostName,
+				Domains: domains,
+				Routes: []*routev3.Route{{
+					Match: &routev3.RouteMatch{
+						PathSpecifier: &routev3.RouteMatch_Prefix{
+							Prefix: "",
+						},
+					},
+					Action: &routev3.Route_Route{
+						Route: &routev3.RouteAction{
+							ClusterSpecifier: &routev3.RouteAction_Cluster{
+								Cluster: clusterName,
+							},
+							HashPolicy: s.RingHashPolicies,
+						},
+					},
+				}},
+				RetryPolicy: s.Retry,
+			})
+		}
+	}
+	if len(vh) > 0 {
+		return &routev3.RouteConfiguration{
+			Name:         "all_kube_routes",
+			VirtualHosts: vh,
+		}
+	}
+	return nil
+}
+
 func makeManager(routeConfig *routev3.RouteConfiguration) (*anypb.Any, error) {
 	router, _ := anypb.New(&routerv3.Router{})
 	return anypb.New(&managerv3.HttpConnectionManager{
@@ -142,6 +187,10 @@ func makeListener(name, namespace, authority string, port int32, manager *anypb.
 	if authority != "" {
 		listenerName = makeXdstpListenerName(name, namespace, authority, port)
 	}
+	return listener(listenerName, manager)
+}
+
+func listener(listenerName string, manager *anypb.Any) *listenerv3.Listener {
 	return &listenerv3.Listener{
 		Name: listenerName,
 		ApiListener: &listenerv3.ApiListener{
@@ -155,19 +204,8 @@ func makeCluster(name, namespace, authority string, port int32, policy clusterv3
 	if authority != "" {
 		clusterName = makeXdstpClusterName(name, namespace, authority, port)
 	}
-	cluster := &clusterv3.Cluster{
-		Name:                 clusterName,
-		ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS},
-		LbPolicy:             policy,
-		EdsClusterConfig: &clusterv3.Cluster_EdsClusterConfig{
-			EdsConfig: &corev3.ConfigSource{
-				ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
-					Ads: &corev3.AggregatedConfigSource{},
-				},
-			},
-		},
-	}
+	cluster := cluster(clusterName, policy)
 	if authority != "" {
 		// This will be the name of the subsequently requested ClusterLoadAssignment.
diff --git a/xds/snapshotter.go b/xds/snapshotter.go
index 14f6a17..015b426 100644
--- a/xds/snapshotter.go
+++ b/xds/snapshotter.go
@@ -9,7 +9,9 @@ import (
 	"sync/atomic"
 	"time"
 
+	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
 	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+	routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
 	clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
 	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 	endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
@@ -49,16 +51,19 @@ type Stream struct {
 type Snapshotter struct {
 	authority              string // Authority name of the server for federated requests
 	servePort              uint
-	servicesCache          cache.SnapshotCache // default service snapshot cache for empty node ID (all watched resources snapshot)
-	serviceSnapVersion     int32               // Service snap version for empty node ID snapshot
-	endpointsCache         cache.SnapshotCache // default endpoints snapshot cache for empty node ID (all watched resources snapshot)
-	endpointsSnapVersion   int32               // Endpoints snap version for empty node ID snapshot
+	servicesCache          cache.SnapshotCache
+	serviceSnapVersion     int32 // Service snap version for empty node ID snapshot
+	endpointsCache         cache.SnapshotCache
+	endpointsSnapVersion   int32 // Endpoints snap version for empty node ID snapshot
+	deltaCDSCache          *cache.LinearCache
+	deltaEDSCache          *cache.LinearCache
 	muxCache               cache.MuxCache
 	requestRateLimit       *rate.Limiter // maximum number of requests allowed to server
 	streamRequestPerSecond float64       // maximum number of requests per stream per second
 	streams                sync.Map      // map of open streams
 	nodes                  sync.Map      // maps all clients node ids to requested resources that will be snapshotted and served
 	snapNodesMu            sync.Mutex    // Simple lock to avoid deleting a node while snapshotting
+	localhostEndpoints     bool
 }
 
 // Node keeps the info for a node. Each node can have multiple open streams,
@@ -126,20 +131,57 @@ func mapTypeURL(typeURL string) string {
 	}
 }
 
+func mapDeltaTypeURL(typeURL string) string {
+	switch typeURL {
+	case resource.ClusterType:
+		return "deltaClusters"
+	case resource.EndpointType:
+		return "deltaEndpoints"
+	default:
+		return ""
+	}
+}
+
+func (s *Snapshotter) getCacheForType(typeURL string) cache.SnapshotCache {
+	switch typeURL {
+	case resource.ListenerType, resource.RouteType, resource.ClusterType:
+		return s.servicesCache
+	case resource.EndpointType:
+		return s.endpointsCache
+	default:
+		return nil
+	}
+}
+
+func (s *Snapshotter) getDeltaCacheForType(typeURL string) *cache.LinearCache {
+	switch typeURL {
+	case resource.EndpointType:
+		return s.deltaEDSCache
+	case resource.ClusterType:
+		return s.deltaCDSCache
+	default:
+		return nil
+	}
+}
+
 // NewSnapshotter needs a grpc server port and the allowed requests limits per server and stream per second
-func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimit float64) *Snapshotter {
+func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimit float64, localhostEndpoints bool) *Snapshotter {
 	servicesCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger)
-	endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger) // This could be a linear cache? https://pkg.go.dev/github.com/envoyproxy/go-control-plane/pkg/cache/v3#LinearCache
+	endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger)
+	deltaCDSCache := cache.NewLinearCache(resource.ClusterType, cache.WithLogger(log.EnvoyLogger))
+	deltaEDSCache := cache.NewLinearCache(resource.EndpointType, cache.WithLogger(log.EnvoyLogger))
 	muxCache := cache.MuxCache{
 		Classify: func(r *cache.Request) string {
 			return mapTypeURL(r.TypeUrl)
 		},
 		ClassifyDelta: func(r *cache.DeltaRequest) string {
-			return mapTypeURL(r.TypeUrl)
+			return mapDeltaTypeURL(r.TypeUrl)
 		},
 		Caches: map[string]cache.Cache{
-			"services":  servicesCache,
-			"endpoints": endpointsCache,
+			"services":       servicesCache,
+			"endpoints":      endpointsCache,
+			"deltaClusters":  deltaCDSCache,
+			"deltaEndpoints": deltaEDSCache,
 		},
 	}
 	return &Snapshotter{
@@ -147,9 +189,12 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi
 		servePort:              port,
 		servicesCache:          servicesCache,
 		endpointsCache:         endpointsCache,
+		deltaCDSCache:          deltaCDSCache,
+		deltaEDSCache:          deltaEDSCache,
 		muxCache:               muxCache,
 		requestRateLimit:       rate.NewLimiter(rate.Limit(requestLimit), 1),
 		streamRequestPerSecond: streamRequestLimit,
+		localhostEndpoints:     localhostEndpoints,
 	}
 }
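To make the fan-out concrete, a short in-package sketch of how the mux classifies the two stream flavours. cache.Request and cache.DeltaRequest are go-control-plane's aliases for the discovery request protos, so only TypeUrl matters here:

func exampleClassify() {
	sotw := &cache.Request{TypeUrl: resource.ClusterType}       // classic SOTW CDS request
	delta := &cache.DeltaRequest{TypeUrl: resource.ClusterType} // incremental CDS request

	fmt.Println(mapTypeURL(sotw.TypeUrl))       // "services"      -> SnapshotCache
	fmt.Println(mapDeltaTypeURL(delta.TypeUrl)) // "deltaClusters" -> LinearCache
}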
@@ -207,12 +252,15 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
 	if err != nil {
 		return fmt.Errorf("Failed to set services snapshot %v", err)
 	}
+	// Sync linear caches
+	s.setDeltaCache(resource.ClusterType)
+
 	s.nodes.Range(func(nID, n interface{}) bool {
 		nodeID := nID.(string)
 		node := n.(*Node)
 		for sID, res := range node.resources {
 			for typeURL, resources := range res.servicesNames {
-				if err := s.updateNodeStreamResources(nodeID, typeURL, sID, resources); err != nil {
+				if err := s.updateNodeStreamServiceResources(nodeID, typeURL, sID, resources); err != nil {
 					log.Logger.Error("Failed to update service resources before snapping",
 						"type", typeURL,
 						"node", nodeID,
 						"resources", resources,
 						"stream_id", sID,
@@ -259,6 +307,9 @@ func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
 	if err != nil {
 		return fmt.Errorf("Failed to set endpoints snapshot %v", err)
 	}
+	// Sync linear caches
+	s.setDeltaCache(resource.EndpointType)
+
 	s.nodes.Range(func(nID, n interface{}) bool {
 		nodeID := nID.(string)
 		node := n.(*Node)
@@ -281,6 +332,32 @@ func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
 	return nil
 }
 
+// setDeltaCache gets resources of a type from the full Sotw snapshot and sets
+// them in the respective linear cache. This way we can make sure that linear
+// caches are synced with the existing resources, but we will be triggering
+// downstream notifications more often
+func (s *Snapshotter) setDeltaCache(typeUrl string) error {
+	// Load all the known resources from the full State of the world Snapshot
+	allResourcesSnap, err := s.getCacheForType(typeUrl).GetSnapshot(EmptyNodeID)
+	if err != nil {
+		return fmt.Errorf("Cannot sync linear cache for type: %s, failed to get sotw snap: %v", typeUrl, err)
+	}
+	allResourcesOfType := allResourcesSnap.GetResources(typeUrl)
+	setResources := make(map[string]types.Resource)
+	for name, res := range allResourcesOfType {
+		if typeUrl == resource.ClusterType {
+			cluster, err := UnmarshalResourceToCluster(res)
+			if err != nil {
+				return fmt.Errorf("Cannot unmarshal resource to cluster for patching: %v", err)
+			}
+			res = patchClusterDeltaEDS(cluster)
+		}
+		setResources[name] = res
+	}
+	s.getDeltaCacheForType(typeUrl).SetResources(setResources)
+	return nil
+}
+
 func (s *Snapshotter) OnStreamOpen(ctx context.Context, id int64, typ string) error {
 	var peerAddr string
 	if peerInfo, ok := peer.FromContext(ctx); ok {
 		peerAddr = peerInfo.Addr.String()
 	}
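For reference, a minimal self-contained sketch of the LinearCache behaviour setDeltaCache leans on (go-control-plane API; the resource name is hypothetical, and the notification semantics described in the comments are the library's documented behaviour as I understand it): SetResources replaces the whole resource set in one call.

package main

import (
	endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)

func main() {
	lc := cachev3.NewLinearCache(resource.EndpointType)
	// Replace the full set at once; open delta streams are then served the
	// resources that changed relative to the versions they last ACKed.
	lc.SetResources(map[string]types.Resource{
		"foo.bar.80": &endpointv3.ClusterLoadAssignment{ClusterName: "foo.bar.80"},
	})
}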
@@ -357,21 +434,37 @@ func (s *Snapshotter) OnFetchRequest(ctx context.Context, req *discovery.Discove
 }
 
 func (s *Snapshotter) OnDeltaStreamClosed(id int64, node *core.Node) {
-	log.Logger.Info("OnDeltaStreamClosed")
+	log.Logger.Info("OnDeltaStreamClosed", "id", id, "node", node)
 }
 
 func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ string) error {
-	log.Logger.Info("OnDeltaStreamOpen")
+	var peerAddr string
+	if peerInfo, ok := peer.FromContext(ctx); ok {
+		peerAddr = peerInfo.Addr.String()
+	}
+	log.Logger.Info("OnDeltaStreamOpen", "peer address", peerAddr, "id", id, "type", typ)
 	return nil
 }
 
-func (s *Snapshotter) OnStreamDeltaRequest(i int64, request *discovery.DeltaDiscoveryRequest) error {
-	log.Logger.Info("OnStreamDeltaRequest")
+func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscoveryRequest) error {
+	log.Logger.Info("OnStreamDeltaRequest",
+		"id", id,
+		"received", r.GetTypeUrl(),
+		"node", r.GetNode().GetId(),
+		"locality", r.GetNode().GetLocality(),
+		"subscribes", strings.Join(r.GetResourceNamesSubscribe(), ", "),
+		"unsubscribes", strings.Join(r.GetResourceNamesUnsubscribe(), ", "),
+		"response_nonce", r.GetResponseNonce(),
+	)
 	return nil
 }
 
-func (s *Snapshotter) OnStreamDeltaResponse(i int64, request *discovery.DeltaDiscoveryRequest, response *discovery.DeltaDiscoveryResponse) {
-	log.Logger.Info("OnStreamDeltaResponse")
+func (s *Snapshotter) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) {
+	log.Logger.Info("OnStreamDeltaResponse",
+		"id", id,
+		"type", resp.GetTypeUrl(),
+		"resources", len(resp.GetResources()),
+	)
 }
 
 func (s *Snapshotter) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
@@ -493,9 +586,9 @@
 	return res, nil
 }
-// updateNodeStreamResources updates the list of service resources requested in a node's stream context
+// updateNodeStreamServiceResources updates the list of service resources requested in a node's stream context
 // by copying the most up to date version of them from the full snapshot (default snapshot)
-func (s *Snapshotter) updateNodeStreamResources(nodeID, typeURL string, streamID int64, resources []string) error {
+func (s *Snapshotter) updateNodeStreamServiceResources(nodeID, typeURL string, streamID int64, resources []string) error {
 	n, ok := s.nodes.Load(nodeID)
 	if !ok {
 		return fmt.Errorf("Cannot update service snapshot resources, node: %s not found", nodeID)
 	}
@@ -506,9 +599,19 @@
 		return fmt.Errorf("Cannot find service resources to update for node: %s in stream: %d context", nodeID, streamID)
 	}
 
-	newSnapResources, err := s.getResourcesFromCache(typeURL, resources)
-	if err != nil {
-		return fmt.Errorf("Cannot get resources from cache: %s", err)
+	var newSnapResources []types.Resource
+	var err error
+	if s.localhostEndpoints {
+		newSnapResources, err = s.makeDummyResources(typeURL, resources)
+		if err != nil {
+			return fmt.Errorf("Cannot make dummy resources for localhost mode: %s", err)
+		}
+		log.Logger.Debug("Created dummy resources", "type", typeURL, "resources", newSnapResources, "count", len(newSnapResources))
+	} else {
+		newSnapResources, err = s.getResourcesFromCache(typeURL, resources)
+		if err != nil {
+			return fmt.Errorf("Cannot get resources from cache: %s", err)
+		}
 	}
 
 	nodeResources[streamID].services[typeURL] = newSnapResources
@@ -519,13 +622,13 @@ func (s *Snapshotter) updateNodeStreamResources(nodeID, typeURL string, streamID
 		serviceSnapVersion:   node.serviceSnapVersion,
 		endpointsSnapVersion: node.endpointsSnapVersion,
 	}
+	log.Logger.Debug("Updating node resources", "node", nodeID, "type", typeURL, "resources", updatedNode.resources[streamID].services[typeURL])
 	s.nodes.Store(nodeID, updatedNode)
 	return nil
 }
 
 // updateNodeStreamEndpointsResources updates the list of endpoint resources requested in a node's stream context
 // by copying the most up to date version of them from the full snapshot (default snapshot)
-
 func (s *Snapshotter) updateNodeStreamEndpointsResources(nodeID, typeURL string, streamID int64, resources []string) error {
 	n, ok := s.nodes.Load(nodeID)
 	if !ok {
@@ -537,9 +640,19 @@ func (s *Snapshotter) updateNodeStreamEndpointsResources(nodeID, typeURL string,
 		return fmt.Errorf("Cannot find endpoint resources to update for node: %s in stream: %d context", nodeID, streamID)
 	}
 
-	newSnapResources, err := s.getResourcesFromCache(typeURL, resources)
-	if err != nil {
-		return fmt.Errorf("Cannot get resources from cache: %s", err)
+	var newSnapResources []types.Resource
+	var err error
+	if s.localhostEndpoints {
+		newSnapResources, err = s.makeDummyResources(typeURL, resources)
+		if err != nil {
+			return fmt.Errorf("Cannot make dummy resources for localhost mode: %s", err)
+		}
+		log.Logger.Debug("Created dummy resources", "type", typeURL, "resources", newSnapResources, "count", len(newSnapResources))
+	} else {
+		newSnapResources, err = s.getResourcesFromCache(typeURL, resources)
+		if err != nil {
+			return fmt.Errorf("Cannot get resources from cache: %s", err)
+		}
 	}
 
 	nodeResources[streamID].endpoints[typeURL] = newSnapResources
@@ -592,7 +705,7 @@ func (s *Snapshotter) nodeEndpointsSnapshot(nodeID string) error {
 // trigger a new snapshot
 func (s *Snapshotter) updateStreamNodeResources(nodeID, typeURL string, streamID int64, resources []string) error {
 	if mapTypeURL(typeURL) == "services" {
-		if err := s.updateNodeStreamResources(nodeID, typeURL, streamID, resources); err != nil {
+		if err := s.updateNodeStreamServiceResources(nodeID, typeURL, streamID, resources); err != nil {
 			return err
 		}
 		return s.nodeServiceSnapshot(nodeID)
@@ -616,7 +729,7 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
 	node := n.(*Node)
 	sNodeResources, ok := node.resources[streamID]
 	if !ok {
-		log.Logger.Warn("Cannot check if snapshot needs updating, strema not found", "id", streamID)
+		log.Logger.Warn("Cannot check if snapshot needs updating, stream not found", "id", streamID)
 		return false
 	}
 	if mapTypeURL(typeURL) == "services" {
@@ -628,6 +741,41 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
 	return false
 }
 
+// makeDummyResources blindly creates default configuration resources to snapshot based on the requested names.
+// For endpoints it will create a single localhost endpoint
+func (s *Snapshotter) makeDummyResources(typeURL string, resources []string) ([]types.Resource, error) {
+	res := []types.Resource{}
+	if typeURL == resource.ListenerType {
+		for _, r := range resources {
+			clusterName := strings.Replace(r, ":", ".", 1) // Replace expected format of name.namespace:port -> name.namespace.port to match cluster names due to xds naming
+			routeConfig := routeConfig(r, clusterName, r, []string{r}, nil, []*routev3.RouteAction_HashPolicy{})
+			manager, err := makeManager(routeConfig)
+			if err != nil {
+				return res, fmt.Errorf("Cannot generate listener manager: %v", err)
+			}
+			l := listener(r, manager)
+			res = append(res, l)
+		}
+	}
+	if typeURL == resource.RouteType {
+		for _, r := range resources {
+			clusterName := strings.Replace(r, ":", ".", 1) // Replace expected format of name.namespace:port -> name.namespace.port to match cluster names due to xds naming
+			res = append(res, routeConfig(r, clusterName, r, []string{r}, nil, []*routev3.RouteAction_HashPolicy{}))
+		}
+	}
+	if typeURL == resource.ClusterType {
+		for _, r := range resources {
+			res = append(res, cluster(r, clusterv3.Cluster_ROUND_ROBIN))
+		}
+	}
+	if typeURL == resource.EndpointType {
+		for _, r := range resources {
+			res = append(res, localhostClusterLoadAssignment(r))
+		}
+	}
+	return res, nil
+}
+
 // ListenAndServeFromCache will start an xDS server at the given port and serve
 // snapshots from the given cache
 func (s *Snapshotter) ListenAndServe() {
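To make the name mangling above concrete, a hedged in-package sketch. "foo.bar:8080" is a hypothetical resource name, and s is a *Snapshotter running with localhostEndpoints enabled:

lds, _ := s.makeDummyResources(resource.ListenerType, []string{"foo.bar:8080"})
// -> one listener named "foo.bar:8080" whose inline route config targets
//    cluster "foo.bar.8080" (the first ":" rewritten to "." to match cluster naming).
eds, _ := s.makeDummyResources(resource.EndpointType, []string{"foo.bar.8080"})
// -> one ClusterLoadAssignment resolving to a localhost address,
//    via localhostClusterLoadAssignment.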
diff --git a/xds/snapshotter_metrics_test.go b/xds/snapshotter_metrics_test.go
index 7262104..da29594 100644
--- a/xds/snapshotter_metrics_test.go
+++ b/xds/snapshotter_metrics_test.go
@@ -120,7 +120,9 @@ func TestSnapMetricsCollector(t *testing.T) {
 			fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpListenerName, "test-node", expectHttpRouteName, resource.ListenerType),
 			fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpsListenerName, "test-node", expectHttpsRouteName, resource.ListenerType),
 			fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, expectHttpRouteName, EmptyNodeID, resource.RouteType, expectHttpVhost),
+			fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpVhost),
 			fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, expectHttpsRouteName, EmptyNodeID, resource.RouteType, expectHttpsVhost),
+			fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpsVhost),
 			fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpClusterName, EmptyNodeID, resource.ClusterType),
 			fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpsClusterName, EmptyNodeID, resource.ClusterType),
 			fmt.Sprintf(`semaphore_xds_snapshot_endpoint{cluster_name="%s",health_status="healthy",lb_address="%s",locality_subzone="foo-xzf",locality_zone="test",node_id="%s",priority="0",type="%s"} 1`, expectHttpClusterName, "10.2.1.1:80", EmptyNodeID, resource.EndpointType),
@@ -131,7 +133,7 @@ func TestSnapMetricsCollector(t *testing.T) {
 	log.InitLogger("test-semaphore-xds-metrics", "debug")
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+			snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 			snapshotter.SnapServices(tt.serviceStore)
 			snapshotter.SnapEndpoints(tt.endpointStore)
 			snapshotter.addOrUpdateNode(tt.nodeID, tt.nodeAddress, tt.streamID)
diff --git a/xds/snapshotter_test.go b/xds/snapshotter_test.go
index bde2bcf..e8e5768 100644
--- a/xds/snapshotter_test.go
+++ b/xds/snapshotter_test.go
@@ -20,7 +20,7 @@ func init() {
 }
 
 func TestSnapServices_EmptyServiceList(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	snapshotter.SnapServices(serviceStore)
 	snap, err := snapshotter.servicesCache.GetSnapshot(testNodeID)
@@ -34,7 +34,7 @@ func TestSnapServices_EmptyServiceList(t *testing.T) {
 }
 
 func TestSnapServices_SingleService(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	svc := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -63,11 +63,11 @@ func TestSnapServices_SingleService(t *testing.T) {
 	// cluster
 	assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
 	assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
-	assert.Equal(t, 1, len(snap.GetResources(resource.RouteType)))
+	assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
 }
 
 func TestSnapServices_NoServicePorts(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	svc := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -91,7 +91,7 @@ func TestSnapServices_NoServicePorts(t *testing.T) {
 }
 
 func TestSnapServices_MultipleServicePorts(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	svc := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -124,11 +124,11 @@ func TestSnapServices_MultipleServicePorts(t *testing.T) {
 	// per port
 	assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType)))
 	assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType)))
-	assert.Equal(t, 2, len(snap.GetResources(resource.RouteType)))
+	assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
 }
 
 func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	endpointStore := NewXdsEnpointStore()
 	snapshotter.SnapEndpoints(endpointStore)
 	snap, err := snapshotter.endpointsCache.GetSnapshot(testNodeID)
@@ -140,7 +140,7 @@ func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) {
 }
 
 func TestSnapEndpoints_MissingServiceOwnershipLabel(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	log.InitLogger("test-semaphore-xds", "debug")
 	endpointSlice := &discoveryv1.EndpointSlice{
 		ObjectMeta: metav1.ObjectMeta{
@@ -159,7 +159,7 @@ func TestSnapEndpoints_MissingServiceOwnershipLabel(t *testing.T) {
 }
 
 func TestSnapEndpoints_UpdateAddress(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	// Create test EndpointSlice
 	httpPortName := "http"
 	httpPortValue := int32(80)
@@ -242,7 +242,7 @@ func TestSnapEndpoints_UpdateAddress(t *testing.T) {
 }
 
 func TestSnapServices_NodeSnapshotResources(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	// fooA.bar:80
 	svcA := &v1.Service{
@@ -332,7 +332,7 @@ func TestSnapServices_NodeSnapshotResources(t *testing.T) {
 	}
 	assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType)))
 	assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType)))
-	assert.Equal(t, 2, len(snap.GetResources(resource.RouteType)))
+	assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
 	// Client requesting more resources again should bring them back into the node snapshot
 	assert.Equal(t, true, snapshotter.needToUpdateSnapshot(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}))
 	if err := snapshotter.updateStreamNodeResources(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}); err != nil {
@@ -366,7 +366,7 @@ func TestSnapServices_NodeSnapshotResources(t *testing.T) {
 }
 
 func TestSnapServices_MultipleStreams(t *testing.T) {
-	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	// fooA.bar:80
 	svcA := &v1.Service{
@@ -459,7 +459,7 @@ func TestSnapServices_MultipleStreams(t *testing.T) {
 }
 
 func TestSnapServices_SingleServiceWithAuthoritySet(t *testing.T) {
-	snapshotter := NewSnapshotter("test-authority", uint(0), float64(0), float64(0))
+	snapshotter := NewSnapshotter("test-authority", uint(0), float64(0), float64(0), false)
 	serviceStore := NewXdsServiceStore()
 	svc := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -504,8 +504,12 @@ func TestSnapServices_SingleServiceWithAuthoritySet(t *testing.T) {
 		}
 		assert.Contains(t, expectedClusters, cluster.Name)
 	}
-	expectedRoutes := []string{makeRouteConfigName("foo", "bar", int32(80)), makeXdstpRouteConfigName("foo", "bar", "test-authority", int32(80))}
-	assert.Equal(t, 2, len(snap.GetResources(resource.RouteType)))
+	expectedRoutes := []string{
+		makeRouteConfigName("foo", "bar", int32(80)),
+		makeXdstpRouteConfigName("foo", "bar", "test-authority", int32(80)),
+		"all_kube_routes",
+	}
+	assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
 	for _, res := range snap.GetResources(resource.RouteType) {
 		route, err := UnmarshalResourceToRouteConfiguration(res)
 		if err != nil {
diff --git a/xds/utils.go b/xds/utils.go
index 37d4dd7..c3391af 100644
--- a/xds/utils.go
+++ b/xds/utils.go
@@ -286,3 +286,13 @@ func resourcesMatch(a, b []string) bool {
 	}
 	return true
 }
+
+func subscriptionKeys(s map[string]struct{}) []string {
+	keys := make([]string, len(s))
+	i := 0
+	for k := range s {
+		keys[i] = k
+		i++
+	}
+	return keys
+}
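A small usage sketch for the new helper (the subscription set is hypothetical; Go map iteration order is random, so callers must not rely on the ordering of the returned slice):

subs := map[string]struct{}{"foo.bar:80": {}, "foo.baz:443": {}}
names := subscriptionKeys(subs) // e.g. []string{"foo.baz:443", "foo.bar:80"}; order not deterministic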