Skip to content

Commit

Permalink
WIP: cache: init implement
Browse files Browse the repository at this point in the history
Signed-off-by: Date Huang <[email protected]>
  • Loading branch information
tjjh89017 committed Oct 24, 2024
1 parent 46f00af commit ff85e76
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 1 deletion.
205 changes: 205 additions & 0 deletions cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#ifndef __CACHE_HPP__
#define __CACHE_HPP__

#include <mutex>
#include "store_buffer.hpp"
#include "buffer_pool.hpp"

// default 16MB
#define DEFAULT_CACHE_SIZE (1ULL * 1024)

namespace ezio
{

template<class Key, class Value>
class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopyable
{
public:
typedef Key key_type;
typedef Value value_type;
typedef std::list<key_type> list_type;
typedef std::unordered_map<
key_type,
std::pair<value_type, typename list_type::iterator>
> map_type;

lru_cache(size_t capacity) :
m_capacity(capacity)
{
}

lru_cache() :
m_capacity(DEFAULT_CACHE_SIZE)
{
}

~lru_cache()
{
}

size_t size() const
{
return m_map.size();
}

size_t capacity() const
{
return m_capacity;
}

bool empty() const
{
return m_map.empty();
}

bool contains(const key_type &key)
{
return m_map.find(key) != m_map.end();
}

void insert_impl(const key_type &key, const value_type &value)
{
std::lock_guard<std::mutex> lock(m_cache_mutex);
typename map_type::iterator i = m_map.find(key);
if(i == m_map.end()){
// insert item into the cache, but first check if it is full
if(size() >= m_capacity){
// cache is full, evict the least recently used item
evict();
}

// insert the new item
m_list.push_front(key);
m_map[key] = std::make_pair(value, m_list.begin());
}
}

boost::optional<value_type> get_impl(const key_type &key)
{
// lookup value in the cache
typename map_type::iterator i = m_map.find(key);
if(i == m_map.end()){
// value not in cache
return boost::none;
}

// return the value, but first update its place in the most
// recently used list
typename list_type::iterator j = i->second.second;
if(j != m_list.begin()){
// move item to the front of the most recently used list
m_list.erase(j);
m_list.push_front(key);

// update iterator in map
j = m_list.begin();
const value_type &value = i->second.first;
m_map[key] = std::make_pair(value, j);

// return the value
return value;
}
else {
// the item is already at the front of the most recently
// used list so just return it
return i->second.first;
}
}

void clear()
{
m_map.clear();
m_list.clear();
}

void set_capacity(size_t max_capacity)
{
std::lock_guard<std::mutex> lock(m_cache_mutex);
m_capacity = max_capacity;
while (size() > m_capacity) {
evict();
}
}

char* allocate_buffer()
{
return (char*)malloc(DEFAULT_BLOCK_SIZE);
}

void free_disk_buffer(char *buf) override
{
free(buf);
}

void insert(torrent_location const loc, char const *buf1)
{
std::lock_guard<std::mutex> lock(m_cache_mutex);
typename map_type::iterator i = m_map.find(loc);
if(i == m_map.end()){
// insert item into the cache, but first check if it is full
if(size() >= m_capacity){
// cache is full, evict the least recently used item
evict();
}

char *buf = (char*)malloc(DEFAULT_BLOCK_SIZE);
memcpy(buf, buf1, DEFAULT_BLOCK_SIZE);

// insert the new item
m_list.push_front(loc);
m_map[loc] = std::make_pair(buf, m_list.begin());
}
}

template<typename Fun>
bool get(torrent_location const loc, Fun f)
{
std::lock_guard<std::mutex> lock(m_cache_mutex);

auto it = m_map.find(loc);
if (it != m_map.end()) {
f(it->second.first);
return true;
}

return false;
}

template<typename Fun>
int get2(torrent_location const loc1, torrent_location const loc2, Fun f)
{
std::lock_guard<std::mutex> lock(m_cache_mutex);
auto const it1 = m_map.find(loc1);
auto const it2 = m_map.find(loc2);
char const *buf1 = (it1 == m_map.end()) ? nullptr : it1->second.first;
char const *buf2 = (it2 == m_map.end()) ? nullptr : it2->second.first;

if (buf1 == nullptr && buf2 == nullptr) {
return 0;
}

return f(buf1, buf2);
}

private:
void evict()
{
// evict item from the end of most recently used list
typename list_type::iterator i = --m_list.end();
auto it = m_map.find(*i);
free(it->second.first);
m_map.erase(*i);
m_list.erase(i);
}

private:
map_type m_map;
list_type m_list;
size_t m_capacity;

std::mutex m_cache_mutex;
};

} // namesapce ezio

