Skip to content

Commit

Permalink
channel experiment
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 8, 2025
1 parent 35644db commit 8f1330e
Showing 1 changed file with 133 additions and 4 deletions.
137 changes: 133 additions & 4 deletions pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package queueinformer

import (
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -245,15 +247,142 @@ func (o *operator) start(ctx context.Context) error {
}

o.logger.Info("starting workers...")
for _, queueInformer := range o.queueInformers {
for w := 0; w < o.numWorkers; w++ {
go o.worker(ctx, queueInformer)
}
for idx, queueInformer := range o.queueInformers {
o.doWork(ctx, o.numWorkers, idx, queueInformer)
//for w := 0; w < o.numWorkers; w++ {
// go o.worker(ctx, queueInformer)
//}
}

return nil
}

func hashMod(s string, mod int, logger *logrus.Entry) int {
// Compute SHA-256 hash of the string
hash := sha256.Sum256([]byte(s))

// Use the first 8 bytes (or more, depending on preference) as a numeric value
// Convert bytes to an unsigned integer (uint64)
num := binary.BigEndian.Uint64(hash[:8])
value := num % uint64(mod)
logger.Infof("mod calculated from %s to %d via %d", s, value, num)
return int(value)
}

func (o *operator) doWork(ctx context.Context, numWorkers int, queueInformerId int, queueInformer *QueueInformer) {
go func() {
logger := o.logger.WithField("queue_informer_id", queueInformerId)

logger.Info("creating workers")
// create a channel for each worker
chans := make([]chan kubestate.ResourceEvent, numWorkers)
for i := 0; i < numWorkers; i++ {
chans[i] = make(chan kubestate.ResourceEvent)

// create processing go routine for each worker
go func(id int, c <-chan kubestate.ResourceEvent) {
workerLogger := logger.WithFields(logrus.Fields{
"workerId": id,
})
workerLogger.Info("starting up worker")
defer func() {
workerLogger.Info("closing down worker")
}()

for {
select {
// bail on ctx.Done
case <-ctx.Done():
return
case evt, ok := <-c:
if !ok {
// bail on channel closed
return
}

// expect evt to already contain the resource
workerLogger.Info("processing event")
err := queueInformer.Sync(ctx, evt)
if requeues := queueInformer.queue.NumRequeues(evt); err != nil && requeues < 8 && evt.Type() != kubestate.ResourceDeleted {
utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", evt)))
queueInformer.queue.AddRateLimited(evt)
continue
}

workerLogger.Info("event sucessfully processed")
queueInformer.queue.Forget(evt)

select {
case o.syncCh <- err:
default:
}
}
}
}(i, chans[i])
}
defer func() {
for _, ch := range chans {
close(ch)
}
}()

queue := queueInformer.queue
for {
select {
case <-ctx.Done():
return
default:
item, shutdown := queue.Get()
if shutdown {
return
}
func() {
itemLogger := logger.WithField("item", item)
defer queue.Done(item)

// extract item resource key
key, keyable := queueInformer.key(item)
if !keyable {
itemLogger.WithField("item", item).Warnf("could not form key %s", item)
queue.Forget(item)
return
}

// compute worker to use for resource
idx := hashMod(key, numWorkers, itemLogger)

// recreate the event containing the resource (rather than reference)
// deletion events always contain the resource already (or tombstone)
event := item
if item.Type() != kubestate.ResourceDeleted {
itemLogger = itemLogger.WithField("cache-key", key)

// Get the current cached version of the resource
var exists bool
var err error
resource, exists, err := queueInformer.indexer.GetByKey(key)
if err != nil {
itemLogger.WithError(err).Error("cache get failed")
queue.Forget(item)
return
}
if !exists {
itemLogger.WithField("existing-cache-keys", queueInformer.indexer.ListKeys()).Debug("cache get failed, key not in cache")
queue.Forget(item)
return
}
event = kubestate.NewResourceEvent(item.Type(), resource)
}

// send event to appropriate worker
itemLogger.Infof("pushing event (%s) to channel %d", event, idx)
chans[idx] <- event
}()
}
}
}()
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (o *operator) worker(ctx context.Context, loop *QueueInformer) {
Expand Down

0 comments on commit 8f1330e

Please sign in to comment.