From 210fb133ce707747b8a89ff02ec57f88cb7935bd Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 1 Nov 2023 11:37:12 -0400 Subject: [PATCH] Delete stale EndpointSlices on restart If a local LH EndpointSlices exists but the corresponding Service doesn't exist, delete the EPS. Also, a headless Service can have multiple EndpointSlices so reconcile the LH EndpointSlices on startup to handle a K8s EndpointSlice deleted while the controller wasn't running. Fixes https://github.com/submariner-io/lighthouse/issues/1416 Signed-off-by: Tom Pantelis --- pkg/agent/controller/agent.go | 9 +- pkg/agent/controller/endpoint_slice.go | 29 +++++- pkg/agent/controller/reconciliation_test.go | 89 ++++++++++++++++++- .../controller/service_endpoint_slices.go | 24 +++++ pkg/agent/controller/service_import.go | 18 ++-- pkg/agent/controller/types.go | 29 +++--- 6 files changed, 173 insertions(+), 25 deletions(-) diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index bc3a625ed..39f08d21b 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -37,6 +37,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" validations "k8s.io/apimachinery/pkg/util/validation" @@ -113,14 +114,18 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN localSyncer: agentController.serviceExportSyncer, } - agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient) + agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient, + agentController.serviceSyncer) if err != nil { return nil, err } agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf, agentController.endpointSliceController.syncer.GetBrokerClient(), - agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient) + agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient, + func(selector k8slabels.Selector) []runtime.Object { + return agentController.endpointSliceController.syncer.ListLocalResourcesBySelector(&discovery.EndpointSlice{}, selector) + }) if err != nil { return nil, err } diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 0499eaeee..71857c8f5 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -34,6 +34,7 @@ import ( "github.com/submariner-io/lighthouse/pkg/constants" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -43,11 +44,12 @@ import ( //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer. func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.SyncerConfig, - serviceExportClient *ServiceExportClient, + serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface, ) (*EndpointSliceController, error) { c := &EndpointSliceController{ clusterID: spec.ClusterID, serviceExportClient: serviceExportClient, + serviceSyncer: serviceSyncer, conflictCheckWorkQueue: workqueue.New("ConflictChecker"), } @@ -116,8 +118,31 @@ func (c *EndpointSliceController) onLocalEndpointSlice(obj runtime.Object, _ int return nil, false } + serviceName := endpointSlice.Labels[mcsv1a1.LabelServiceName] + logger.V(log.DEBUG).Infof("Local EndpointSlice \"%s/%s\" for service %q %sd", - endpointSlice.Namespace, endpointSlice.Name, endpointSlice.Labels[mcsv1a1.LabelServiceName], op) + endpointSlice.Namespace, endpointSlice.Name, serviceName, op) + + // Check if the associated Service exists and, if not, delete the EndpointSlice. On restart, it's possible the Service could've been + // deleted. + if op == syncer.Create { + _, found, _ := c.serviceSyncer.GetResource(serviceName, endpointSlice.Namespace) + if !found { + logger.Infof("The service %q for EndpointSlice \"%s/%s\" does not exist - deleting it", + serviceName, endpointSlice.Namespace, endpointSlice.Name) + + err := c.syncer.GetLocalFederator().Delete(ctx, endpointSlice) + if apierrors.IsNotFound(err) { + err = nil + } + + if err != nil { + logger.Errorf(err, "Error deleting EndpointSlice %s/%s", endpointSlice.Namespace, endpointSlice.Name) + } + + return nil, err != nil + } + } return obj, false } diff --git a/pkg/agent/controller/reconciliation_test.go b/pkg/agent/controller/reconciliation_test.go index 03ba5cdff..53328cad3 100644 --- a/pkg/agent/controller/reconciliation_test.go +++ b/pkg/agent/controller/reconciliation_test.go @@ -26,6 +26,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/federate" + "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/test" testutil "github.com/submariner-io/admiral/pkg/test" "github.com/submariner-io/lighthouse/pkg/constants" @@ -59,7 +61,11 @@ var _ = Describe("Reconciliation", func() { t.cluster1.createService() t.cluster1.createServiceExport() - t.awaitNonHeadlessServiceExported(&t.cluster1) + if t.cluster1.service.Spec.ClusterIP == corev1.ClusterIPNone { + t.awaitHeadlessServiceExported(&t.cluster1) + } else { + t.awaitNonHeadlessServiceExported(&t.cluster1) + } var err error @@ -144,6 +150,7 @@ var _ = Describe("Reconciliation", func() { test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) t.cluster1.createService() + t.cluster1.createServiceEndpointSlices() t.cluster1.start(t, *t.syncerConfig) t.awaitServiceUnexported(&t.cluster1) @@ -157,6 +164,7 @@ var _ = Describe("Reconciliation", func() { restoreBrokerResources() test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) + test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) t.cluster1.createServiceExport() t.cluster1.start(t, *t.syncerConfig) @@ -231,6 +239,85 @@ var _ = Describe("Reconciliation", func() { t.cluster1.service.Name, t.cluster1.clusterID) }) }) + + When("a local EndpointSlice is stale on startup", func() { + Context("because the service no longer exists", func() { + It("should delete it from the local datastore", func() { + t.afterEach() + t = newTestDiver() + + By("Restarting controllers") + + restoreBrokerResources() + test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) + t.cluster1.start(t, *t.syncerConfig) + + t.awaitServiceUnexported(&t.cluster1) + }) + }) + + Context("because the K8s EndpointSlice no longer exists", func() { + BeforeEach(func() { + t.cluster1.service.Spec.ClusterIP = corev1.ClusterIPNone + }) + + It("should delete it from the local datastore", func() { + t.afterEach() + t = newTestDiver() + + t.cluster1.service.Spec.ClusterIP = corev1.ClusterIPNone + + By("Restarting controllers") + + restoreBrokerResources() + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) + test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) + test.CreateResource(t.cluster1.localServiceExportClient, serviceExport) + t.cluster1.createService() + + fmt.Println("*******EPS 1: ", resource.ToJSON(localEndpointSlice)) + + // Create a remote EPS for the same service and ensure it's not deleted. + remoteEndpointSlice := localEndpointSlice.DeepCopy() + remoteEndpointSlice.Name = "remote-eps" + remoteEndpointSlice.Labels[constants.MCSLabelSourceCluster] = t.cluster2.clusterID + remoteEndpointSlice.Labels[federate.ClusterIDLabelKey] = t.cluster2.clusterID + test.CreateResource(t.cluster1.localEndpointSliceClient, remoteEndpointSlice) + + remoteEndpointSlice.Namespace = test.RemoteNamespace + test.CreateResource(t.brokerEndpointSliceClient, remoteEndpointSlice) + + // Create an EPS for a service in another namespace and ensure it's not deleted. + otherNS := "other-ns" + otherNSEndpointSlice := localEndpointSlice.DeepCopy() + otherNSEndpointSlice.Name = "other-ns-eps" + otherNSEndpointSlice.Namespace = otherNS + otherNSEndpointSlice.Labels[constants.LabelSourceNamespace] = otherNS + test.CreateResource(endpointSliceClientFor(t.cluster1.localDynClient, otherNS), otherNSEndpointSlice) + + test.CreateResource(t.cluster1.dynamicServiceClientFor().Namespace(otherNS), &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: t.cluster1.service.Name, + Namespace: otherNS, + }, + }) + + t.cluster1.start(t, *t.syncerConfig) + + t.awaitNoEndpointSlice(&t.cluster1) + + Consistently(func() bool { + test.AwaitResource(t.cluster1.localEndpointSliceClient, remoteEndpointSlice.Name) + return true + }).Should(BeTrue()) + + Consistently(func() bool { + test.AwaitResource(endpointSliceClientFor(t.cluster1.localDynClient, otherNS), otherNSEndpointSlice.Name) + return true + }).Should(BeTrue()) + }) + }) + }) }) var _ = Describe("EndpointSlice migration", func() { diff --git a/pkg/agent/controller/service_endpoint_slices.go b/pkg/agent/controller/service_endpoint_slices.go index cc2eb138e..ad7f8e292 100644 --- a/pkg/agent/controller/service_endpoint_slices.go +++ b/pkg/agent/controller/service_endpoint_slices.go @@ -45,6 +45,7 @@ import ( func startEndpointSliceController(localClient dynamic.Interface, restMapper meta.RESTMapper, scheme *runtime.Scheme, serviceImport *mcsv1a1.ServiceImport, clusterID string, globalIngressIPCache *globalIngressIPCache, + localLHEndpointSliceLister EndpointSliceListerFn, ) (*ServiceEndpointSliceController, error) { serviceNamespace := serviceImport.Labels[constants.LabelSourceNamespace] serviceName := serviceImportSourceName(serviceImport) @@ -89,6 +90,29 @@ func startEndpointSliceController(localClient dynamic.Interface, restMapper meta return nil, errors.Wrap(err, "error starting Endpoints syncer") } + if controller.isHeadless() { + controller.epsSyncer.Reconcile(func() []runtime.Object { + list := localLHEndpointSliceLister(k8slabels.SelectorFromSet(map[string]string{ + constants.LabelSourceNamespace: serviceNamespace, + mcsv1a1.LabelServiceName: serviceName, + constants.MCSLabelSourceCluster: clusterID, + })) + + retList := make([]runtime.Object, 0, len(list)) + for _, o := range list { + eps := o.(*discovery.EndpointSlice) + retList = append(retList, &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: eps.Labels[constants.LabelSourceName], + Namespace: serviceNamespace, + }, + }) + } + + return retList + }) + } + return controller, nil } diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index d40bac3e0..4a5273ceb 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -45,15 +45,17 @@ import ( //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer. func newServiceImportController(spec *AgentSpecification, syncerMetricNames AgentConfig, syncerConfig broker.SyncerConfig, brokerClient dynamic.Interface, brokerNamespace string, serviceExportClient *ServiceExportClient, + localLHEndpointSliceLister EndpointSliceListerFn, ) (*ServiceImportController, error) { controller := &ServiceImportController{ - localClient: syncerConfig.LocalClient, - restMapper: syncerConfig.RestMapper, - clusterID: spec.ClusterID, - localNamespace: spec.Namespace, - converter: converter{scheme: syncerConfig.Scheme}, - serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme), - serviceExportClient: serviceExportClient, + localClient: syncerConfig.LocalClient, + restMapper: syncerConfig.RestMapper, + clusterID: spec.ClusterID, + localNamespace: spec.Namespace, + converter: converter{scheme: syncerConfig.Scheme}, + serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme), + serviceExportClient: serviceExportClient, + localLHEndpointSliceLister: localLHEndpointSliceLister, } var err error @@ -222,7 +224,7 @@ func (c *ServiceImportController) startEndpointsController(serviceImport *mcsv1a } endpointController, err := startEndpointSliceController(c.localClient, c.restMapper, c.converter.scheme, - serviceImport, c.clusterID, c.globalIngressIPCache) + serviceImport, c.clusterID, c.globalIngressIPCache, c.localLHEndpointSliceLister) if err != nil { return errors.Wrapf(err, "failed to start endpoints controller for %q", key) } diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index 35c3c5863..a16a21cc0 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -28,6 +28,7 @@ import ( "github.com/submariner-io/admiral/pkg/watcher" "github.com/submariner-io/admiral/pkg/workqueue" "k8s.io/apimachinery/pkg/api/meta" + k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -41,6 +42,8 @@ const ( var BrokerResyncPeriod = time.Minute * 2 +type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object + type converter struct { scheme *runtime.Scheme } @@ -78,18 +81,19 @@ type ServiceImportAggregator struct { // from the submariner namespace and creates/updates the aggregated ServiceImport on the broker; the other that syncs // aggregated ServiceImports from the broker to the local service namespace. It also creates a ServiceEndpointSliceController. type ServiceImportController struct { - localClient dynamic.Interface - restMapper meta.RESTMapper - serviceImportAggregator *ServiceImportAggregator - serviceImportMigrator *ServiceImportMigrator - serviceExportClient *ServiceExportClient - localSyncer syncer.Interface - remoteSyncer syncer.Interface - endpointControllers sync.Map - clusterID string - localNamespace string - converter converter - globalIngressIPCache *globalIngressIPCache + localClient dynamic.Interface + restMapper meta.RESTMapper + serviceImportAggregator *ServiceImportAggregator + serviceImportMigrator *ServiceImportMigrator + serviceExportClient *ServiceExportClient + localSyncer syncer.Interface + remoteSyncer syncer.Interface + endpointControllers sync.Map + clusterID string + localNamespace string + converter converter + globalIngressIPCache *globalIngressIPCache + localLHEndpointSliceLister EndpointSliceListerFn } // Each ServiceEndpointSliceController watches for the EndpointSlices that backs a Service and have a ServiceImport. @@ -115,6 +119,7 @@ type EndpointSliceController struct { syncer *broker.Syncer serviceImportAggregator *ServiceImportAggregator serviceExportClient *ServiceExportClient + serviceSyncer syncer.Interface conflictCheckWorkQueue workqueue.Interface }