fix: add generation tracking to prevent keys surviving a clear operation
Running a Purge and a Get at the same time could allow an old key to be
pulled back into the cache if the request was initiated by a node that
had already been cleared and sent to a node that hadn't. This patch
mitigates that race by adding a `generation` field that tracks the
number of purges issued. Requests are fulfilled successfully only if
both ends of a request are on the same generation.
ct16k committed Jan 22, 2023
1 parent 9584832 commit 7f8db4d
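
In essence, every purge advances a per-cache generation counter, values carry the generation they were created under, and a node refuses to admit a value stamped with a generation other than its own. A standalone sketch of that idea (toy types and names, not the actual groupcache code):

```go
package main

import "fmt"

// node is a toy stand-in for one cache participant. Each purge bumps
// its generation; writes stamped with an older generation are refused,
// so a value read before a purge cannot be re-inserted after it.
type node struct {
	gen  int64
	data map[string]string
}

func (n *node) purge() {
	n.data = map[string]string{}
	n.gen++
}

// set only accepts values produced under the node's current generation.
func (n *node) set(key, val string, gen int64) bool {
	if gen != n.gen {
		return false // stale request from before a purge: drop it
	}
	n.data[key] = val
	return true
}

func main() {
	n := &node{data: map[string]string{}}
	g := n.gen      // a request captures the generation it started under
	n.purge()       // a purge happens concurrently
	ok := n.set("k", "old-value", g)
	fmt.Println(ok) // false: the pre-purge value does not survive the clear
}
```

In the actual patch the same check sits in `cache.add`, and the generation travels with each peer request, so a Get or Set that began before a purge on either end is dropped instead of resurrecting the old value.
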
Showing 6 changed files with 399 additions and 130 deletions.
13 changes: 10 additions & 3 deletions byteview.go
@@ -31,17 +31,24 @@ import (
// A ByteView is meant to be used as a value type, not
// a pointer (like a time.Time).
type ByteView struct {
e time.Time
s string
// If b is non-nil, b is used, else s is used.
b []byte
s string
e time.Time
// generation is an optional field, used only in certain operations
g int64
}

// Returns the expire time associated with this view
// Expire returns the expire time associated with this view
func (v ByteView) Expire() time.Time {
return v.e
}

// Generation returns the generation associated with this cache view
func (v ByteView) Generation() int64 {
return v.g
}

// Len returns the view's length.
func (v ByteView) Len() int {
if v.b != nil {
74 changes: 52 additions & 22 deletions groupcache.go
@@ -274,19 +274,20 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.T
_, err := g.setGroup.Do(key, func() (interface{}, error) {
// If remote peer owns this key
owner, ok := g.peers.PickPeer(key)
generation := g.mainCache.generation()
if ok {
if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
if err := g.setFromPeer(ctx, owner, key, value, expire, generation); err != nil {
return nil, err
}
// TODO(thrawn01): Not sure if this is useful outside of tests...
// maybe we should ALWAYS update the local cache?
if hotCache {
g.localSet(key, value, expire, &g.hotCache)
g.localSet(key, value, expire, generation, &g.hotCache)
}
return nil, nil
}
// We own this key
g.localSet(key, value, expire, &g.mainCache)
g.localSet(key, value, expire, generation, &g.mainCache)
return nil, nil
})
return err
@@ -495,23 +496,29 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (
}
}

value := ByteView{b: res.Value, e: expire}
var generation int64
if res.Generation != nil {
generation = *res.Generation
}

value := ByteView{b: res.Value, e: expire, g: generation}

// Always populate the hot cache
g.populateCache(key, value, &g.hotCache)
return value, nil
}

func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error {
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time, gen int64) error {
var expire int64
if !e.IsZero() {
expire = e.UnixNano()
}
req := &pb.SetRequest{
Expire: &expire,
Group: &g.name,
Key: &k,
Value: v,
Expire: &expire,
Group: &g.name,
Key: &k,
Value: v,
Generation: &gen,
}
return peer.Set(ctx, req)
}
@@ -543,14 +550,15 @@ func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
return
}

func (g *Group) localSet(key string, value []byte, expire time.Time, cache *cache) {
func (g *Group) localSet(key string, value []byte, expire time.Time, generation int64, cache *cache) {
if g.cacheBytes <= 0 {
return
}

bv := ByteView{
b: value,
e: expire,
g: generation,
}

// Ensure no requests are in flight
@@ -646,21 +654,23 @@ var NowFunc lru.NowFunc = time.Now
// values.
type cache struct {
mu sync.RWMutex
nbytes int64 // of all keys and values
lru *lru.Cache
nbytes int64 // of all keys and values
nhit, nget int64
nevict int64 // number of evictions
gen int64
}

func (c *cache) stats() CacheStats {
c.mu.RLock()
defer c.mu.RUnlock()
return CacheStats{
Bytes: c.nbytes,
Items: c.itemsLocked(),
Gets: c.nget,
Hits: c.nhit,
Evictions: c.nevict,
Bytes: c.nbytes,
Items: c.itemsLocked(),
Gets: c.nget,
Hits: c.nhit,
Evictions: c.nevict,
Generation: c.gen,
}
}

@@ -677,6 +687,16 @@ func (c *cache) add(key string, value ByteView) {
},
}
}
if c.gen != value.g {
if logger != nil {
logger.Error().WithFields(map[string]interface{}{
"got": value.g,
"have": c.generation,
"key": key,
}).Printf("generation mismatch")
}
return
}
c.lru.Add(key, value, value.Expire())
c.nbytes += int64(len(key)) + int64(value.Len())
}
@@ -693,7 +713,10 @@ func (c *cache) get(key string) (value ByteView, ok bool) {
return
}
c.nhit++
return vi.(ByteView), true

bv := vi.(ByteView)
bv.g = c.gen
return bv, true
}

func (c *cache) remove(key string) {
@@ -741,6 +764,12 @@ func (c *cache) itemsLocked() int64 {
return int64(c.lru.Len())
}

func (c *cache) generation() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.gen
}

// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

@@ -765,9 +794,10 @@ func (i *AtomicInt) String() string {

// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
Bytes int64
Items int64
Gets int64
Hits int64
Evictions int64
Bytes int64
Items int64
Gets int64
Hits int64
Evictions int64
Generation int64
}
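
The hunks above don't show where `gen` is advanced; the purge/clear path presumably increments it under the cache's write lock, which is what makes the mismatch check in `add` take effect after a clear. A minimal sketch of that assumption (the method name `removeAll` and the `Clear` call on the underlying LRU are guesses, not taken from this diff):

```go
// Hypothetical sketch of the purge side; not part of this commit's visible hunks.
func (c *cache) removeAll() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru != nil {
		c.lru.Clear() // assumes the underlying lru.Cache exposes Clear
	}
	c.nbytes = 0
	// Advancing the generation invalidates any in-flight value created
	// before the purge: add() will reject it as a generation mismatch.
	c.gen++
}
```
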