From c00fb6dd77d41fb45e0c3e5a96314fc1cc8e29e7 Mon Sep 17 00:00:00 2001 From: Date Huang Date: Thu, 24 Oct 2024 20:49:28 +0800 Subject: [PATCH] WIP: cache Signed-off-by: Date Huang --- buffer_pool.cpp | 2 +- cache.hpp | 34 ++++++++++++++++++---------------- config.cpp | 1 + config.hpp | 2 ++ main.cpp | 10 +++++++--- raw_disk_io.cpp | 20 ++++++++++++-------- raw_disk_io.hpp | 4 ++-- 7 files changed, 43 insertions(+), 30 deletions(-) diff --git a/buffer_pool.cpp b/buffer_pool.cpp index f8b214b..3d30309 100644 --- a/buffer_pool.cpp +++ b/buffer_pool.cpp @@ -37,7 +37,7 @@ char *buffer_pool::allocate_buffer_impl(std::unique_lock &l) m_exceeded_max_size = true; } - char *buf = (char*)malloc(DEFAULT_BLOCK_SIZE); + char *buf = (char *)malloc(DEFAULT_BLOCK_SIZE); if (!buf) { m_exceeded_max_size = true; return nullptr; diff --git a/cache.hpp b/cache.hpp index 3a4b642..5db418c 100644 --- a/cache.hpp +++ b/cache.hpp @@ -2,6 +2,7 @@ #define __CACHE_HPP__ #include +#include #include "store_buffer.hpp" #include "buffer_pool.hpp" @@ -19,9 +20,9 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya typedef Value value_type; typedef std::list list_type; typedef std::unordered_map< - key_type, - std::pair - > map_type; + key_type, + std::pair> + map_type; lru_cache(size_t capacity) : m_capacity(capacity) @@ -35,6 +36,7 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya ~lru_cache() { + clear(); } size_t size() const @@ -61,9 +63,9 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya { std::lock_guard lock(m_cache_mutex); typename map_type::iterator i = m_map.find(key); - if(i == m_map.end()){ + if (i == m_map.end()) { // insert item into the cache, but first check if it is full - if(size() >= m_capacity){ + if (size() >= m_capacity) { // cache is full, evict the least recently used item evict(); } @@ -78,7 +80,7 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya { // lookup value in the cache typename map_type::iterator i = m_map.find(key); - if(i == m_map.end()){ + if (i == m_map.end()) { // value not in cache return boost::none; } @@ -86,7 +88,7 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya // return the value, but first update its place in the most // recently used list typename list_type::iterator j = i->second.second; - if(j != m_list.begin()){ + if (j != m_list.begin()) { // move item to the front of the most recently used list m_list.erase(j); m_list.push_front(key); @@ -98,8 +100,7 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya // return the value return value; - } - else { + } else { // the item is already at the front of the most recently // used list so just return it return i->second.first; @@ -121,9 +122,9 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya } } - char* allocate_buffer() + char *allocate_buffer() { - return (char*)malloc(DEFAULT_BLOCK_SIZE); + return (char *)malloc(DEFAULT_BLOCK_SIZE); } void free_disk_buffer(char *buf) override @@ -135,14 +136,14 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya { std::lock_guard lock(m_cache_mutex); typename map_type::iterator i = m_map.find(loc); - if(i == m_map.end()){ + if (i == m_map.end()) { // insert item into the cache, but first check if it is full - if(size() >= m_capacity){ + if (size() >= m_capacity) { // cache is full, evict the least recently used item evict(); } - char *buf = (char*)malloc(DEFAULT_BLOCK_SIZE); + char *buf = (char *)malloc(DEFAULT_BLOCK_SIZE); memcpy(buf, buf1, DEFAULT_BLOCK_SIZE); // insert the new item @@ -158,10 +159,11 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya auto it = m_map.find(loc); if (it != m_map.end()) { + SPDLOG_INFO("Hit Cache: {} {}", loc.piece, loc.offset); f(it->second.first); return true; } - + return false; } @@ -200,6 +202,6 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya std::mutex m_cache_mutex; }; -} // namesapce ezio +} // namespace ezio #endif diff --git a/config.cpp b/config.cpp index e759227..7a34b90 100644 --- a/config.cpp +++ b/config.cpp @@ -10,6 +10,7 @@ void config::parse_from_argv(int argc, char **argv) desc.add_options() ("help,h", "some help") ("file,F", bpo::bool_switch(&file_flag)->default_value(false), "read data from file rather than raw disk") + ("cache,C", bpo::value(&cache_size)->default_value(16), "cache size (in MB)") ("listen,l", bpo::value(&listen_address), "gRPC service listen address and port, default is 127.0.0.1:50051") ("version,v", "show version") ; diff --git a/config.hpp b/config.hpp index 46e8b77..0cfa30c 100644 --- a/config.hpp +++ b/config.hpp @@ -21,6 +21,8 @@ class config bool file_flag = false; // --listen address std::string listen_address = "127.0.0.1:50051"; + // cache size in MB + int cache_size = 16; }; } // namespace ezio diff --git a/main.cpp b/main.cpp index fd8b196..75165cc 100644 --- a/main.cpp +++ b/main.cpp @@ -38,14 +38,18 @@ int main(int argc, char **argv) p.set_int(lt::settings_pack::mixed_mode_algorithm, lt::settings_pack::prefer_tcp); //p.set_int(lt::settings_pack::alert_mask, lt::alert_category::peer | lt::alert_category::status); - + // tune - //p.set_int(lt::settings_pack::suggest_mode, lt::settings_pack::suggest_read_cache); + p.set_int(lt::settings_pack::suggest_mode, lt::settings_pack::suggest_read_cache); + p.set_int(lt::settings_pack::max_suggest_pieces, 1024); //p.set_int(lt::settings_pack::max_queued_disk_bytes, 128 * 1024 * 1024); lt::session_params ses_params(p); if (!current_config.file_flag) { - ses_params.disk_io_constructor = ezio::raw_disk_io_constructor; + //ses_params.disk_io_constructor = ezio::raw_disk_io_constructor; + ses_params.disk_io_constructor = [&](libtorrent::io_context &ioc, libtorrent::settings_interface const &s, libtorrent::counters &c) { + return std::make_unique(ioc, current_config.cache_size); + }; } // create session and inject to daemon. diff --git a/raw_disk_io.cpp b/raw_disk_io.cpp index f555953..2596c53 100644 --- a/raw_disk_io.cpp +++ b/raw_disk_io.cpp @@ -115,16 +115,17 @@ std::unique_ptr raw_disk_io_constructor(libtorrent:: libtorrent::settings_interface const &s, libtorrent::counters &c) { - return std::make_unique(ioc); + return std::make_unique(ioc, 16); } -raw_disk_io::raw_disk_io(libtorrent::io_context &ioc) : +raw_disk_io::raw_disk_io(libtorrent::io_context &ioc, int cache) : ioc_(ioc), read_buffer_pool_(ioc), write_buffer_pool_(ioc), read_thread_pool_(8), write_thread_pool_(8), - hash_thread_pool_(8) + hash_thread_pool_(8), + cache_(cache * 64) { } @@ -216,7 +217,7 @@ void raw_disk_io::async_read( }); ret |= 2; } - + if ((ret & 1) == 0) { bool r2 = cache_.get(loc2, [&](char const *buf1) { std::memcpy(buf + len1, buf1, std::size_t(r.length - len1)); @@ -238,9 +239,12 @@ void raw_disk_io::async_read( auto offset = (ret == 1) ? r.start : block_offset + DEFAULT_BLOCK_SIZE; auto len = (ret == 1) ? len1 : r.length - len1; auto buf_offset = (ret == 1) ? 0 : len1; - storages_[idx]->read(buf + buf_offset, r.piece, offset, len, error); - - // TODO store to cache_ + char *tmp = (char *)malloc(DEFAULT_BLOCK_SIZE); + storages_[idx]->read(tmp, r.piece, offset, DEFAULT_BLOCK_SIZE, error); + cache_.insert({idx, r.piece, offset}, tmp); + SPDLOG_INFO("Put Cache {} {}", r.piece, offset); + std::memcpy(buf + buf_offset, tmp + offset, len); + free(tmp); post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable { h(std::move(b), error); @@ -272,7 +276,7 @@ void raw_disk_io::async_read( libtorrent::storage_error error; storages_[idx]->read(buf, r.piece, r.start, r.length, error); - // TODO store to cache_ + cache_.insert({idx, r.piece, r.start}, buf); post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable { h(std::move(b), error); diff --git a/raw_disk_io.hpp b/raw_disk_io.hpp index 88a9948..17cd221 100644 --- a/raw_disk_io.hpp +++ b/raw_disk_io.hpp @@ -27,7 +27,7 @@ class raw_disk_io final : public libtorrent::disk_interface store_buffer store_buffer_; - lru_cache cache_; + lru_cache cache_; boost::asio::thread_pool read_thread_pool_; boost::asio::thread_pool write_thread_pool_; @@ -40,7 +40,7 @@ class raw_disk_io final : public libtorrent::disk_interface std::deque free_slots_; public: - raw_disk_io(libtorrent::io_context &); + raw_disk_io(libtorrent::io_context &, int); ~raw_disk_io(); // this is called when a new torrent is added. The shared_ptr can be