#endif
27 changes: 26 additions & 1 deletion raw_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void raw_disk_io::async_read(

BOOST_ASSERT(r.length > len1);

int const ret = store_buffer_.get2(loc1, loc2, [&](char const *buf1, char const *buf2) {
int ret = store_buffer_.get2(loc1, loc2, [&](char const *buf1, char const *buf2) {
if (buf1) {
std::memcpy(buf, buf1 + read_offset, std::size_t(len1));
}
Expand All @@ -210,6 +210,19 @@ void raw_disk_io::async_read(
return (buf1 ? 2 : 0) | (buf2 ? 1 : 0);
});

if ((ret & 2) == 0) {
bool r1 = cache_.get(loc1, [&](char const *buf1) {
std::memcpy(buf, buf1 + read_offset, std::size_t(len1));
});
ret |= 2;
}

if ((ret & 1) == 0) {
bool r2 = cache_.get(loc2, [&](char const *buf1) {
std::memcpy(buf + len1, buf1, std::size_t(r.length - len1));
});
}

if (ret == 3) {
// success get whole piece
// return immediately
Expand All @@ -227,6 +240,8 @@ void raw_disk_io::async_read(
auto buf_offset = (ret == 1) ? 0 : len1;
storages_[idx]->read(buf + buf_offset, r.piece, offset, len, error);

// TODO store to cache_

post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable {
h(std::move(b), error);
});
Expand All @@ -243,13 +258,22 @@ void raw_disk_io::async_read(
handler(std::move(buffer), error);
return;
}

if (cache_.get({idx, r.piece, block_offset}, [&](char const *buf1) {
std::memcpy(buf, buf1 + read_offset, std::size_t(r.length));
})) {
handler(std::move(buffer), error);
return;
}
}

boost::asio::post(read_thread_pool_,
[=, this, handler = std::move(handler), buffer = std::move(buffer)]() mutable {
libtorrent::storage_error error;
storages_[idx]->read(buf, r.piece, r.start, r.length, error);

// TODO store to cache_

post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable {
h(std::move(b), error);
});
Expand All @@ -266,6 +290,7 @@ bool raw_disk_io::async_write(libtorrent::storage_index_t storage, libtorrent::p
bool exceeded = false;
libtorrent::disk_buffer_holder buffer(write_buffer_pool_, write_buffer_pool_.allocate_buffer(exceeded, o), DEFAULT_BLOCK_SIZE);

cache_.insert({storage, r.piece, r.start}, buf);
if (buffer) {
// async
memcpy(buffer.data(), buf, r.length);
Expand Down
3 changes: 3 additions & 0 deletions raw_disk_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <boost/asio.hpp>
#include "buffer_pool.hpp"
#include "store_buffer.hpp"
#include "cache.hpp"

namespace ezio
{
Expand All @@ -26,6 +27,8 @@ class raw_disk_io final : public libtorrent::disk_interface

store_buffer store_buffer_;

lru_cache<torrent_location, char*> cache_;

boost::asio::thread_pool read_thread_pool_;
boost::asio::thread_pool write_thread_pool_;
boost::asio::thread_pool hash_thread_pool_;
Expand Down

0 comments on commit ff85e76

Please sign in to comment.