Skip to content

Commit

Permalink
enforce ResourceEvent types
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 7, 2025
1 parent 8db7f9f commit 87ba582
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 29 deletions.
31 changes: 27 additions & 4 deletions pkg/lib/kubestate/kubestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package kubestate

import (
"context"
"fmt"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type State interface {
Expand Down Expand Up @@ -148,6 +151,7 @@ const (
type ResourceEvent interface {
Type() ResourceEventType
Resource() interface{}
String() string
}

type resourceEvent struct {
Expand All @@ -163,14 +167,33 @@ func (r resourceEvent) Resource() interface{} {
return r.resource
}

func NewUpdateEvent(resource interface{}) ResourceEvent {
return resourceEvent{
eventType: ResourceUpdated,
resource: resource,
func (r resourceEvent) String() string {
key, err := cache.MetaNamespaceKeyFunc(r.resource)

// should not happen as resources must be either cache.ExplicitKey
// or client.Object
if err != nil {
panic("could not get resource key: " + err.Error())
}
return key
}

func NewUpdateEvent(resource interface{}) ResourceEvent {
return NewResourceEvent(ResourceUpdated, resource)
}

func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
// assert resource type
// only accept cache.ExplicitKey or client.Objects
switch r := resource.(type) {
case string:
resource = cache.ExplicitKey(r)
case cache.ExplicitKey:
case client.Object:
default:
panic(fmt.Sprintf("NewResourceEvent called with invalid resource type: %T", resource))
}

return resourceEvent{
eventType: eventType,
resource: resource,
Expand Down
13 changes: 0 additions & 13 deletions pkg/lib/queueinformer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,6 @@ func (c *queueInformerConfig) validateQueue() (err error) {
}

func defaultKeyFunc(obj interface{}) (string, bool) {
// Get keys nested in resource events up to depth 2
keyable := false
for d := 0; d < 2 && !keyable; d++ {
switch v := obj.(type) {
case string:
return v, true
case kubestate.ResourceEvent:
obj = v.Resource()
default:
keyable = true
}
}

k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return k, false
Expand Down
12 changes: 12 additions & 0 deletions pkg/lib/queueinformer/main/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"k8s.io/client-go/tools/cache"
)

func main() {
k, ok := cache.MetaNamespaceKeyFunc(kubestate.NewUpdateEvent("bob"))
fmt.Printf("key: %s (%t)\n", k, ok)
}
43 changes: 31 additions & 12 deletions pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,30 @@ func (o *operator) worker(ctx context.Context, loop *QueueInformer) {
}

func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) bool {
// **************************** WARNING ****************************
// The QueueInformer listens to resource events raised by its
// (client-go) informer. For Add/Update event, it extracts the key
// for the resource and adds to its queue (that we Get() from below)
// a ResourceEvent carrying the key.
// **Except** if it is a deletion event. In that case,
// ResourceEvent carries the resource object (or tombstone).
// The sync'er expects a ResourceEvent carrying the resource.
// So, in the case of an Add/Update event coming from the queue,
// the resource is acquired from the index (through the key), and then
// a ResourceEvent carrying the resource is handed to the syncer.
// It should also be noted that throughout the code, items are added to
// queueinformers out of band of informer notifications.
// The fact that the queueinformers queue processes ResourceEvents, which
// themselves encapsulate an interface{} "Resource" make it tricky for the
// queue to dedup. Previous to the writing of this comment, the queue was
// processing strings and ResourceEvents, which led to concurrent processing
// of the same resource. To address this, we enforce (with panic) that the resource
// in the ResourceEvent must either be a cache.ExplicitKey or a client.Object.
// We then make sure that the ResourceEvent's String() returns the key for the
// encapsulated resource. Thus, independent of the resource type, the queue always
// processes it by key and dedups appropriately.
// Furthermore, we also enforce here that Add/Update events always contain
// cache.ExplicitKey as their Resource
queue := loop.queue
item, quit := queue.Get()

Expand All @@ -271,28 +295,23 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
defer queue.Done(item)

logger := o.logger.WithField("item", item)
logger.WithField("queue-length", queue.Len()).Trace("popped queue")
logger.WithField("queue-length", queue.Len()).Info("popped queue")

var event = item
if item.Type() != kubestate.ResourceDeleted {
// Get the key
//key, keyable := loop.key(item)
//if !keyable {
// logger.WithField("item", item).Warn("could not form key")
// queue.Forget(item)
// return true
//}
key, ok := item.Resource().(string)
if !ok {
panic(fmt.Sprintf("unexpected item resource type: %T", item.Resource()))
key, keyable := loop.key(item)
if !keyable {
logger.WithField("item", item).Warn("could not form key")
queue.Forget(item)
return true
}

logger = logger.WithField("cache-key", key)

// Get the current cached version of the resource
var exists bool
var err error
resource, exists, err := loop.indexer.GetByKey(key)
resource, exists, err := loop.indexer.GetByKey(string(key))
if err != nil {
logger.WithError(err).Error("cache get failed")
queue.Forget(item)
Expand Down

0 comments on commit 87ba582

Please sign in to comment.