Skip to content

Commit

Permalink
try again
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 8, 2025
1 parent 87ba582 commit 35644db
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 33 deletions.
13 changes: 9 additions & 4 deletions pkg/lib/kubestate/kubestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,24 @@ func (r resourceEvent) Resource() interface{} {

func (r resourceEvent) String() string {
key, err := cache.MetaNamespaceKeyFunc(r.resource)

// should not happen as resources must be either cache.ExplicitKey
// or client.Object
// should not happen as resources must be either cache.ExplicitKey or client.Object
// and this should be enforced in NewResourceEvent
if err != nil {
panic("could not get resource key: " + err.Error())
}
return key
return fmt.Sprintf("%s/%s", string(r.eventType), key)
}

func NewUpdateEvent(resource interface{}) ResourceEvent {
return NewResourceEvent(ResourceUpdated, resource)
}

// NewResourceEvent creates a new resource event. The resource parameter must either be
// a client.Object, a string, or a cache.ExplicitKey. In case it is a string, it will be
// coerced to cache.ExplicitKey. This ensures that whether a reference (string/cache.ExplicitKey)
// or a resource, workqueue will treat the items in the same way and dedup appropriately.
// This behavior is guaranteed by the String() method, which will also ignore the type of event.
// I.e. Add/Update/Delete events for the same resource object or reference will be ded
func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
// assert resource type
// only accept cache.ExplicitKey or client.Objects
Expand Down
5 changes: 5 additions & 0 deletions pkg/lib/queueinformer/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queueinformer

import (
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/client-go/discovery"
Expand Down Expand Up @@ -81,8 +82,12 @@ func (c *queueInformerConfig) validateQueue() (err error) {
}

func defaultKeyFunc(obj interface{}) (string, bool) {
if re, ok := obj.(kubestate.ResourceEvent); ok {
obj = re.Resource()
}
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
fmt.Printf("error getting key for object %v: %v", obj, err)
return k, false
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/lib/queueinformer/main/main.go

This file was deleted.

24 changes: 11 additions & 13 deletions pkg/lib/queueinformer/queueinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package queueinformer

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -50,17 +48,17 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) {
// metev1.Object or cache.DeletedFinalStateUnknown.
// Add/Update events coming from the informer should have their resource
// converted to a key (string) before being enqueued.
if event.Type() != kubestate.ResourceDeleted {
// Extract key for add and update events
if key, ok := q.key(e.Resource()); ok {
e = kubestate.NewResourceEvent(event.Type(), key)
} else {
// if the resource cannot be keyed the worker will not be able to process it
// since it will not be able to retrieve the resource
q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource()))
return
}
}
//if event.Type() != kubestate.ResourceDeleted {
// // Extract key for add and update events
// if key, ok := q.key(e.Resource()); ok {
// e = kubestate.NewResourceEvent(event.Type(), cache.ExplicitKey(key))
// } else {
// // if the resource cannot be keyed the worker will not be able to process it
// // since it will not be able to retrieve the resource
// q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource()))
// return
// }
//}

// Create new resource event and add to queue
q.logger.WithField("event", e).Trace("enqueuing resource event")
Expand Down
6 changes: 2 additions & 4 deletions pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
// We then make sure that the ResourceEvent's String() returns the key for the
// encapsulated resource. Thus, independent of the resource type, the queue always
// processes it by key and dedups appropriately.
// Furthermore, we also enforce here that Add/Update events always contain
// cache.ExplicitKey as their Resource
queue := loop.queue
item, quit := queue.Get()

Expand All @@ -301,7 +299,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
if item.Type() != kubestate.ResourceDeleted {
key, keyable := loop.key(item)
if !keyable {
logger.WithField("item", item).Warn("could not form key")
logger.WithField("item", item).Warnf("could not form key %s", item)
queue.Forget(item)
return true
}
Expand All @@ -311,7 +309,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
// Get the current cached version of the resource
var exists bool
var err error
resource, exists, err := loop.indexer.GetByKey(string(key))
resource, exists, err := loop.indexer.GetByKey(key)
if err != nil {
logger.WithError(err).Error("cache get failed")
queue.Forget(item)
Expand Down

0 comments on commit 35644db

Please sign in to comment.