From a683bfefc346cf2f2487c03eb52b67548859bbe3 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Wed, 8 Jan 2025 10:50:55 +0100 Subject: [PATCH] try again Signed-off-by: Per Goncalves da Silva --- pkg/lib/kubestate/kubestate.go | 16 +++++++++---- pkg/lib/queueinformer/config.go | 5 ++++ pkg/lib/queueinformer/main/main.go | 12 ---------- pkg/lib/queueinformer/queueinformer.go | 24 +++++++++---------- .../queueinformer/queueinformer_operator.go | 6 ++--- 5 files changed, 29 insertions(+), 34 deletions(-) delete mode 100644 pkg/lib/queueinformer/main/main.go diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3bc40f6fc0..4d9af8e77c 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -168,20 +168,25 @@ func (r resourceEvent) Resource() interface{} { } func (r resourceEvent) String() string { - key, err := cache.MetaNamespaceKeyFunc(r.resource) - - // should not happen as resources must be either cache.ExplicitKey - // or client.Object + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(r.resource) + // should not happen as resources must be either cache.ExplicitKey or client.Object + // and this should be enforced in NewResourceEvent if err != nil { panic("could not get resource key: " + err.Error()) } - return key + return fmt.Sprintf("{%s %s}", string(r.eventType), key) } func NewUpdateEvent(resource interface{}) ResourceEvent { return NewResourceEvent(ResourceUpdated, resource) } +// NewResourceEvent creates a new resource event. The resource parameter must either be +// a client.Object, a string, a cache.DeletedFinalStateUnknown, or a cache.ExplicitKey. In case it is a string, it will be +// coerced to cache.ExplicitKey. This ensures that whether a reference (string/cache.ExplicitKey) +// or a resource, workqueue will treat the items in the same way and dedup appropriately. +// This behavior is guaranteed by the String() method, which will also ignore the type of event. +// I.e. Add/Update/Delete events for the same resource object or reference will be ded func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { // assert resource type // only accept cache.ExplicitKey or client.Objects @@ -190,6 +195,7 @@ func NewResourceEvent(eventType ResourceEventType, resource interface{}) Resourc resource = cache.ExplicitKey(r) case cache.ExplicitKey: case client.Object: + case cache.DeletedFinalStateUnknown: default: panic(fmt.Sprintf("NewResourceEvent called with invalid resource type: %T", resource)) } diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index db809adba2..a94a9f2e70 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -1,6 +1,7 @@ package queueinformer import ( + "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/discovery" @@ -81,8 +82,12 @@ func (c *queueInformerConfig) validateQueue() (err error) { } func defaultKeyFunc(obj interface{}) (string, bool) { + if re, ok := obj.(kubestate.ResourceEvent); ok { + obj = re.Resource() + } k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { + fmt.Printf("error getting key for object %v: %v", obj, err) return k, false } diff --git a/pkg/lib/queueinformer/main/main.go b/pkg/lib/queueinformer/main/main.go deleted file mode 100644 index 883347e5a1..0000000000 --- a/pkg/lib/queueinformer/main/main.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -import ( - "fmt" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" - "k8s.io/client-go/tools/cache" -) - -func main() { - k, ok := cache.MetaNamespaceKeyFunc(kubestate.NewUpdateEvent("bob")) - fmt.Printf("key: %s (%t)\n", k, ok) -} diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index fb7a1a30ef..ff7a7a6aa7 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -2,8 +2,6 @@ package queueinformer import ( "context" - "fmt" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/tools/cache" @@ -50,17 +48,17 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { // metev1.Object or cache.DeletedFinalStateUnknown. // Add/Update events coming from the informer should have their resource // converted to a key (string) before being enqueued. - if event.Type() != kubestate.ResourceDeleted { - // Extract key for add and update events - if key, ok := q.key(e.Resource()); ok { - e = kubestate.NewResourceEvent(event.Type(), key) - } else { - // if the resource cannot be keyed the worker will not be able to process it - // since it will not be able to retrieve the resource - q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource())) - return - } - } + //if event.Type() != kubestate.ResourceDeleted { + // // Extract key for add and update events + // if key, ok := q.key(e.Resource()); ok { + // e = kubestate.NewResourceEvent(event.Type(), cache.ExplicitKey(key)) + // } else { + // // if the resource cannot be keyed the worker will not be able to process it + // // since it will not be able to retrieve the resource + // q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource())) + // return + // } + //} // Create new resource event and add to queue q.logger.WithField("event", e).Trace("enqueuing resource event") diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 7d1faf74ad..e4589be1a5 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -284,8 +284,6 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) // We then make sure that the ResourceEvent's String() returns the key for the // encapsulated resource. Thus, independent of the resource type, the queue always // processes it by key and dedups appropriately. - // Furthermore, we also enforce here that Add/Update events always contain - // cache.ExplicitKey as their Resource queue := loop.queue item, quit := queue.Get() @@ -301,7 +299,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) if item.Type() != kubestate.ResourceDeleted { key, keyable := loop.key(item) if !keyable { - logger.WithField("item", item).Warn("could not form key") + logger.WithField("item", item).Warnf("could not form key %s", item) queue.Forget(item) return true } @@ -311,7 +309,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) // Get the current cached version of the resource var exists bool var err error - resource, exists, err := loop.indexer.GetByKey(string(key)) + resource, exists, err := loop.indexer.GetByKey(key) if err != nil { logger.WithError(err).Error("cache get failed") queue.Forget(item)