diff --git a/cache.hpp b/cache.hpp
new file mode 100644
index 0000000..3a4b642
--- /dev/null
+++ b/cache.hpp
@@ -0,0 +1,224 @@
+#ifndef EZIO_CACHE_HPP
+#define EZIO_CACHE_HPP
+
+#include <cstdlib>
+#include <cstring>
+#include <list>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+
+#include <boost/noncopyable.hpp>
+#include <boost/optional.hpp>
+
+#include "store_buffer.hpp"
+#include "buffer_pool.hpp"
+
+// Default capacity is measured in cached blocks, not bytes:
+// 1024 blocks * DEFAULT_BLOCK_SIZE (16KiB) = 16MiB.
+#define DEFAULT_CACHE_SIZE (1ULL * 1024)
+
+namespace ezio
+{
+
+// Thread-safe LRU cache of malloc()'d disk blocks.  The cache owns
+// every buffer it stores: insert() copies, evict()/clear()/dtor free.
+// NOTE(review): evict()/clear() call free() on the mapped value, so
+// this template only works when Value is a malloc()'d pointer type.
+template <typename Key, typename Value>
+class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopyable
+{
+public:
+	typedef Key key_type;
+	typedef Value value_type;
+	typedef std::list<key_type> list_type;
+	typedef std::unordered_map<
+		key_type,
+		std::pair<value_type, typename list_type::iterator>
+	> map_type;
+
+	explicit lru_cache(size_t capacity) :
+		m_capacity(capacity)
+	{
+	}
+
+	lru_cache() :
+		m_capacity(DEFAULT_CACHE_SIZE)
+	{
+	}
+
+	~lru_cache()
+	{
+		// free all cached buffers; without this every entry leaks
+		clear();
+	}
+
+	size_t size() const
+	{
+		return m_map.size();
+	}
+
+	size_t capacity() const
+	{
+		return m_capacity;
+	}
+
+	bool empty() const
+	{
+		return m_map.empty();
+	}
+
+	bool contains(const key_type &key)
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		return m_map.find(key) != m_map.end();
+	}
+
+	void insert_impl(const key_type &key, const value_type &value)
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		typename map_type::iterator i = m_map.find(key);
+		if (i == m_map.end()) {
+			// cache full: evict the least recently used entry first
+			if (size() >= m_capacity) {
+				evict();
+			}
+
+			m_list.push_front(key);
+			m_map[key] = std::make_pair(value, m_list.begin());
+		}
+	}
+
+	boost::optional<value_type> get_impl(const key_type &key)
+	{
+		// must lock: this mutates the recency list
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		typename map_type::iterator i = m_map.find(key);
+		if (i == m_map.end()) {
+			return boost::none;
+		}
+
+		// splice moves the node to the front of the recency list
+		// without invalidating the iterator stored in the map
+		m_list.splice(m_list.begin(), m_list, i->second.second);
+		i->second.second = m_list.begin();
+		return i->second.first;
+	}
+
+	void clear()
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		// free the owned buffers before dropping the entries
+		for (typename map_type::iterator i = m_map.begin(); i != m_map.end(); ++i) {
+			free(i->second.first);
+		}
+		m_map.clear();
+		m_list.clear();
+	}
+
+	void set_capacity(size_t max_capacity)
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		m_capacity = max_capacity;
+		while (size() > m_capacity) {
+			evict();
+		}
+	}
+
+	char *allocate_buffer()
+	{
+		return (char *)malloc(DEFAULT_BLOCK_SIZE);
+	}
+
+	void free_disk_buffer(char *buf) override
+	{
+		free(buf);
+	}
+
+	// Cache a copy of one block.  `len` is the number of valid bytes
+	// in `buf1` (the final block of a piece can be short); copying a
+	// fixed DEFAULT_BLOCK_SIZE would read past the caller's buffer.
+	void insert(torrent_location const loc, char const *buf1, int len = DEFAULT_BLOCK_SIZE)
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		if (m_map.find(loc) != m_map.end()) {
+			return;
+		}
+
+		if (size() >= m_capacity) {
+			evict();
+		}
+
+		char *buf = (char *)malloc(DEFAULT_BLOCK_SIZE);
+		if (buf == nullptr) {
+			// caching is best-effort: skip on allocation failure
+			return;
+		}
+		if (len < 0 || len > DEFAULT_BLOCK_SIZE) {
+			len = DEFAULT_BLOCK_SIZE;
+		}
+		memcpy(buf, buf1, std::size_t(len));
+		// zero the tail so a full-block read never sees heap garbage
+		memset(buf + len, 0, std::size_t(DEFAULT_BLOCK_SIZE - len));
+
+		m_list.push_front(loc);
+		m_map[loc] = std::make_pair(buf, m_list.begin());
+	}
+
+	// Invoke `f` with the cached block for `loc` under the cache lock.
+	// Returns true on a hit, false on a miss.
+	template <typename Fun>
+	bool get(torrent_location const loc, Fun f)
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+
+		auto it = m_map.find(loc);
+		if (it != m_map.end()) {
+			f(it->second.first);
+			return true;
+		}
+
+		return false;
+	}
+
+	// Look up two blocks at once; `f` receives nullptr for a miss.
+	// Returns 0 when both miss, otherwise whatever `f` returns.
+	template <typename Fun>
+	int get2(torrent_location const loc1, torrent_location const loc2, Fun f)
+	{
+		std::lock_guard<std::mutex> lock(m_cache_mutex);
+		auto const it1 = m_map.find(loc1);
+		auto const it2 = m_map.find(loc2);
+		char const *buf1 = (it1 == m_map.end()) ? nullptr : it1->second.first;
+		char const *buf2 = (it2 == m_map.end()) ? nullptr : it2->second.first;
+
+		if (buf1 == nullptr && buf2 == nullptr) {
+			return 0;
+		}
+
+		return f(buf1, buf2);
+	}
+
+private:
+	// caller must hold m_cache_mutex
+	void evict()
+	{
+		// drop the least recently used entry (tail of the list)
+		typename list_type::iterator i = --m_list.end();
+		auto it = m_map.find(*i);
+		free(it->second.first);
+		m_map.erase(it);
+		m_list.erase(i);
+	}
+
+private:
+	map_type m_map;
+	list_type m_list;
+	size_t m_capacity;
+
+	std::mutex m_cache_mutex;
+};
+
+} // namespace ezio
+
+#endif
diff --git a/raw_disk_io.cpp b/raw_disk_io.cpp
index 4185b24..f555953 100644
--- a/raw_disk_io.cpp
+++ b/raw_disk_io.cpp
@@ -200,7 +200,7 @@ void raw_disk_io::async_read(
 
 	BOOST_ASSERT(r.length > len1);
 
-	int const ret = store_buffer_.get2(loc1, loc2, [&](char const *buf1, char const *buf2) {
+	int ret = store_buffer_.get2(loc1, loc2, [&](char const *buf1, char const *buf2) {
 		if (buf1) {
 			std::memcpy(buf, buf1 + read_offset, std::size_t(len1));
 		}
@@ -210,6 +210,24 @@ void raw_disk_io::async_read(
 		return (buf1 ? 2 : 0) | (buf2 ? 1 : 0);
 	});
 
+	// Fall back to the read cache for whichever block the store buffer
+	// missed.  Only mark a bit as satisfied on an actual cache hit —
+	// setting it unconditionally would return stale buffer contents.
+	if ((ret & 2) == 0) {
+		if (cache_.get(loc1, [&](char const *buf1) {
+			    std::memcpy(buf, buf1 + read_offset, std::size_t(len1));
+		    })) {
+			ret |= 2;
+		}
+	}
+
+	if ((ret & 1) == 0) {
+		if (cache_.get(loc2, [&](char const *buf2) {
+			    std::memcpy(buf + len1, buf2, std::size_t(r.length - len1));
+		    })) {
+			ret |= 1;
+		}
+	}
+
 	if (ret == 3) {
 		// success get whole piece
 		// return immediately
@@ -227,6 +245,8 @@ void raw_disk_io::async_read(
 			auto buf_offset = (ret == 1) ? 0 : len1;
 			storages_[idx]->read(buf + buf_offset, r.piece, offset, len, error);
 
+			// TODO store to cache_
+
 			post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable {
 				h(std::move(b), error);
 			});
@@ -243,6 +263,13 @@ void raw_disk_io::async_read(
 			handler(std::move(buffer), error);
 			return;
 		}
+
+		if (cache_.get({idx, r.piece, block_offset}, [&](char const *buf1) {
+			    std::memcpy(buf, buf1 + read_offset, std::size_t(r.length));
+		    })) {
+			handler(std::move(buffer), error);
+			return;
+		}
 	}
 
 	boost::asio::post(read_thread_pool_,
@@ -250,6 +277,8 @@ void raw_disk_io::async_read(
 		libtorrent::storage_error error;
 		storages_[idx]->read(buf, r.piece, r.start, r.length, error);
 
+		// TODO store to cache_
+
 		post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable {
 			h(std::move(b), error);
 		});
@@ -266,6 +295,7 @@ bool raw_disk_io::async_write(libtorrent::storage_index_t storage, libtorrent::p
 
 	bool exceeded = false;
 	libtorrent::disk_buffer_holder buffer(write_buffer_pool_, write_buffer_pool_.allocate_buffer(exceeded, o), DEFAULT_BLOCK_SIZE);
+	cache_.insert({storage, r.piece, r.start}, buf, r.length);
 	if (buffer) {
 		// async
 		memcpy(buffer.data(), buf, r.length);
diff --git a/raw_disk_io.hpp b/raw_disk_io.hpp
index ce34157..88a9948 100644
--- a/raw_disk_io.hpp
+++ b/raw_disk_io.hpp
@@ -8,6 +8,7 @@
 #include <boost/asio/thread_pool.hpp>
 #include "buffer_pool.hpp"
 #include "store_buffer.hpp"
+#include "cache.hpp"
 
 namespace ezio
 {
@@ -26,6 +27,8 @@ class raw_disk_io final : public libtorrent::disk_interface
 
 	store_buffer store_buffer_;
 
+	lru_cache<torrent_location, char *> cache_;
+
 	boost::asio::thread_pool read_thread_pool_;
 	boost::asio::thread_pool write_thread_pool_;
 	boost::asio::thread_pool hash_thread_pool_;