Skip to content

Commit

Permalink
Transparent Item movements
Browse files Browse the repository at this point in the history
The new algorithm relies on the moving bit and does not require external
synchronization. Data movement happens transparently for the client: if
the client thread attempts to get a handle for the item being moved it
will get a handle with wait context to wait till the movement is
completed.
  • Loading branch information
vinser52 committed Aug 18, 2023
1 parent b828c2b commit ab70b7d
Show file tree
Hide file tree
Showing 14 changed files with 627 additions and 357 deletions.
709 changes: 404 additions & 305 deletions cachelib/allocator/CacheAllocator-inl.h

Large diffs are not rendered by default.

155 changes: 138 additions & 17 deletions cachelib/allocator/CacheAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,7 @@ class CacheAllocator : public CacheBase {

private:
// wrapper around Item's refcount and active handle tracking
FOLLY_ALWAYS_INLINE bool incRef(Item& it);
FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef(Item& it);
FOLLY_ALWAYS_INLINE RefcountWithFlags::Value decRef(Item& it);

// drops the refcount and if needed, frees the allocation back to the memory
Expand Down Expand Up @@ -1473,26 +1473,13 @@ class CacheAllocator : public CacheBase {
// The parent handle parameter here is mainly used to find the
// correct pool to allocate memory for this chained item
//
// @param parent handle to the cache item
// @param parent the parent item
// @param size the size for the chained allocation
//
// @return handle to the chained allocation
// @throw std::invalid_argument if the size requested is invalid or
// if the item is invalid
WriteHandle allocateChainedItemInternal(const ReadHandle& parent,
uint32_t size);

// Given an item and its parentKey, validate that the parentKey
// corresponds to an item that's the parent of the supplied item.
//
// @param item item that we want to get the parent handle for
// @param parentKey key of the item's parent
//
// @return handle to the parent item if the validations pass
// otherwise, an empty Handle is returned.
//
ReadHandle validateAndGetParentHandleForChainedMoveLocked(
const ChainedItem& item, const Key& parentKey);
WriteHandle allocateChainedItemInternal(const Item& parent, uint32_t size);

// Given an existing item, allocate a new one for the
// existing one to later be moved into.
Expand Down Expand Up @@ -1609,7 +1596,7 @@ class CacheAllocator : public CacheBase {
// @param newParent the new parent for the chain
//
// @throw if any of the conditions for parent or newParent are not met.
void transferChainLocked(WriteHandle& parent, WriteHandle& newParent);
void transferChainLocked(Item& parent, WriteHandle& newParent);

// replace a chained item in the existing chain. This needs to be called
// with the chained item lock held exclusive
Expand All @@ -1623,6 +1610,24 @@ class CacheAllocator : public CacheBase {
WriteHandle newItemHdl,
const Item& parent);

//
// Performs the actual inplace replace - it is called from
// moveChainedItem and replaceChainedItemLocked
// must hold chainedItemLock
//
// @param oldItem the item we are replacing in the chain
// @param newItem the item we are replacing it with
// @param parent the parent for the chain
// @param fromMove used to determine if the replaced was called from
// moveChainedItem - we avoid the handle destructor
// in this case.
//
// @return handle to the oldItem
void replaceInChainLocked(Item& oldItem,
WriteHandle& newItemHdl,
const Item& parent,
bool fromMove);

// Insert an item into MM container. The caller must hold a valid handle for
// the item.
//
Expand Down Expand Up @@ -1731,6 +1736,19 @@ class CacheAllocator : public CacheBase {

using EvictionIterator = typename MMContainer::LockedIterator;

// Wakes up waiters if there are any
//
// @param item wakes waiters that are waiting on that item
// @param handle handle to pass to the waiters
void wakeUpWaiters(Item& item, WriteHandle handle);

// Unmarks item as moving and wakes up any waiters waiting on that item
//
// @param item wakes waiters that are waiting on that item
// @param handle handle to pass to the waiters
typename RefcountWithFlags::Value unmarkMovingAndWakeUpWaiters(
Item& item, WriteHandle handle);

// Deserializer CacheAllocatorMetadata and verify the version
//
// @param deserializer Deserializer object
Expand Down Expand Up @@ -1844,6 +1862,11 @@ class CacheAllocator : public CacheBase {
Item& item,
util::Throttler& throttler);

// Helper function to create PutToken
//
// @return valid token if the item should be written to NVM cache.
typename NvmCacheT::PutToken createPutToken(Item& item);

// Helper function to evict a normal item for slab release
//
// @return last handle for corresponding to item on success. empty handle on
Expand Down Expand Up @@ -2082,6 +2105,88 @@ class CacheAllocator : public CacheBase {

// BEGIN private members

bool tryGetHandleWithWaitContextForMovingItem(Item& item,
WriteHandle& handle);

size_t wakeUpWaitersLocked(folly::StringPiece key, WriteHandle&& handle);

class MoveCtx {
public:
MoveCtx() {}

~MoveCtx() {
// prevent any further enqueue to waiters
// Note: we don't need to hold locks since no one can enqueue
// after this point.
wakeUpWaiters();
}

// record the item handle. Upon destruction we will wake up the waiters
// and pass a clone of the handle to the callBack. By default we pass
// a null handle
void setItemHandle(WriteHandle _it) { it = std::move(_it); }

// enqueue a waiter into the waiter list
// @param waiter WaitContext
void addWaiter(std::shared_ptr<WaitContext<ReadHandle>> waiter) {
XDCHECK(waiter);
waiters.push_back(std::move(waiter));
}

size_t numWaiters() const { return waiters.size(); }

private:
// notify all pending waiters that are waiting for the fetch.
void wakeUpWaiters() {
bool refcountOverflowed = false;
for (auto& w : waiters) {
// If refcount overflowed earlier, then we will return miss to
// all subsequent waiters.
if (refcountOverflowed) {
w->set(WriteHandle{});
continue;
}

try {
w->set(it.clone());
} catch (const exception::RefcountOverflow&) {
// We'll return a miss to the user's pending read,
// so we should enqueue a delete via NvmCache.
// TODO: cache.remove(it);
refcountOverflowed = true;
}
}
}

WriteHandle it; // will be set when Context is being filled
std::vector<std::shared_ptr<WaitContext<ReadHandle>>> waiters; // list of
// waiters
};
using MoveMap =
folly::F14ValueMap<folly::StringPiece,
std::unique_ptr<MoveCtx>,
folly::HeterogeneousAccessHash<folly::StringPiece>>;

static size_t getShardForKey(folly::StringPiece key) {
return folly::Hash()(key) % kShards;
}

MoveMap& getMoveMapForShard(size_t shard) {
return movesMap_[shard].movesMap_;
}

MoveMap& getMoveMap(folly::StringPiece key) {
return getMoveMapForShard(getShardForKey(key));
}

std::unique_lock<std::mutex> getMoveLockForShard(size_t shard) {
return std::unique_lock<std::mutex>(moveLock_[shard].moveLock_);
}

std::unique_lock<std::mutex> getMoveLock(folly::StringPiece key) {
return getMoveLockForShard(getShardForKey(key));
}

// Whether the memory allocator for this cache allocator was created on shared
// memory. The hash table, chained item hash table etc is also created on
// shared memory except for temporary shared memory mode when they're created
Expand Down Expand Up @@ -2175,6 +2280,22 @@ class CacheAllocator : public CacheBase {
// poolResizer_, poolOptimizer_, memMonitor_, reaper_
mutable std::mutex workersMutex_;

static constexpr size_t kShards = 8192; // TODO: need to define right value

struct MovesMapShard {
alignas(folly::hardware_destructive_interference_size) MoveMap movesMap_;
};

struct MoveLock {
alignas(folly::hardware_destructive_interference_size) std::mutex moveLock_;
};

// a map of all pending moves
std::vector<MovesMapShard> movesMap_;

// a map of move locks for each shard
std::vector<MoveLock> moveLock_;

// time when the ram cache was first created
const uint32_t cacheCreationTime_{0};

Expand Down
2 changes: 1 addition & 1 deletion cachelib/allocator/CacheItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class CACHELIB_PACKED_ATTR CacheItem {
//
// @return true on success, failure if item is marked as exclusive
// @throw exception::RefcountOverflow on ref count overflow
FOLLY_ALWAYS_INLINE bool incRef() {
FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef() {
try {
return ref_.incRef();
} catch (exception::RefcountOverflow& e) {
Expand Down
6 changes: 6 additions & 0 deletions cachelib/allocator/MM2Q-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ void MM2Q::Container<T, HookPtr>::withEvictionIterator(F&& fun) {
}
}

template <typename T, MM2Q::Hook<T> T::*HookPtr>
template <typename F>
void MM2Q::Container<T, HookPtr>::withContainerLock(F&& fun) {
lruMutex_->lock_combine([this, &fun]() { fun(); });
}

template <typename T, MM2Q::Hook<T> T::*HookPtr>
void MM2Q::Container<T, HookPtr>::removeLocked(T& node,
bool doRebalance) noexcept {
Expand Down
4 changes: 4 additions & 0 deletions cachelib/allocator/MM2Q.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ class MM2Q {
template <typename F>
void withEvictionIterator(F&& f);

// Execute provided function under container lock.
template <typename F>
void withContainerLock(F&& f);

// get the current config as a copy
Config getConfig() const;

Expand Down
6 changes: 6 additions & 0 deletions cachelib/allocator/MMLru-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ void MMLru::Container<T, HookPtr>::withEvictionIterator(F&& fun) {
}
}

template <typename T, MMLru::Hook<T> T::*HookPtr>
template <typename F>
void MMLru::Container<T, HookPtr>::withContainerLock(F&& fun) {
lruMutex_->lock_combine([this, &fun]() { fun(); });
}

template <typename T, MMLru::Hook<T> T::*HookPtr>
void MMLru::Container<T, HookPtr>::ensureNotInsertionPoint(T& node) noexcept {
// If we are removing the insertion point node, grow tail before we remove
Expand Down
4 changes: 4 additions & 0 deletions cachelib/allocator/MMLru.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ class MMLru {
template <typename F>
void withEvictionIterator(F&& f);

// Execute provided function under container lock.
template <typename F>
void withContainerLock(F&& f);

// get copy of current config
Config getConfig() const;

Expand Down
7 changes: 7 additions & 0 deletions cachelib/allocator/MMTinyLFU-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ void MMTinyLFU::Container<T, HookPtr>::withEvictionIterator(F&& fun) {
fun(getEvictionIterator());
}

template <typename T, MMTinyLFU::Hook<T> T::*HookPtr>
template <typename F>
void MMTinyLFU::Container<T, HookPtr>::withContainerLock(F&& fun) {
LockHolder l(lruMutex_);
fun();
}

template <typename T, MMTinyLFU::Hook<T> T::*HookPtr>
void MMTinyLFU::Container<T, HookPtr>::removeLocked(T& node) noexcept {
if (isTiny(node)) {
Expand Down
4 changes: 4 additions & 0 deletions cachelib/allocator/MMTinyLFU.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ class MMTinyLFU {
template <typename F>
void withEvictionIterator(F&& f);

// Execute provided function under container lock.
template <typename F>
void withContainerLock(F&& f);

// for saving the state of the lru
//
// precondition: serialization must happen without any reader or writer
Expand Down
29 changes: 21 additions & 8 deletions cachelib/allocator/Refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,30 +130,35 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
RefcountWithFlags& operator=(const RefcountWithFlags&) = delete;
RefcountWithFlags(RefcountWithFlags&&) = delete;
RefcountWithFlags& operator=(RefcountWithFlags&&) = delete;

enum incResult { incOk, incFailedMoving, incFailedEviction };
// Bumps up the reference count only if the new count will be strictly less
// than or equal to the maxCount and the item is not exclusive
// @return true if refcount is bumped. false otherwise (if item is exclusive)
// @throw exception::RefcountOverflow if new count would be greater than
// maxCount
FOLLY_ALWAYS_INLINE bool incRef() {
auto predicate = [](const Value curValue) {
FOLLY_ALWAYS_INLINE incResult incRef() {
incResult res = incOk;
auto predicate = [&res](const Value curValue) {
Value bitMask = getAdminRef<kExclusive>();

const bool exlusiveBitIsSet = curValue & bitMask;
if (UNLIKELY((curValue & kAccessRefMask) == (kAccessRefMask))) {
throw exception::RefcountOverflow("Refcount maxed out.");
} else if (exlusiveBitIsSet) {
res = (curValue & kAccessRefMask) == 0 ? incFailedEviction
: incFailedMoving;
return false;
}

// Check if the item is not marked for eviction
return !exlusiveBitIsSet || ((curValue & kAccessRefMask) != 0);
res = incOk;
return true;
};

auto newValue = [](const Value curValue) {
return (curValue + static_cast<Value>(1));
};

return atomicUpdateValue(predicate, newValue);
atomicUpdateValue(predicate, newValue);
return res;
}

// Bumps down the reference count
Expand Down Expand Up @@ -322,11 +327,19 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
bool markMoving() {
Value linkedBitMask = getAdminRef<kLinked>();
Value exclusiveBitMask = getAdminRef<kExclusive>();
Value isChainedItemFlag = getFlag<kIsChainedItem>();

auto predicate = [linkedBitMask, exclusiveBitMask](const Value curValue) {
auto predicate = [linkedBitMask, exclusiveBitMask,
isChainedItemFlag](const Value curValue) {
const bool unlinked = !(curValue & linkedBitMask);
const bool alreadyExclusive = curValue & exclusiveBitMask;
const bool isChained = curValue & isChainedItemFlag;

// chained item can have ref count == 1, this just means it's linked in
// the chain
if ((curValue & kAccessRefMask) > isChained ? 1 : 0) {
return false;
}
if (unlinked || alreadyExclusive) {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions cachelib/allocator/tests/AllocatorTypeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ TYPED_TEST(BaseAllocatorTest, AddChainedItemMultiThreadWithMovingAndSync) {
this->testAddChainedItemMultithreadWithMovingAndSync();
}

TYPED_TEST(BaseAllocatorTest, TransferChainWhileMoving) {
this->testTransferChainWhileMoving();
TYPED_TEST(BaseAllocatorTest, TransferChainAfterMoving) {
this->testTransferChainAfterMoving();
}

TYPED_TEST(BaseAllocatorTest, AddAndPopChainedItemMultithread) {
Expand Down
Loading

0 comments on commit ab70b7d

Please sign in to comment.