Skip to content

Commit

Permalink
WIP: cache
Browse files Browse the repository at this point in the history
Signed-off-by: Date Huang <[email protected]>
  • Loading branch information
tjjh89017 committed Oct 24, 2024
1 parent ff85e76 commit c00fb6d
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 30 deletions.
2 changes: 1 addition & 1 deletion buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ char *buffer_pool::allocate_buffer_impl(std::unique_lock<std::mutex> &l)
m_exceeded_max_size = true;
}

char *buf = (char*)malloc(DEFAULT_BLOCK_SIZE);
char *buf = (char *)malloc(DEFAULT_BLOCK_SIZE);
if (!buf) {
m_exceeded_max_size = true;
return nullptr;
Expand Down
34 changes: 18 additions & 16 deletions cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define __CACHE_HPP__

#include <mutex>
#include <spdlog/spdlog.h>
#include "store_buffer.hpp"
#include "buffer_pool.hpp"

Expand All @@ -19,9 +20,9 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya
typedef Value value_type;
typedef std::list<key_type> list_type;
typedef std::unordered_map<
key_type,
std::pair<value_type, typename list_type::iterator>
> map_type;
key_type,
std::pair<value_type, typename list_type::iterator>>
map_type;

lru_cache(size_t capacity) :
m_capacity(capacity)
Expand All @@ -35,6 +36,7 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya

~lru_cache()
{
clear();
}

size_t size() const
Expand All @@ -61,9 +63,9 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya
{
std::lock_guard<std::mutex> lock(m_cache_mutex);
typename map_type::iterator i = m_map.find(key);
if(i == m_map.end()){
if (i == m_map.end()) {
// insert item into the cache, but first check if it is full
if(size() >= m_capacity){
if (size() >= m_capacity) {
// cache is full, evict the least recently used item
evict();
}
Expand All @@ -78,15 +80,15 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya
{
// lookup value in the cache
typename map_type::iterator i = m_map.find(key);
if(i == m_map.end()){
if (i == m_map.end()) {
// value not in cache
return boost::none;
}

// return the value, but first update its place in the most
// recently used list
typename list_type::iterator j = i->second.second;
if(j != m_list.begin()){
if (j != m_list.begin()) {
// move item to the front of the most recently used list
m_list.erase(j);
m_list.push_front(key);
Expand All @@ -98,8 +100,7 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya

// return the value
return value;
}
else {
} else {
// the item is already at the front of the most recently
// used list so just return it
return i->second.first;
Expand All @@ -121,9 +122,9 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya
}
}

char* allocate_buffer()
char *allocate_buffer()
{
return (char*)malloc(DEFAULT_BLOCK_SIZE);
return (char *)malloc(DEFAULT_BLOCK_SIZE);
}

void free_disk_buffer(char *buf) override
Expand All @@ -135,14 +136,14 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya
{
std::lock_guard<std::mutex> lock(m_cache_mutex);
typename map_type::iterator i = m_map.find(loc);
if(i == m_map.end()){
if (i == m_map.end()) {
// insert item into the cache, but first check if it is full
if(size() >= m_capacity){
if (size() >= m_capacity) {
// cache is full, evict the least recently used item
evict();
}

char *buf = (char*)malloc(DEFAULT_BLOCK_SIZE);
char *buf = (char *)malloc(DEFAULT_BLOCK_SIZE);
memcpy(buf, buf1, DEFAULT_BLOCK_SIZE);

// insert the new item
Expand All @@ -158,10 +159,11 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya

auto it = m_map.find(loc);
if (it != m_map.end()) {
SPDLOG_INFO("Hit Cache: {} {}", loc.piece, loc.offset);
f(it->second.first);
return true;
}

return false;
}

Expand Down Expand Up @@ -200,6 +202,6 @@ class lru_cache : public libtorrent::buffer_allocator_interface, boost::noncopya
std::mutex m_cache_mutex;
};

} // namesapce ezio
} // namespace ezio

#endif
1 change: 1 addition & 0 deletions config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ void config::parse_from_argv(int argc, char **argv)
desc.add_options()
("help,h", "some help")
("file,F", bpo::bool_switch(&file_flag)->default_value(false), "read data from file rather than raw disk")
("cache,C", bpo::value<int>(&cache_size)->default_value(16), "cache size (in MB)")
("listen,l", bpo::value<std::string>(&listen_address), "gRPC service listen address and port, default is 127.0.0.1:50051")
("version,v", "show version")
;
Expand Down
2 changes: 2 additions & 0 deletions config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class config
bool file_flag = false;
// --listen address
std::string listen_address = "127.0.0.1:50051";
// cache size in MB
int cache_size = 16;
};

} // namespace ezio
Expand Down
10 changes: 7 additions & 3 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ int main(int argc, char **argv)
p.set_int(lt::settings_pack::mixed_mode_algorithm, lt::settings_pack::prefer_tcp);

//p.set_int(lt::settings_pack::alert_mask, lt::alert_category::peer | lt::alert_category::status);

// tune
//p.set_int(lt::settings_pack::suggest_mode, lt::settings_pack::suggest_read_cache);
p.set_int(lt::settings_pack::suggest_mode, lt::settings_pack::suggest_read_cache);
p.set_int(lt::settings_pack::max_suggest_pieces, 1024);
//p.set_int(lt::settings_pack::max_queued_disk_bytes, 128 * 1024 * 1024);

lt::session_params ses_params(p);
if (!current_config.file_flag) {
ses_params.disk_io_constructor = ezio::raw_disk_io_constructor;
//ses_params.disk_io_constructor = ezio::raw_disk_io_constructor;
ses_params.disk_io_constructor = [&](libtorrent::io_context &ioc, libtorrent::settings_interface const &s, libtorrent::counters &c) {
return std::make_unique<ezio::raw_disk_io>(ioc, current_config.cache_size);
};
}

// create session and inject to daemon.
Expand Down
20 changes: 12 additions & 8 deletions raw_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,17 @@ std::unique_ptr<libtorrent::disk_interface> raw_disk_io_constructor(libtorrent::
libtorrent::settings_interface const &s,
libtorrent::counters &c)
{
return std::make_unique<raw_disk_io>(ioc);
return std::make_unique<raw_disk_io>(ioc, 16);
}

raw_disk_io::raw_disk_io(libtorrent::io_context &ioc) :
raw_disk_io::raw_disk_io(libtorrent::io_context &ioc, int cache) :
ioc_(ioc),
read_buffer_pool_(ioc),
write_buffer_pool_(ioc),
read_thread_pool_(8),
write_thread_pool_(8),
hash_thread_pool_(8)
hash_thread_pool_(8),
cache_(cache * 64)
{
}

Expand Down Expand Up @@ -216,7 +217,7 @@ void raw_disk_io::async_read(
});
ret |= 2;
}

if ((ret & 1) == 0) {
bool r2 = cache_.get(loc2, [&](char const *buf1) {
std::memcpy(buf + len1, buf1, std::size_t(r.length - len1));
Expand All @@ -238,9 +239,12 @@ void raw_disk_io::async_read(
auto offset = (ret == 1) ? r.start : block_offset + DEFAULT_BLOCK_SIZE;
auto len = (ret == 1) ? len1 : r.length - len1;
auto buf_offset = (ret == 1) ? 0 : len1;
storages_[idx]->read(buf + buf_offset, r.piece, offset, len, error);

// TODO store to cache_
char *tmp = (char *)malloc(DEFAULT_BLOCK_SIZE);
storages_[idx]->read(tmp, r.piece, offset, DEFAULT_BLOCK_SIZE, error);
cache_.insert({idx, r.piece, offset}, tmp);
SPDLOG_INFO("Put Cache {} {}", r.piece, offset);
std::memcpy(buf + buf_offset, tmp + offset, len);
free(tmp);

post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable {
h(std::move(b), error);
Expand Down Expand Up @@ -272,7 +276,7 @@ void raw_disk_io::async_read(
libtorrent::storage_error error;
storages_[idx]->read(buf, r.piece, r.start, r.length, error);

// TODO store to cache_
cache_.insert({idx, r.piece, r.start}, buf);

post(ioc_, [h = std::move(handler), b = std::move(buffer), error]() mutable {
h(std::move(b), error);
Expand Down
4 changes: 2 additions & 2 deletions raw_disk_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class raw_disk_io final : public libtorrent::disk_interface

store_buffer store_buffer_;

lru_cache<torrent_location, char*> cache_;
lru_cache<torrent_location, char *> cache_;

boost::asio::thread_pool read_thread_pool_;
boost::asio::thread_pool write_thread_pool_;
Expand All @@ -40,7 +40,7 @@ class raw_disk_io final : public libtorrent::disk_interface
std::deque<libtorrent::storage_index_t> free_slots_;

public:
raw_disk_io(libtorrent::io_context &);
raw_disk_io(libtorrent::io_context &, int);
~raw_disk_io();

// this is called when a new torrent is added. The shared_ptr can be
Expand Down

0 comments on commit c00fb6d

Please sign in to comment.