From cae2e01b0500c41c8cc053037261bf12c1174121 Mon Sep 17 00:00:00 2001 From: ffilippopoulos Date: Thu, 12 Dec 2024 20:21:59 +0000 Subject: [PATCH] Try a kube_on_demand route config to trigger vhds --- controller/controller_test.go | 26 ++++++++++++------------ xds/service.go | 35 ++++++++++++++++++++++++++++++--- xds/snapshotter.go | 1 + xds/snapshotter_metrics_test.go | 4 ++-- xds/snapshotter_test.go | 14 ++++++------- 5 files changed, 55 insertions(+), 25 deletions(-) diff --git a/controller/controller_test.go b/controller/controller_test.go index 316ab56..293d2c7 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -60,7 +60,7 @@ func TestReconcileServices_LabelledService(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify the default round robin policy is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -103,7 +103,7 @@ func TestReconcileServices_LabelledServiceLbPolicy(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify the correct lb policy (ring hash) is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -140,7 +140,7 @@ func TestReconcileServices_LabelledServiceInvalidLbPolicy(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify the default round robin policy is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -177,7 +177,7 @@ func TestReconcileServices_XdsService(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify the default round robin policy is set on the clusters for _, cl := range snap.GetResources(resource.ClusterType) { cluster, err := xds.UnmarshalResourceToCluster(cl) @@ -220,7 +220,7 @@ func TestReconcileServices_XdsServiceNotExistent(t *testing.T) { } assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 0, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand snap, err = snapshotter.EndpointsSnapshot(testNodeID) if err != nil { t.Fatal(err) @@ -254,7 +254,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify we will have one Endpoint resource in the snapshot snap, err = snapshotter.EndpointsSnapshot(testNodeID) if err != nil { @@ -271,7 +271,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) { } assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 0, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand snap, err = snapshotter.EndpointsSnapshot(testNodeID) if err != nil { t.Fatal(err) @@ -304,7 +304,7 @@ func TestReconcileLocalEndpointSlice_SnapOnUpdate(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand snap, err = snapshotter.EndpointsSnapshot(testNodeID) if err != nil { t.Fatal(err) @@ -394,7 +394,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify we will have 1 Endpoint resource in the snapshot containing // addresses from both local(2) and remote(2). 4 lbEndpoint addresses in // total. Also verify that all priorities are set to 0. @@ -457,7 +457,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints_NoRemoteEndpoints(t *te } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify we will have 1 Endpoint resource in the snapshot containing // only local client addresses. snap, err = snapshotter.EndpointsSnapshot(testNodeID) @@ -508,7 +508,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpoints(t *testing.T) { } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify we will have 1 Endpoint resource in the snapshot containing // only remote addresses (2). snap, err = snapshotter.EndpointsSnapshot(testNodeID) @@ -560,7 +560,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpointsAndLocalPriority(t *test } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify we will have 1 Endpoint resource in the snapshot containing // addresses for local endpoints with priority 0 and for remote ones // with priority 1. @@ -622,7 +622,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpointsAndLocalPriority(t * } assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Verify we will have 1 Endpoint resource in the snapshot containing // addresses for remote endpoints with priority 0, regardless of // PriorityStrategy set to local-first. diff --git a/xds/service.go b/xds/service.go index 816e002..a1b0802 100644 --- a/xds/service.go +++ b/xds/service.go @@ -165,6 +165,34 @@ func makeAllKubeServicesRouteConfig(serviceStore XdsServiceStore) *routev3.Route return nil } +// makeOnDemandRouteConfig returns a config that will trigger on-demand follow +// up queries for virtual hosts. +func makeOnDemandRouteConfig() *routev3.RouteConfiguration { + return &routev3.RouteConfiguration{ + Name: "kube_on_demand", + Vhds: &routev3.Vhds{ + ConfigSource: &corev3.ConfigSource{ + ResourceApiVersion: corev3.ApiVersion_V3, // Explicitly set to V3 + ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ + ApiConfigSource: &corev3.ApiConfigSource{ + ApiType: corev3.ApiConfigSource_DELTA_GRPC, // Use DELTA_GRPC as required + TransportApiVersion: corev3.ApiVersion_V3, // Transport version + GrpcServices: []*corev3.GrpcService{ + { + TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{ + ClusterName: "xds_cluster", + }, + }, + }, + }, + }, + }, + }, + }, + } +} + func makeManager(routeConfig *routev3.RouteConfiguration) (*anypb.Any, error) { router, _ := anypb.New(&routerv3.Router{}) return anypb.New(&managerv3.HttpConnectionManager{ @@ -259,8 +287,9 @@ func servicesToResources(serviceStore XdsServiceStore, authority string) ([]type cls = append(cls, cluster) } } - if allKubeRoutes := makeAllKubeServicesRouteConfig(serviceStore); allKubeRoutes != nil { - rds = append(rds, allKubeRoutes) - } + //if allKubeRoutes := makeAllKubeServicesRouteConfig(serviceStore); allKubeRoutes != nil { + // rds = append(rds, allKubeRoutes) + //} + rds = append(rds, makeOnDemandRouteConfig()) return cls, rds, lsnr, nil } diff --git a/xds/snapshotter.go b/xds/snapshotter.go index e9b1d2b..256cb35 100644 --- a/xds/snapshotter.go +++ b/xds/snapshotter.go @@ -873,6 +873,7 @@ func registerServices(grpcServer *grpc.Server, xdsServer xds.Server) { endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer) clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, xdsServer) routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, xdsServer) + routeservice.RegisterVirtualHostDiscoveryServiceServer(grpcServer, xdsServer) listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, xdsServer) } diff --git a/xds/snapshotter_metrics_test.go b/xds/snapshotter_metrics_test.go index da29594..7822f68 100644 --- a/xds/snapshotter_metrics_test.go +++ b/xds/snapshotter_metrics_test.go @@ -120,9 +120,9 @@ func TestSnapMetricsCollector(t *testing.T) { fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpListenerName, "test-node", expectHttpRouteName, resource.ListenerType), fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpsListenerName, "test-node", expectHttpsRouteName, resource.ListenerType), fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, expectHttpRouteName, EmptyNodeID, resource.RouteType, expectHttpVhost), - fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpVhost), + fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, "kube_on_demand", EmptyNodeID, resource.RouteType, expectHttpVhost), fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, expectHttpsRouteName, EmptyNodeID, resource.RouteType, expectHttpsVhost), - fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpsVhost), + fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, "kube_on_demand", EmptyNodeID, resource.RouteType, expectHttpsVhost), fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpClusterName, EmptyNodeID, resource.ClusterType), fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpsClusterName, EmptyNodeID, resource.ClusterType), fmt.Sprintf(`semaphore_xds_snapshot_endpoint{cluster_name="%s",health_status="healthy",lb_address="%s",locality_subzone="foo-xzf",locality_zone="test",node_id="%s",priority="0",type="%s"} 1`, expectHttpClusterName, "10.2.1.1:80", EmptyNodeID, resource.EndpointType), diff --git a/xds/snapshotter_test.go b/xds/snapshotter_test.go index e8e5768..4222c4a 100644 --- a/xds/snapshotter_test.go +++ b/xds/snapshotter_test.go @@ -30,7 +30,7 @@ func TestSnapServices_EmptyServiceList(t *testing.T) { // Verify that our snapshot will be empty assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 0, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand } func TestSnapServices_SingleService(t *testing.T) { @@ -63,7 +63,7 @@ func TestSnapServices_SingleService(t *testing.T) { // cluster assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand } func TestSnapServices_NoServicePorts(t *testing.T) { @@ -87,7 +87,7 @@ func TestSnapServices_NoServicePorts(t *testing.T) { // Verify that our snapshot will be empty assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 0, len(snap.GetResources(resource.RouteType))) + assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand } func TestSnapServices_MultipleServicePorts(t *testing.T) { @@ -124,7 +124,7 @@ func TestSnapServices_MultipleServicePorts(t *testing.T) { // per port assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand } func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) { @@ -332,7 +332,7 @@ func TestSnapServices_NodeSnapshotResources(t *testing.T) { } assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType))) assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType))) - assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand // Client requesting more resources again should bring them back into the node snapshot assert.Equal(t, true, snapshotter.needToUpdateSnapshot(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"})) if err := snapshotter.updateStreamNodeResources(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}); err != nil { @@ -507,9 +507,9 @@ func TestSnapServices_SingleServiceWithAuthoritySet(t *testing.T) { expectedRoutes := []string{ makeRouteConfigName("foo", "bar", int32(80)), makeXdstpRouteConfigName("foo", "bar", "test-authority", int32(80)), - "all_kube_routes", + "kube_on_demand", } - assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes + assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand for _, res := range snap.GetResources(resource.RouteType) { route, err := UnmarshalResourceToRouteConfiguration(res) if err != nil {