Skip to content

Commit

Permalink
Try a kube_on_demand route config to trigger vhds
Browse files Browse the repository at this point in the history
  • Loading branch information
ffilippopoulos committed Dec 12, 2024
1 parent cb8ba75 commit cae2e01
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 25 deletions.
26 changes: 13 additions & 13 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestReconcileServices_LabelledService(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify the default round robin policy is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestReconcileServices_LabelledServiceLbPolicy(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify the correct lb policy (ring hash) is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestReconcileServices_LabelledServiceInvalidLbPolicy(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify the default round robin policy is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestReconcileServices_XdsService(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify the default round robin policy is set on the clusters
for _, cl := range snap.GetResources(resource.ClusterType) {
cluster, err := xds.UnmarshalResourceToCluster(cl)
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestReconcileServices_XdsServiceNotExistent(t *testing.T) {
}
assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 0, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify we will have one Endpoint resource in the snapshot
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
if err != nil {
Expand All @@ -271,7 +271,7 @@ func TestReconcileServices_XdsServiceDelete(t *testing.T) {
}
assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 0, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestReconcileLocalEndpointSlice_SnapOnUpdate(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify we will have 1 Endpoint resource in the snapshot containing
// addresses from both local(2) and remote(2). 4 lbEndpoint addresses in
// total. Also verify that all priorities are set to 0.
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpoints_NoRemoteEndpoints(t *te
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify we will have 1 Endpoint resource in the snapshot containing
// only local client addresses.
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpoints(t *testing.T) {
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify we will have 1 Endpoint resource in the snapshot containing
// only remote addresses (2).
snap, err = snapshotter.EndpointsSnapshot(testNodeID)
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestReconcileServices_XdsServiceWithRemoteEndpointsAndLocalPriority(t *test
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify we will have 1 Endpoint resource in the snapshot containing
// addresses for local endpoints with priority 0 and for remote ones
// with priority 1.
Expand Down Expand Up @@ -622,7 +622,7 @@ func TestReconcileServices_XdsServiceWithOnlyRemoteEndpointsAndLocalPriority(t *
}
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Verify we will have 1 Endpoint resource in the snapshot containing
// addresses for remote endpoints with priority 0, regardless of
// PriorityStrategy set to local-first.
Expand Down
35 changes: 32 additions & 3 deletions xds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,34 @@ func makeAllKubeServicesRouteConfig(serviceStore XdsServiceStore) *routev3.Route
return nil
}

// makeOnDemandRouteConfig builds the "kube_on_demand" RouteConfiguration.
// Instead of inlining virtual hosts, it points Envoy at VHDS so that virtual
// hosts are fetched lazily, on demand, as traffic references them.
func makeOnDemandRouteConfig() *routev3.RouteConfiguration {
	// VHDS queries are served by the management server itself, reachable
	// through the bootstrap-defined "xds_cluster".
	xdsGrpcService := &corev3.GrpcService{
		TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{
			EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{
				ClusterName: "xds_cluster",
			},
		},
	}
	// VHDS is only supported over the incremental (delta) xDS protocol.
	vhdsApiSource := &corev3.ApiConfigSource{
		ApiType:             corev3.ApiConfigSource_DELTA_GRPC,
		TransportApiVersion: corev3.ApiVersion_V3,
		GrpcServices:        []*corev3.GrpcService{xdsGrpcService},
	}
	return &routev3.RouteConfiguration{
		Name: "kube_on_demand",
		Vhds: &routev3.Vhds{
			ConfigSource: &corev3.ConfigSource{
				ResourceApiVersion: corev3.ApiVersion_V3,
				ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{
					ApiConfigSource: vhdsApiSource,
				},
			},
		},
	}
}

func makeManager(routeConfig *routev3.RouteConfiguration) (*anypb.Any, error) {
router, _ := anypb.New(&routerv3.Router{})
return anypb.New(&managerv3.HttpConnectionManager{
Expand Down Expand Up @@ -259,8 +287,9 @@ func servicesToResources(serviceStore XdsServiceStore, authority string) ([]type
cls = append(cls, cluster)
}
}
if allKubeRoutes := makeAllKubeServicesRouteConfig(serviceStore); allKubeRoutes != nil {
rds = append(rds, allKubeRoutes)
}
//if allKubeRoutes := makeAllKubeServicesRouteConfig(serviceStore); allKubeRoutes != nil {
// rds = append(rds, allKubeRoutes)
//}
rds = append(rds, makeOnDemandRouteConfig())
return cls, rds, lsnr, nil
}
1 change: 1 addition & 0 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ func registerServices(grpcServer *grpc.Server, xdsServer xds.Server) {
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, xdsServer)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, xdsServer)
routeservice.RegisterVirtualHostDiscoveryServiceServer(grpcServer, xdsServer)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, xdsServer)
}

Expand Down
4 changes: 2 additions & 2 deletions xds/snapshotter_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ func TestSnapMetricsCollector(t *testing.T) {
fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpListenerName, "test-node", expectHttpRouteName, resource.ListenerType),
fmt.Sprintf(`semaphore_xds_snapshot_listener{name="%s",node_id="%s",route_config="%s",type="%s"} 1`, expectHttpsListenerName, "test-node", expectHttpsRouteName, resource.ListenerType),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, expectHttpRouteName, EmptyNodeID, resource.RouteType, expectHttpVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpClusterName, expectHttpDomains, "kube_on_demand", EmptyNodeID, resource.RouteType, expectHttpVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, expectHttpsRouteName, EmptyNodeID, resource.RouteType, expectHttpsVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, "all_kube_routes", EmptyNodeID, resource.RouteType, expectHttpsVhost),
fmt.Sprintf(`semaphore_xds_snapshot_route{cluster_name="%s",domains="%s",name="%s",node_id="%s",path_prefix="",type="%s",virtual_host="%s"} 1`, expectHttpsClusterName, expectHttpsDomains, "kube_on_demand", EmptyNodeID, resource.RouteType, expectHttpsVhost),
fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpClusterName, EmptyNodeID, resource.ClusterType),
fmt.Sprintf(`semaphore_xds_snapshot_cluster{discovery_type="eds",lb_policy="round_robin",name="%s",node_id="%s",type="%s"} 1`, expectHttpsClusterName, EmptyNodeID, resource.ClusterType),
fmt.Sprintf(`semaphore_xds_snapshot_endpoint{cluster_name="%s",health_status="healthy",lb_address="%s",locality_subzone="foo-xzf",locality_zone="test",node_id="%s",priority="0",type="%s"} 1`, expectHttpClusterName, "10.2.1.1:80", EmptyNodeID, resource.EndpointType),
Expand Down
14 changes: 7 additions & 7 deletions xds/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSnapServices_EmptyServiceList(t *testing.T) {
// Verify that our snapshot will be empty
assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 0, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand
}

func TestSnapServices_SingleService(t *testing.T) {
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestSnapServices_SingleService(t *testing.T) {
// cluster
assert.Equal(t, 1, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 1, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 2, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
}

func TestSnapServices_NoServicePorts(t *testing.T) {
Expand All @@ -87,7 +87,7 @@ func TestSnapServices_NoServicePorts(t *testing.T) {
// Verify that our snapshot will be empty
assert.Equal(t, 0, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 0, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 0, len(snap.GetResources(resource.RouteType)))
assert.Equal(t, 1, len(snap.GetResources(resource.RouteType))) // kube_on_demand
}

func TestSnapServices_MultipleServicePorts(t *testing.T) {
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestSnapServices_MultipleServicePorts(t *testing.T) {
// per port
assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
}

func TestSnapEndpoints_EmptyEndpointStore(t *testing.T) {
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestSnapServices_NodeSnapshotResources(t *testing.T) {
}
assert.Equal(t, 2, len(snap.GetResources(resource.ListenerType)))
assert.Equal(t, 2, len(snap.GetResources(resource.ClusterType)))
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
// Client requesting more resources again should bring them back into the node snapshot
assert.Equal(t, true, snapshotter.needToUpdateSnapshot(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}))
if err := snapshotter.updateStreamNodeResources(nodeID, resource.ListenerType, streamID, []string{"fooA.bar:80", "fooB.bar:80"}); err != nil {
Expand Down Expand Up @@ -507,9 +507,9 @@ func TestSnapServices_SingleServiceWithAuthoritySet(t *testing.T) {
expectedRoutes := []string{
makeRouteConfigName("foo", "bar", int32(80)),
makeXdstpRouteConfigName("foo", "bar", "test-authority", int32(80)),
"all_kube_routes",
"kube_on_demand",
}
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes all_kube_routes
assert.Equal(t, 3, len(snap.GetResources(resource.RouteType))) // Includes kube_on_demand
for _, res := range snap.GetResources(resource.RouteType) {
route, err := UnmarshalResourceToRouteConfiguration(res)
if err != nil {
Expand Down

0 comments on commit cae2e01

Please sign in to comment.