Skip to content

Commit

Permalink
feat: new experimental gc friendly flatten cache
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Dec 31, 2024
1 parent d5937af commit b2d37ed
Showing 1 changed file with 229 additions and 0 deletions.
229 changes: 229 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package rueidis

import (
"context"
"runtime"
"sync"
"time"
"unsafe"
)

// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
Expand Down Expand Up @@ -178,3 +180,230 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
return a.val, a.err
}
}

type flatentry struct {
ovfl *flatentry
next unsafe.Pointer
prev unsafe.Pointer
cmd string
key string
val []byte
ttl int64
size int64
mark int64
mu sync.Mutex
}

func (f *flatentry) insert(e *flatentry) {
f.size += e.size
f.mu.Lock()
defer f.mu.Unlock()
e.ovfl = f.ovfl
f.ovfl = e
}

func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) {
if f == nil {
return
}
if ts >= f.ttl {
expired = true
return
}
if cmd == f.cmd {
_ = ret.CacheUnmarshalView(f.val)
return
}
f.mu.Lock()
ovfl := f.ovfl
f.mu.Unlock()
return ovfl.find(cmd, ts)
}

const lrBatchSize = 64

type lrBatch struct {
m map[*flatentry]struct{}
}

func NewFlattenCache(limit int64) CacheStore {
f := &flatten{
flights: make(map[string]*adapterEntry),
cache: make(map[string]*flatentry),
head: &flatentry{},
tail: &flatentry{},
size: 0,
limit: limit,
}
f.head.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(f.head)
f.lrup = sync.Pool{New: func() any {
b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)}
runtime.SetFinalizer(b, func() {
f.llTailBatch(b)
})
return b
}}
return f
}

type flatten struct {
flights map[string]*adapterEntry
cache map[string]*flatentry
head *flatentry
tail *flatentry
lrup sync.Pool
mark int64
size int64
limit int64
mu sync.RWMutex
}

func (f *flatten) llAdd(e *flatentry) {
e.mark = f.mark
e.prev = f.tail.prev
e.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(e)
(*flatentry)(e.prev).next = unsafe.Pointer(e)
}

func (f *flatten) llDel(e *flatentry) {
(*flatentry)(e.prev).next = e.next
(*flatentry)(e.next).prev = e.prev
e.mark = 0
}

func (f *flatten) llTail(e *flatentry) {
if e.mark == f.mark {
f.llDel(e)
f.llAdd(e)
}
}

func (f *flatten) llTailBatch(b *lrBatch) {
f.mu.Lock()
for e := range b.m {
f.llTail(e)
}
f.mu.Unlock()
clear(b.m)
}

func (f *flatten) remove(e *flatentry) {
f.size -= e.size
f.llDel(e)
delete(f.cache, e.key)
}

func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
f.mu.RLock()
e := f.cache[key]
f.mu.RUnlock()
ts := now.UnixMilli()
if v, _ := e.find(cmd, ts); v.typ != 0 {
batch := f.lrup.Get().(*lrBatch)
batch.m[e] = struct{}{}
if len(batch.m) == lrBatchSize {
f.llTailBatch(batch)
}
f.lrup.Put(batch)
return v, nil
}
fk := key + cmd
f.mu.RLock()
af := f.flights[fk]
f.mu.RUnlock()
if af != nil {
return RedisMessage{}, af
}
f.mu.Lock()
defer f.mu.Unlock()
e = f.cache[key]
v, expired := e.find(cmd, ts)
if v.typ != 0 {
f.llTail(e)
return v, nil
}
if expired {
f.remove(e)
}
if af = f.flights[fk]; af != nil {
return RedisMessage{}, af
}
f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()}
return RedisMessage{}, nil
}

func (f *flatten) Update(key, cmd string, val RedisMessage) int64 {
fk := key + cmd
bs := val.CacheMarshal(nil)
fe := &flatentry{cmd: cmd, val: bs, ttl: val.CachePXAT(), size: int64(len(bs)+len(key)+len(cmd)) + int64(unsafe.Sizeof(flatentry{}))}
f.mu.Lock()
af := f.flights[fk]
if af != nil {
delete(f.flights, fk)
if af.xat < fe.ttl {
fe.ttl = af.xat
}
}
f.size += fe.size
for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); {
e := (*flatentry)(ep)
f.remove(e)
ep = e.next
}
if e := f.cache[key]; e == nil {
fe.key = key
f.cache[key] = fe
f.llAdd(fe)
} else {
e.insert(fe)
}
f.mu.Unlock()
if af != nil {
af.set(val, nil)
}
return fe.ttl
}

func (f *flatten) Cancel(key, cmd string, err error) {
fk := key + cmd
f.mu.Lock()
defer f.mu.Unlock()
if af := f.flights[fk]; af != nil {
delete(f.flights, fk)
af.set(RedisMessage{}, err)
}
}

func (f *flatten) Delete(keys []RedisMessage) {
f.mu.Lock()
defer f.mu.Unlock()
if keys == nil {
f.cache = make(map[string]*flatentry, len(f.cache))
f.head.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(f.head)
f.mark++
f.size = 0
} else {
for _, k := range keys {
if e := f.cache[k.string]; e != nil {
f.remove(e)
}
}
}
}

func (f *flatten) Close(err error) {
f.mu.Lock()
flights := f.flights
f.flights = nil
f.cache = nil
f.tail = nil
f.head = nil
f.mark++
f.mu.Unlock()
for _, entry := range flights {
entry.set(RedisMessage{}, err)
}
}

0 comments on commit b2d37ed

Please sign in to comment.