From e0da2b8a06afc608fe39eec29e33c8e42f0a439f Mon Sep 17 00:00:00 2001 From: ffilippopoulos Date: Wed, 23 Oct 2024 14:49:13 +0100 Subject: [PATCH] Add flag to run a dummy xDS server and envoy-sidecar configurer This adds a flag to add a dummy server mode, where all requested resources will be returned to clients with endpoints that point to a local port where an envoy sidecar is supposed to be running. It also adds a simple envoy configurer to produce envoy config for the sidecar --- .github/workflows/build.yaml | 14 ++ controller/controller_test.go | 32 ++--- envoy-sidecar/README.md | 7 + envoy-sidecar/configurer/.gitignore | 1 + envoy-sidecar/configurer/Dockerfile | 13 ++ envoy-sidecar/configurer/README.md | 12 ++ envoy-sidecar/configurer/configure.go | 120 ++++++++++++++++++ envoy-sidecar/configurer/go.mod | 3 + envoy-sidecar/configurer/main.go | 41 ++++++ .../configurer/templates/clusters.tmpl | 14 ++ .../configurer/templates/envoy-config.tmpl | 12 ++ .../configurer/templates/listeners.tmpl | 27 ++++ .../configurer/templates/xds-cluster.tmpl | 25 ++++ ...utate-semaphore-xds-envoy-sidecar.yaml.swp | Bin 0 -> 12288 bytes .../kyverno/mutate/kustomization.yaml | 4 + .../mutate-semaphore-xds-envoy-sidecar.yaml | 68 ++++++++++ main.go | 26 ++-- xds/endpoints.go | 6 + xds/service.go | 35 +++-- xds/snapshotter.go | 75 ++++++++++- xds/snapshotter_metrics_test.go | 2 +- xds/snapshotter_test.go | 20 +-- 22 files changed, 499 insertions(+), 58 deletions(-) create mode 100644 envoy-sidecar/README.md create mode 100644 envoy-sidecar/configurer/.gitignore create mode 100644 envoy-sidecar/configurer/Dockerfile create mode 100644 envoy-sidecar/configurer/README.md create mode 100644 envoy-sidecar/configurer/configure.go create mode 100644 envoy-sidecar/configurer/go.mod create mode 100644 envoy-sidecar/configurer/main.go create mode 100644 envoy-sidecar/configurer/templates/clusters.tmpl create mode 100644 envoy-sidecar/configurer/templates/envoy-config.tmpl create mode 100644 
envoy-sidecar/configurer/templates/listeners.tmpl create mode 100644 envoy-sidecar/configurer/templates/xds-cluster.tmpl create mode 100644 envoy-sidecar/deploy/kustomize/kyverno/mutate/.mutate-semaphore-xds-envoy-sidecar.yaml.swp create mode 100644 envoy-sidecar/deploy/kustomize/kyverno/mutate/kustomization.yaml create mode 100644 envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 110b7a4..eacbeb0 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -13,6 +13,7 @@ on: env: REGISTRY: quay.io IMAGE_NAME: ${{ github.repository }} + IMAGE_NAME_ENVOY_CONFIGURER: ${{ github.repository }}/envoy-sidecar/configurer jobs: docker: @@ -44,3 +45,16 @@ jobs: push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + - name: Extract envoy/configurer metadata (tags, labels) for Docker + id: meta-envoy-configurer + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME_ENVOY_CONFIGURER }} + - name: Build and push Docker image for envoy configurer + if: github.actor != 'dependabot[bot]' + uses: docker/build-push-action@v5 + with: + context: ./envoy-sidecar/configurer + push: true + tags: ${{ steps.meta-envoy-configurer.outputs.tags }} + labels: ${{ steps.meta-envoy-configurer.outputs.labels }} diff --git a/controller/controller_test.go b/controller/controller_test.go index 71a3cfe..56fbfd0 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -39,7 +39,7 @@ func TestReconcileServices_LabelledService(t *testing.T) { "./test-resources/labelled_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -82,7 +82,7 
@@ func TestReconcileServices_LabelledServiceLbPolicy(t *testing.T) { "./test-resources/labelled_service_ring_hash_balancer.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -119,7 +119,7 @@ func TestReconcileServices_LabelledServiceInvalidLbPolicy(t *testing.T) { "./test-resources/labelled_service_invalid_balancer.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -156,7 +156,7 @@ func TestReconcileServices_XdsService(t *testing.T) { "./test-resources/xds_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -199,7 +199,7 @@ func TestReconcileServices_XdsServiceNotExistent(t *testing.T) { "./test-resources/xds_service_not_existent.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -233,7 +233,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) { "./test-resources/xds_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), 
float64(0), false) controller := NewController( client, []kube.Client{}, @@ -284,7 +284,7 @@ func TestReconcileLocalEndpointSlice_SnapOnUpdate(t *testing.T) { "./test-resources/xds_service.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -318,7 +318,7 @@ func TestReconcileLocalEndpointSlice_NotFound(t *testing.T) { "./test-resources/endpointslice.yaml", ) client.EndpointSliceApiError(kubeerror.NewNotFound(schema.GroupResource{Resource: "endpointslice"}, "foo")) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -345,7 +345,7 @@ func TestReconcileLocalEndpointSlice_NonXdsService(t *testing.T) { client := kube.NewClientMock( "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, @@ -373,7 +373,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints(t *testing.T) { remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -436,7 +436,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints_NoRemoteEndpoints(t *te remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := 
xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -487,7 +487,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpoints(t *testing.T) { remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -539,7 +539,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpointsAndLocalPriority(t *test remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -601,7 +601,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpointsAndLocalPriority(t * remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, []kube.Client{remoteClient}, @@ -656,7 +656,7 @@ func TestReconcileLocalEndpointSlices_XdsServiceWithEmptyLocalEndpoints(t *testi remoteClient := kube.NewClientMock( "./test-resources/endpointslice-remote.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( localClient, 
[]kube.Client{remoteClient}, @@ -700,7 +700,7 @@ func TestReconcileServices_XdsServiceWithRingHash(t *testing.T) { "./test-resources/xds_service_ring_hash_balancing.yaml", "./test-resources/endpointslice.yaml", ) - snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0)) + snapshotter := xds.NewSnapshotter("", testSnapshotterListenPort, float64(0), float64(0), false) controller := NewController( client, []kube.Client{}, diff --git a/envoy-sidecar/README.md b/envoy-sidecar/README.md new file mode 100644 index 0000000..931d725 --- /dev/null +++ b/envoy-sidecar/README.md @@ -0,0 +1,7 @@ +# Tooling to deploy envoy as a sidecar and use it with the xDS server + +This is in a very experimental stage and definitely not production ready. + +It includes a small app to be used as an initContainer to produce envoy config +based on a map of xDS services and local ports and a kustomize base to deploy a +Kyverno mutating rule for the initContainer and the envoy sidecar. diff --git a/envoy-sidecar/configurer/.gitignore b/envoy-sidecar/configurer/.gitignore new file mode 100644 index 0000000..0beebb8 --- /dev/null +++ b/envoy-sidecar/configurer/.gitignore @@ -0,0 +1 @@ +configurer diff --git a/envoy-sidecar/configurer/Dockerfile b/envoy-sidecar/configurer/Dockerfile new file mode 100644 index 0000000..5ecf0ec --- /dev/null +++ b/envoy-sidecar/configurer/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.21-alpine AS build +WORKDIR /go/src/github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer +COPY . /go/src/github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer +ENV CGO_ENABLED=0 +RUN \ + apk --no-cache add git upx \ + && go build -ldflags='-s -w' -o /semaphore-xds-envoy-configurer . 
\ + && upx /semaphore-xds-envoy-configurer + +FROM alpine:3.18 +COPY --from=build /semaphore-xds-envoy-configurer /semaphore-xds-envoy-configurer +COPY --from=build /go/src/github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer/templates /templates +ENTRYPOINT [ "/semaphore-xds-envoy-configurer" ] diff --git a/envoy-sidecar/configurer/README.md b/envoy-sidecar/configurer/README.md new file mode 100644 index 0000000..ab26174 --- /dev/null +++ b/envoy-sidecar/configurer/README.md @@ -0,0 +1,12 @@ +# semaphore-xds-envoy-configurer + +Expects an environment variable named `ENVOY_SIDECAR_TARGETS` in the form of a +comma separated list of xDS listeners. For example: + +`ENVOY_SIDECAR_TARGETS="service.namespace:port,service.namespace:port"` +`XDS_SERVER_ADDRESS`="semaphore-xds.sys-semaphore.svc.cluster.local" +`XDS_SERVER_PORT`="18000" + +It generates envoy config to point listeners on the specified local ports to +the respective xDS dynamic resources in order for envoy to be able to proxy +traffic to the configured services. 
diff --git a/envoy-sidecar/configurer/configure.go b/envoy-sidecar/configurer/configure.go new file mode 100644 index 0000000..c7680e9 --- /dev/null +++ b/envoy-sidecar/configurer/configure.go @@ -0,0 +1,120 @@ +package main + +import ( + "bytes" + "path" + "strings" + "text/template" +) + +type Listener struct { + RouteConfigName string +} + +type Cluster struct { + Name string +} + +type XdsCluster struct { + XdsServerAddress string + XdsServerPort string +} + +type EnvoyConfig struct { + NodeID string + ClusterID string + Listeners string + Clusters string + XdsCluster string +} + +func makeEnvoyConfig(nodeID, envoySidecarTargets, XdsServerAddress, XdsServerPort string) (string, error) { + listeners, clusters := extractConfigFromTargets(envoySidecarTargets) + // Generate Listeners Config + listenersTmplPath := "./templates/listeners.tmpl" + listenersTmplBase := path.Base(listenersTmplPath) + tmpl, err := template.New(listenersTmplBase).ParseFiles(listenersTmplPath) + if err != nil { + return "", err + } + var renderedListeners bytes.Buffer + err = tmpl.Execute(&renderedListeners, listeners) + if err != nil { + return "", err + } + // Generate Clusters Config + clustersTmplPath := "./templates/clusters.tmpl" + clustersTmplBase := path.Base(clustersTmplPath) + tmpl, err = template.New(clustersTmplBase).ParseFiles(clustersTmplPath) + if err != nil { + return "", err + } + var renderedClusters bytes.Buffer + err = tmpl.Execute(&renderedClusters, clusters) + if err != nil { + return "", err + } + // Generate XdsCluster Config + xdsCluster := XdsCluster{ + XdsServerAddress: XdsServerAddress, + XdsServerPort: XdsServerPort, + } + XdsClusterTmplPath := "./templates/xds-cluster.tmpl" + XdsClusterTmplBase := path.Base(XdsClusterTmplPath) + tmpl, err = template.New(XdsClusterTmplBase).ParseFiles(XdsClusterTmplPath) + if err != nil { + return "", err + } + var renderedXdsCluster bytes.Buffer + err = tmpl.Execute(&renderedXdsCluster, xdsCluster) + if err != nil { + return 
"", err + } + + // Generate the Envoy config + envoyConfig := EnvoyConfig{ + NodeID: nodeID, + ClusterID: nodeID, // needed by envoy, add node id as a dummy value here + Listeners: renderedListeners.String(), + Clusters: renderedClusters.String(), + XdsCluster: renderedXdsCluster.String(), + } + envoyConfigTmplPath := "./templates/envoy-config.tmpl" + envoyConfigTmplBase := path.Base(envoyConfigTmplPath) + tmpl, err = template.New(envoyConfigTmplBase).ParseFiles(envoyConfigTmplPath) + if err != nil { + return "", err + } + var renderedEnvoyConfig bytes.Buffer + err = tmpl.Execute(&renderedEnvoyConfig, envoyConfig) + if err != nil { + return "", err + } + return renderedEnvoyConfig.String(), nil +} + +// List expected upstream listeners in the form: +// service.namespace:port,service.namespace:port,... +// From this we should extract the xds addresses as listener and route config +// names. +// XdsAddress is expected in the name that semaphore-xds would configure +// listeners: service.namespace:port and should be copied as is to the listener +// and routeConfig names. Cluster names should be derived from the above and +// follow the form: service.namespace.port to comply with the xds naming +// limitations and how semaphore-xds configures cluster names. 
+func extractConfigFromTargets(envoySidecarTargets string) ([]Listener, []Cluster) { + listeners := []Listener{} + for _, target := range strings.Split(envoySidecarTargets, ",") { + listeners = append(listeners, Listener{ + RouteConfigName: target, + }) + } + clusters := []Cluster{} + for _, l := range listeners { + clusterName := strings.Join(strings.Split(l.RouteConfigName, ":"), ".") + clusters = append(clusters, Cluster{ + Name: clusterName, + }) + } + return listeners, clusters +} diff --git a/envoy-sidecar/configurer/go.mod b/envoy-sidecar/configurer/go.mod new file mode 100644 index 0000000..070db01 --- /dev/null +++ b/envoy-sidecar/configurer/go.mod @@ -0,0 +1,3 @@ +module github.com/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer + +go 1.21.3 diff --git a/envoy-sidecar/configurer/main.go b/envoy-sidecar/configurer/main.go new file mode 100644 index 0000000..194cc1d --- /dev/null +++ b/envoy-sidecar/configurer/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "flag" + "fmt" + "os" +) + +var ( + flagEnvoyNodeId = flag.String("envoy-node-id", getEnv("ENVOY_NODE_ID", ""), "Node id to configure for envoy sidecar") + flagEnvoySidecarTargets = flag.String("envoy-sidecar-targets", getEnv("ENVOY_SIDECAR_TARGETS", ""), "Comma separated list of listeners to get xDS config for: ,") + flagXdsServerAddress = flag.String("xds-server-address", getEnv("XDS_SERVER_ADDRESS", ""), "The address of the xds server for envoy sidecar to fetch config") + flagXdsServerPort = flag.String("xds-server-port", getEnv("XDS_SERVER_PORT", ""), "The port of the xds server for envoy sidecar to fetch config") +) + +func usage() { + flag.Usage() + os.Exit(1) +} + +func getEnv(key, defaultValue string) string { + value := os.Getenv(key) + if len(value) == 0 { + return defaultValue + } + return value +} + +func main() { + flag.Parse() + + if *flagEnvoyNodeId == "" || *flagEnvoySidecarTargets == "" || *flagXdsServerAddress == "" || *flagXdsServerPort == "" { + usage() + } + c, err := 
makeEnvoyConfig(*flagEnvoyNodeId, *flagEnvoySidecarTargets, *flagXdsServerAddress, *flagXdsServerPort) + if err != nil { + fmt.Print(err) + os.Exit(1) + } + fmt.Print(c) +} diff --git a/envoy-sidecar/configurer/templates/clusters.tmpl b/envoy-sidecar/configurer/templates/clusters.tmpl new file mode 100644 index 0000000..435cb99 --- /dev/null +++ b/envoy-sidecar/configurer/templates/clusters.tmpl @@ -0,0 +1,14 @@ +{{- range . }} + - name: {{ .Name }} + http2_protocol_options: {} + type: EDS + eds_cluster_config: + eds_config: + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + - envoy_grpc: + cluster_name: xds_cluster +{{- end }} diff --git a/envoy-sidecar/configurer/templates/envoy-config.tmpl b/envoy-sidecar/configurer/templates/envoy-config.tmpl new file mode 100644 index 0000000..9043a4d --- /dev/null +++ b/envoy-sidecar/configurer/templates/envoy-config.tmpl @@ -0,0 +1,12 @@ +admin: + address: + socket_address: { address: 127.0.0.1, port_value: 9901 } +node: + id: {{ .NodeID }} + cluster: {{ .ClusterID }} +static_resources: + listeners: +{{- .Listeners }} + clusters: +{{- .Clusters }} +{{- .XdsCluster }} diff --git a/envoy-sidecar/configurer/templates/listeners.tmpl b/envoy-sidecar/configurer/templates/listeners.tmpl new file mode 100644 index 0000000..3c8efcc --- /dev/null +++ b/envoy-sidecar/configurer/templates/listeners.tmpl @@ -0,0 +1,27 @@ + - name: listener_0 + address: + socket_address: { address: 127.0.0.1, port_value: 18001 } + filter_chains: + - filters: + {{- range . 
}} + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + codec_type: AUTO + rds: + route_config_name: {{ .RouteConfigName }} + config_source: + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + - envoy_grpc: + cluster_name: xds_cluster + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + dynamic_stats: true + {{- end }} diff --git a/envoy-sidecar/configurer/templates/xds-cluster.tmpl b/envoy-sidecar/configurer/templates/xds-cluster.tmpl new file mode 100644 index 0000000..20f0f08 --- /dev/null +++ b/envoy-sidecar/configurer/templates/xds-cluster.tmpl @@ -0,0 +1,25 @@ + - name: xds_cluster + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: + connection_keepalive: + interval: 30s + timeout: 5s + upstream_connection_options: + # configure a TCP keep-alive to detect and reconnect to the admin + # server in the event of a TCP socket half open connection + tcp_keepalive: {} + load_assignment: + cluster_name: xds_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: {{ .XdsServerAddress }} + port_value: {{ .XdsServerPort }} diff --git a/envoy-sidecar/deploy/kustomize/kyverno/mutate/.mutate-semaphore-xds-envoy-sidecar.yaml.swp b/envoy-sidecar/deploy/kustomize/kyverno/mutate/.mutate-semaphore-xds-envoy-sidecar.yaml.swp new file mode 100644 index 0000000000000000000000000000000000000000..cf83a1c341339e40acf45003db5385968c9ee26b GIT binary patch literal 12288 
zcmeI2O^6&t6vr!pXc7|@y~JA?KLSd3??we3aADaQV=$XtW;TmKWUcA0ovGdKuC}Xs zcE``Fr|89-9z+!M;LVeIQ^-*iNx-}0?nw~zU;VMu^O4QwplGOuAI)~ZdhgZ0Usdh0 zuRFJXewjY!&J$es6Y}Htr+V_2t7PlN$H}G?1F6i5{jW_P$dv7GW8S_~IP`kbQ>pLu zgzjfu*O!r(X%PxN++iv2%S>@k@rWgTnQ~`0P#%v5a_A@#aG#}Kz>`o8z3ohC8HqQz zw>=#2G?reJX{LF3bH-daV=mkwi^5yyYb~q-ca{Pt$x~270}5c@-8e6msZX4erV!e{%CJj0jq#jz$#!BunJfO ztO8a6tAJI&Dqt0`3j7ZhU;!Z?9wFqryO2Eo|6l$6|J!|p`~Sa z@I0X4F!=d6A)kTwz!o?Rt{o%f1JDQOz#KRQj)32f67nUu0t8qDPl99M*SiV%8hipW za2Y%ePJln4$#r0~`PS%TFROr6z$#!B_-`mc$1WZvde{_cgTDDTslRb(%p%^PMGu%S z<4w`484P5YMO-y%kE+p;%woODbiYA8uKl7BZOZNz{*ALD#cpo`%S8!9qBGPf!4|YBNKun7H#-}hR(rkG-dJ5}uT9yZbP7jf?ToG(s!@ZZ zs|LR7hv3gj;Mj+pSJ#%9Q5Urr=>vRDQ%Y3z%jui(`onOCe+R${F-! zx_ddf*^Ws`^Jt=3)6u9{uGN`WXU55+wE5;u1$-fu(e%q|Hm!^6E0;Gq=bNpi#rDS9 zV*6Zc?RF+)KEXei*Tt1)YvX)#A^{xDICQDqT3l=G%|Rv{nVAx<8?DM<&thA|!QRW1E19K!bvoox&A>`4 zQ;Mmq35pN;-c&9N;(=0587xQoJ3Z+HdTZxP9`ci_mg^gm9g~}9Tm-*Yh6bD66 zJ*$g5T7qtwu$Hu3W7HT5EUAEYhxA~|R_n8LwzsiU>}pi^IW0GT8gm}NF`}V8yEsYB z5BW(#duifRjEymWBW>yJU@24|m!kQb78)YZ4w}RB1Wd`5{?__J(Ih|Qkby#NDwr^h zXMv)jDTc5dnvcj$1y^p_Aa0gTz?Gkh#H87v3(igLRMs_Lvq7uUnB~kjH7-*exx39` z-- + This policy ensures that pods labelled with + xds.semaphore.uw.systems/envoy-sidecar: "true" are injected with the init + container and the sidecar needed to use envoy to proxy grpc connections + to targets specified by xds.semaphore.uw.systems/envoy-sidecar-targets + annotation. 
+spec: + background: false + mutateExistingOnPolicyUpdate: false + rules: + - name: xds-clients-inject-envoy-sidecar + match: + resources: + kinds: + - Pod + operations: + - CREATE + selector: + matchLabels: + xds.semaphore.uw.systems/envoy-sidecar: "true" + mutate: + patchStrategicMerge: + spec: + initContainers: + - name: envoy-configurer + image: quay.io/utilitywarehouse/semaphore-xds/envoy-sidecar/configurer:envoy-sidecar + imagePullPolicy: Always + env: + - name: ENVOY_NODE_ID + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: ENVOY_SIDECAR_TARGETS + valueFrom: + fieldRef: + fieldPath: metadata.annotations['xds.semaphore.uw.systems/envoy-sidecar-targets'] + - name: XDS_SERVER_ADDRESS + value: semaphore-xds.sys-semaphore.svc.cluster.local + - name: XDS_SERVER_PORT + value: "18000" + command: + - /bin/sh + - -c + - | + /semaphore-xds-envoy-configurer > /etc/envoy/config.yaml + volumeMounts: + - name: envoy-config + mountPath: /etc/envoy + containers: + - name: envoy-sidecar + image: envoyproxy/envoy:v1.29.1 + imagePullPolicy: Always + args: + - -c /etc/envoy/config.yaml + volumeMounts: + - name: envoy-config + mountPath: /etc/envoy + volumes: + - name: envoy-config + emptyDir: {} diff --git a/main.go b/main.go index bda1258..10da893 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ var ( flagNamespace = flag.String("namespace", getEnv("SXDS_NAMESPACE", ""), "The namespace in which to watch for kubernetes resources") flagLabelSelector = flag.String("label-selector", getEnv("SXDS_LABEL_SELECTOR", "xds.semaphore.uw.systems/enabled=true"), "Label selector for watched kubernetes resources") flagLbPolicyLabel = flag.String("lb-policy-selector", getEnv("SXDS_LB_POLICY_SELECTOR", "xds.semaphore.uw.systems/lb-policy"), "Label to allow user to configure the lb policy for a Service clusters") + flagLocalhostEndpoints = flag.Bool("localhost-endpoints", false, "If enabled the server will create configuration with dummy endpoints on 127.0.0.1:18001 for all 
requested listeners and clusters") flagServerListenPort = flag.Uint("server-listen-port", 18000, "xDS server listen port") flagMaxRequestsPerSecond = flag.Float64("max-requests-per-second", 500.0, "maximum allowed requests to the server per second") flagMaxPeerRequestsPerSecond = flag.Float64("max-peer-requests-per-second", 50.0, "maximum allowed requests from a peer per second") @@ -53,24 +54,25 @@ func main() { controller.LbPolicyLabel = *flagLbPolicyLabel localClient, remoteClients := createClientsFromConfig(*flagClustersConfigPath) - snapshotter := xds.NewSnapshotter(*flagAuthorityName, *flagServerListenPort, *flagMaxRequestsPerSecond, *flagMaxPeerRequestsPerSecond) + snapshotter := xds.NewSnapshotter(*flagAuthorityName, *flagServerListenPort, *flagMaxRequestsPerSecond, *flagMaxPeerRequestsPerSecond, *flagLocalhostEndpoints) xds.InitSnapMetricsCollector(snapshotter) go serveMetrics(fmt.Sprintf(":%s", *flagMetricsListenPort)) go servePprof(*flagPprofListenAddress) - controller := controller.NewController( - localClient, - remoteClients, - *flagNamespace, - *flagLabelSelector, - snapshotter, - 0, - ) - controller.Run() + if !*flagLocalhostEndpoints { + controller := controller.NewController( + localClient, + remoteClients, + *flagNamespace, + *flagLabelSelector, + snapshotter, + 0, + ) + controller.Run() + defer controller.Stop() + } snapshotter.ListenAndServe() - - controller.Stop() } func serveMetrics(address string) { diff --git a/xds/endpoints.go b/xds/endpoints.go index b6b4953..2eac12f 100644 --- a/xds/endpoints.go +++ b/xds/endpoints.go @@ -230,3 +230,9 @@ func endpointSlicesToClusterLoadAssignments(endpointStore XdsEndpointStore, auth } return eds, nil } + +func localhostClusterLoadAssignment(clusterName string) *endpointv3.ClusterLoadAssignment { + endpoints := []*endpointv3.LbEndpoint{lbEndpoint("127.0.0.1", int32(18001), true)} + localityEndpoint := []*endpointv3.LocalityLbEndpoints{localityEndpoints(endpoints, "localhost", "localhost", uint32(0))} + 
return clusterLoadAssignment(clusterName, localityEndpoint) +} diff --git a/xds/service.go b/xds/service.go index 9f549d4..9cece5a 100644 --- a/xds/service.go +++ b/xds/service.go @@ -93,6 +93,9 @@ func makeRouteConfig(name, namespace, authority string, port int32, retry *route virtualHostName = makeXdstpVirtualHostName(name, namespace, authority, port) domains = append(domains, virtualHostName) } + return routeConfig(routeName, clusterName, virtualHostName, domains, retry, hashPolicies) +} +func routeConfig(routeName, clusterName, virtualHostName string, domains []string, retry *routev3.RetryPolicy, hashPolicies []*routev3.RouteAction_HashPolicy) *routev3.RouteConfiguration { return &routev3.RouteConfiguration{ Name: routeName, VirtualHosts: []*routev3.VirtualHost{ @@ -142,6 +145,10 @@ func makeListener(name, namespace, authority string, port int32, manager *anypb. if authority != "" { listenerName = makeXdstpListenerName(name, namespace, authority, port) } + return listener(listenerName, manager) +} + +func listener(listenerName string, manager *anypb.Any) *listenerv3.Listener { return &listenerv3.Listener{ Name: listenerName, ApiListener: &listenerv3.ApiListener{ @@ -155,19 +162,8 @@ func makeCluster(name, namespace, authority string, port int32, policy clusterv3 if authority != "" { clusterName = makeXdstpClusterName(name, namespace, authority, port) } - cluster := &clusterv3.Cluster{ - Name: clusterName, - ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS}, - LbPolicy: policy, - EdsClusterConfig: &clusterv3.Cluster_EdsClusterConfig{ - EdsConfig: &corev3.ConfigSource{ - ConfigSourceSpecifier: &corev3.ConfigSource_Ads{ - Ads: &corev3.AggregatedConfigSource{}, - }, - }, - }, - } + cluster := cluster(clusterName, policy) if authority != "" { // This will be the name of the subsequently requested ClusterLoadAssignment. 
We need to set this // to the cluster name to hit resources in the cache and reply to EDS requests @@ -184,6 +180,21 @@ func makeCluster(name, namespace, authority string, port int32, policy clusterv3 return cluster } +func cluster(clusterName string, policy clusterv3.Cluster_LbPolicy) *clusterv3.Cluster { + return &clusterv3.Cluster{ + Name: clusterName, + ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS}, + LbPolicy: policy, + EdsClusterConfig: &clusterv3.Cluster_EdsClusterConfig{ + EdsConfig: &corev3.ConfigSource{ + ConfigSourceSpecifier: &corev3.ConfigSource_Ads{ + Ads: &corev3.AggregatedConfigSource{}, + }, + }, + }, + } +} + // servicesToResources will return a set of listener, routeConfiguration and // cluster for each service port func servicesToResources(serviceStore XdsServiceStore, authority string) ([]types.Resource, []types.Resource, []types.Resource, error) { diff --git a/xds/snapshotter.go b/xds/snapshotter.go index 14f6a17..b2dcd61 100644 --- a/xds/snapshotter.go +++ b/xds/snapshotter.go @@ -9,7 +9,9 @@ import ( "sync/atomic" "time" + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" @@ -59,6 +61,7 @@ type Snapshotter struct { streams sync.Map // map of open streams nodes sync.Map // maps all clients node ids to requested resources that will be snapshotted and served snapNodesMu sync.Mutex // Simple lock to avoid deleting a node while snapshotting + localhostEndpoints bool } // Node keeps the info for a node. 
Each node can have multiple open streams, @@ -127,7 +130,7 @@ func mapTypeURL(typeURL string) string { } // NewSnapshotter needs a grpc server port and the allowed requests limits per server and stream per second -func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimit float64) *Snapshotter { +func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimit float64, localhostEndpoints bool) *Snapshotter { servicesCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger) endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger) // This could be a linear cache? https://pkg.go.dev/github.com/envoyproxy/go-control-plane/pkg/cache/v3#LinearCache muxCache := cache.MuxCache{ @@ -150,6 +153,7 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi muxCache: muxCache, requestRateLimit: rate.NewLimiter(rate.Limit(requestLimit), 1), streamRequestPerSecond: streamRequestLimit, + localhostEndpoints: localhostEndpoints, } } @@ -506,9 +510,18 @@ func (s *Snapshotter) updateNodeStreamResources(nodeID, typeURL string, streamID return fmt.Errorf("Cannot find service resources to update for node: %s in stream: %d context", nodeID, streamID) } - newSnapResources, err := s.getResourcesFromCache(typeURL, resources) - if err != nil { - return fmt.Errorf("Cannot get resources from cache: %s", err) + var newSnapResources []types.Resource + var err error + if s.localhostEndpoints { + newSnapResources, err = s.makeDummyResources(typeURL, resources) + if err != nil { + return fmt.Errorf("Cannot make dummy resources for localhost mode: %s", err) + } + } else { + newSnapResources, err = s.getResourcesFromCache(typeURL, resources) + if err != nil { + return fmt.Errorf("Cannot get resources from cache: %s", err) + } } nodeResources[streamID].services[typeURL] = newSnapResources @@ -537,9 +550,18 @@ func (s *Snapshotter) updateNodeStreamEndpointsResources(nodeID, typeURL string, return 
fmt.Errorf("Cannot find endpoint resources to update for node: %s in stream: %d context", nodeID, streamID) } - newSnapResources, err := s.getResourcesFromCache(typeURL, resources) - if err != nil { - return fmt.Errorf("Cannot get resources from cache: %s", err) + var newSnapResources []types.Resource + var err error + if s.localhostEndpoints { + newSnapResources, err = s.makeDummyResources(typeURL, resources) + if err != nil { + return fmt.Errorf("Cannot make dummy resources for localhost mode: %s", err) + } + } else { + newSnapResources, err = s.getResourcesFromCache(typeURL, resources) + if err != nil { + return fmt.Errorf("Cannot get resources from cache: %s", err) + } } nodeResources[streamID].endpoints[typeURL] = newSnapResources @@ -609,6 +631,9 @@ func (s *Snapshotter) updateStreamNodeResources(nodeID, typeURL string, streamID // needToUpdateSnapshot checks id a node snapshot needs updating based on the requested resources // from the client inside a streams context func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int64, resources []string) bool { + if s.localhostEndpoints { + return true + } // Always update snapshot with the requested resources if we are on dummy mode n, ok := s.nodes.Load(nodeID) if !ok { return false @@ -628,6 +653,42 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6 return false } +// makeDummyResources blindly creates default configuration resources to snapshot based on the requested names. 
+// For endpoints it will create a single localhost one +func (s *Snapshotter) makeDummyResources(typeURL string, resources []string) ([]types.Resource, error) { + res := []types.Resource{} + if typeURL == resource.ListenerType { + for _, r := range resources { + clusterName := strings.Replace(r, ":", ".", 1) // Replace expected format of name.namespace:port -> name.namespace.port to match cluster names due to xds naming + routeConfig := routeConfig(r, clusterName, r, []string{r}, nil, []*routev3.RouteAction_HashPolicy{}) + manager, err := makeManager(routeConfig) + if err != nil { + log.Logger.Error("Cannot create listener manager", "error", err) + continue + } + res = append(res, listener(r, manager)) + } + } + if typeURL == resource.RouteType { + for _, r := range resources { + clusterName := strings.Replace(r, ":", ".", 1) // Replace expected format of name.namespace:port -> name.namespace.port to match cluster names due to xds naming + res = append(res, routeConfig(r, clusterName, r, []string{r}, nil, []*routev3.RouteAction_HashPolicy{})) + } + } + if typeURL == resource.ClusterType { + for _, r := range resources { + res = append(res, cluster(r, clusterv3.Cluster_ROUND_ROBIN)) + } + } + if typeURL == resource.EndpointType { + for _, r := range resources { + res = append(res, localhostClusterLoadAssignment(r)) + } + } + + return res, nil +} + // ListenAndServeFromCache will start an xDS server at the given port and serve // snapshots from the given cache func (s *Snapshotter) ListenAndServe() { diff --git a/xds/snapshotter_metrics_test.go b/xds/snapshotter_metrics_test.go index 7262104..43b4eec 100644 --- a/xds/snapshotter_metrics_test.go +++ b/xds/snapshotter_metrics_test.go @@ -131,7 +131,7 @@ func TestSnapMetricsCollector(t *testing.T) { log.InitLogger("test-semaphore-xds-metrics", "debug") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter =
NewSnapshotter("", uint(0), float64(0), float64(0), false) snapshotter.SnapServices(tt.serviceStore) snapshotter.SnapEndpoints(tt.endpointStore) snapshotter.addOrUpdateNode(tt.nodeID, tt.nodeAddress, tt.streamID) diff --git a/xds/snapshotter_test.go b/xds/snapshotter_test.go index bde2bcf..1dee356 100644 --- a/xds/snapshotter_test.go +++ b/xds/snapshotter_test.go @@ -20,7 +20,7 @@ func init() { } func TestSnapServices_EmptyServiceList(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() snapshotter.SnapServices(serviceStore) snap, err := snapshotter.servicesCache.GetSnapshot(testNodeID) @@ -34,7 +34,7 @@ func TestSnapServices_EmptyServiceList(t *testing.T) { } func TestSnapServices_SingleService(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -67,7 +67,7 @@ func TestSnapServices_SingleService(t *testing.T) { } func TestSnapServices_NoServicePorts(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -91,7 +91,7 @@ func TestSnapServices_NoServicePorts(t *testing.T) { } func TestSnapServices_MultipleServicePorts(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -128,7 +128,7 @@ func TestSnapServices_MultipleServicePorts(t *testing.T) { } func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) { - snapshotter := NewSnapshotter("", 
uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) endpointStore := NewXdsEnpointStore() snapshotter.SnapEndpoints(endpointStore) snap, err := snapshotter.endpointsCache.GetSnapshot(testNodeID) @@ -140,7 +140,7 @@ func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) { } func TestSnapEndpoints_MissingServiceOwnershipLabel(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) log.InitLogger("test-semaphore-xds", "debug") endpointSlice := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ @@ -159,7 +159,7 @@ func TestSnapEndpoints_MissingServiceOwnershipLabel(t *testing.T) { } func TestSnapEndpoints_UpdateAddress(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) // Create test EndpointSlice httpPortName := "http" httpPortValue := int32(80) @@ -242,7 +242,7 @@ func TestSnapEndpoints_UpdateAddress(t *testing.T) { } func TestSnapServices_NodeSnapshotResources(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() // fooA.bar:80 svcA := &v1.Service{ @@ -366,7 +366,7 @@ func TestSnapServices_NodeSnapshotResources(t *testing.T) { } func TestSnapServices_MultipleStreams(t *testing.T) { - snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0)) + snapshotter := NewSnapshotter("", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() // fooA.bar:80 svcA := &v1.Service{ @@ -459,7 +459,7 @@ func TestSnapServices_MultipleStreams(t *testing.T) { } func TestSnapServices_SingleServiceWithAuthoritySet(t *testing.T) { - snapshotter := NewSnapshotter("test-authority", uint(0), float64(0), float64(0)) + snapshotter := 
NewSnapshotter("test-authority", uint(0), float64(0), float64(0), false) serviceStore := NewXdsServiceStore() svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{