Skip to content

Commit

Permalink
iouring sq poll and io poll
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Dec 24, 2024
1 parent 73ec2bf commit 323f01f
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 64 deletions.
52 changes: 44 additions & 8 deletions io/fd-events.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,33 +136,69 @@ CascadingEventEngine* new_##name##_cascading_engine(); \

DECLARE_MASTER_AND_CASCADING_ENGINE(epoll);
DECLARE_MASTER_AND_CASCADING_ENGINE(select);
DECLARE_MASTER_AND_CASCADING_ENGINE(iouring);
// DECLARE_MASTER_AND_CASCADING_ENGINE(iouring);
DECLARE_MASTER_AND_CASCADING_ENGINE(kqueue);
DECLARE_MASTER_AND_CASCADING_ENGINE(epoll_ng);

inline int fd_events_init(uint64_t master_engine) {
// Construction options for the io_uring event engine.
// Every field has a default, so `iouring_args a;` and `iouring_args a{};`
// both yield a fully-initialized object.
struct iouring_args {
    // true: create the engine as a master engine; false: as a cascading one.
    // The new_iouring_{master,cascading}_engine() helpers overwrite this.
    bool is_master = true;
    // Request IORING_SETUP_SQPOLL when creating the ring.
    bool setup_sqpoll = false;
    // Request IORING_SETUP_SQ_AFF; only consulted when setup_sqpoll is set.
    bool setup_sq_aff = false;
    // Request IORING_SETUP_IOPOLL when creating the ring.
    bool setup_iopoll = false;
    // CPU for the SQ thread; only consulted when setup_sq_aff is set.
    // Defaulted to 0 so the value is never indeterminate when it is logged
    // or copied.
    uint32_t sq_thread_cpu = 0;
    // SQ thread idle period; by default polls for 1s
    // (presumably milliseconds, per io_uring_setup(2) -- confirm).
    uint32_t sq_thread_idle = 1000;
};

void* new_iouring_event_engine(iouring_args args = {});

// Creates an io_uring engine and hands it back as a MasterEventEngine.
// `args.is_master` is forced to true regardless of what the caller passed;
// all other fields are forwarded unchanged.
inline MasterEventEngine*
new_iouring_master_engine(iouring_args args = {}) {
    args.is_master = true;
    void* engine = new_iouring_event_engine(args);
    return static_cast<MasterEventEngine*>(engine);
}

// Creates an io_uring engine and hands it back as a CascadingEventEngine.
// `args.is_master` is forced to false regardless of what the caller passed;
// all other fields are forwarded unchanged.
inline CascadingEventEngine*
new_iouring_cascading_engine(iouring_args args = {}) {
    args.is_master = false;
    void* engine = new_iouring_event_engine(args);
    return static_cast<CascadingEventEngine*>(engine);
}

template<typename...Ts> inline MasterEventEngine*
new_master_event_engine(uint64_t master_engine, const Ts&...xs) {
switch (master_engine) {
#ifdef __linux__
case INIT_EVENT_EPOLL:
return _fd_events_init(&new_epoll_master_engine);
return new_epoll_master_engine(xs...);
case INIT_EVENT_EPOLL_NG:
return _fd_events_init(&new_epoll_ng_master_engine);
return new_epoll_ng_master_engine(xs...);
#endif
case INIT_EVENT_SELECT:
return _fd_events_init(&new_select_master_engine);
return new_select_master_engine(xs...);
#ifdef PHOTON_URING
case INIT_EVENT_IOURING:
return _fd_events_init(&new_iouring_master_engine);
return new_iouring_master_engine(xs...);
#endif
#ifdef __APPLE__
case INIT_EVENT_KQUEUE:
return _fd_events_init(&new_kqueue_master_engine);
return new_kqueue_master_engine(xs...);
#endif
default:
return -1;
return nullptr;
}
}

// Installs `master_engine` as the current vCPU's master event engine.
// Returns -1 (and installs nothing) when the pointer is null, 0 otherwise.
inline int fd_events_init(MasterEventEngine* master_engine) {
    if (!master_engine)
        return -1;
    get_vcpu()->master_event_engine = master_engine;
    return 0;
}


inline int fd_events_init(uint64_t master_engine) {
return fd_events_init(new_master_event_engine(master_engine));
}

inline int fd_events_fini() {
reset_master_event_engine_default();
return 0;
Expand Down
99 changes: 68 additions & 31 deletions io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ constexpr static EventsMap<EVUnderlay<POLLIN | POLLRDHUP, POLLOUT, POLLERR>> evm

class iouringEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
explicit iouringEngine(bool master) : m_master(master) {}

~iouringEngine() {
LOG_INFO("Finish event engine: iouring ", VALUE(m_master));
fini();
Expand Down Expand Up @@ -75,8 +73,11 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
return 0;
}

int init() {
int init() { return init({m_master, m_setup_sqpoll, false, false}); }
int init(iouring_args args) {
int compare_result;
m_master = args.is_master;
m_setup_sqpoll = args.setup_sqpoll;
if (kernel_version_compare("5.11", compare_result) == 0 && compare_result <= 0) {
rlimit resource_limit{.rlim_cur = RLIM_INFINITY, .rlim_max = RLIM_INFINITY};
if (setrlimit(RLIMIT_MEMLOCK, &resource_limit) != 0)
Expand All @@ -90,21 +91,54 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub

m_ring = new io_uring{};
io_uring_params params{};
if (m_cooperative_task_flag == 1)
if (m_cooperative_task_flag == 1) {
params.flags = IORING_SETUP_COOP_TASKRUN;
}
if (args.setup_iopoll)
params.flags |= IORING_SETUP_IOPOLL;
if (args.setup_sqpoll) {
params.flags |= IORING_SETUP_SQPOLL;
// params.sq_thread_idle = 10;
if (args.setup_sq_aff) {
params.flags |= IORING_SETUP_SQ_AFF;
params.sq_thread_cpu = args.sq_thread_cpu;
}
}

retry:
int ret = io_uring_queue_init_params(QUEUE_DEPTH, m_ring, &params);
if (ret != 0) {
if (errno == EINVAL) {
auto& p = params;
if (p.flags & IORING_SETUP_DEFER_TASKRUN) {
p.flags &= ~IORING_SETUP_DEFER_TASKRUN;
p.flags &= ~IORING_SETUP_SINGLE_ISSUER;
LOG_INFO("io_uring_queue_init failed, removing IORING_SETUP_DEFER_TASKRUN, IORING_SETUP_SINGLE_ISSUER");
goto retry;
}
if (p.flags & IORING_SETUP_COOP_TASKRUN) {
// this seems to be conflicting with IORING_SETUP_SQPOLL,
// at least in 6.4.12-1.el8.elrepo.x86_64
p.flags &= ~IORING_SETUP_COOP_TASKRUN;
LOG_INFO("io_uring_queue_init failed, removing IORING_SETUP_COOP_TASKRUN");
goto retry;
}
if (p.flags & IORING_SETUP_CQSIZE) {
p.flags &= ~IORING_SETUP_CQSIZE;
LOG_INFO("io_uring_queue_init failed, removing IORING_SETUP_CQSIZE");
goto retry;
} }
// reset m_ring so that the destructor won't do duplicate munmap cleanup (io_uring_queue_exit)
delete m_ring;
m_ring = nullptr;
LOG_ERROR_RETURN(0, -1, "iouring: failed to init queue: ", ERRNO(-ret));
}

// Check feature supported
for (auto i : REQUIRED_FEATURES) {
if (!(params.features & i)) {
LOG_ERROR_RETURN(0, -1, "iouring: required feature not supported");
}
if (!check_required_features(params, IORING_FEAT_CUR_PERSONALITY,
IORING_FEAT_NODROP, IORING_FEAT_FAST_POLL,
IORING_FEAT_EXT_ARG, IORING_FEAT_RW_CUR_POS)) {
LOG_ERROR_RETURN(0, -1, "iouring: required feature not supported");
}

// Check opcode supported
Expand Down Expand Up @@ -162,6 +196,15 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
return 0;
}


// Base case of the recursion below: no feature bits left to check.
bool check_required_features(const io_uring_params&) {
    return true;
}
// Returns true only if every feature bit in (f, fs...) is set in
// `params.features`; stops at the first missing one.
template<typename T, typename...Ts>
bool check_required_features(const io_uring_params& params, T f, Ts...fs) {
    if (!(params.features & f))
        return false;
    return check_required_features(params, fs...);
}

/**
* @brief Get a SQE from ring, prepare IO, and wait for completion. Note all the SQEs are batch submitted
* later in the `wait_and_fire_events`.
Expand Down Expand Up @@ -198,6 +241,10 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
io_uring_sqe_set_data(sqe, &timer_ctx);
}

int ret = io_uring_submit(m_ring);
if (ret < 0)
LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when adding interest, ", ERRNO(-ret));

SCOPED_PAUSE_WORK_STEALING;
photon::thread_sleep(-1);

Expand Down Expand Up @@ -239,7 +286,6 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
}
return 0;
}

int add_interest(Event e) override {
auto* sqe = _get_sqe();
if (sqe == nullptr)
Expand All @@ -261,9 +307,8 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
}
io_uring_sqe_set_data(sqe, &pair.first->second.io_ctx);
int ret = io_uring_submit(m_ring);
if (ret < 0) {
if (ret < 0)
LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when adding interest, ", ERRNO(-ret));
}
return 0;
}

Expand All @@ -281,9 +326,8 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
io_uring_prep_poll_remove(sqe, (__u64) &iter->second.io_ctx);
io_uring_sqe_set_data(sqe, nullptr);
int ret = io_uring_submit(m_ring);
if (ret < 0) {
if (ret < 0)
LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when removing interest, ", ERRNO(-ret));
}
return 0;
}

Expand Down Expand Up @@ -537,23 +581,17 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
return {sec, nsec};
}

static constexpr const uint32_t REQUIRED_FEATURES[] = {
IORING_FEAT_CUR_PERSONALITY, IORING_FEAT_NODROP,
IORING_FEAT_FAST_POLL, IORING_FEAT_EXT_ARG,
IORING_FEAT_RW_CUR_POS};
static const int QUEUE_DEPTH = 16384;
static const int REGISTER_FILES_SPARSE_FD = -1;
static const int REGISTER_FILES_MAX_NUM = 10000;
bool m_master;
bool m_master, m_setup_sqpoll;
io_uring* m_ring = nullptr;
int m_eventfd = -1;
std::unordered_map<fdInterest, eventCtx, fdInterestHasher> m_event_contexts;
static int m_register_files_flag;
static int m_cooperative_task_flag;
};

constexpr const uint32_t iouringEngine::REQUIRED_FEATURES[];

int iouringEngine::m_register_files_flag = -1;

int iouringEngine::m_cooperative_task_flag = -1;
Expand Down Expand Up @@ -664,18 +702,17 @@ int iouring_unregister_files(int fd) {
return ee->register_unregister_files(fd, false);
}

__attribute__((noinline))
static iouringEngine* new_iouring(bool is_master) {
LOG_INFO("Init event engine: iouring ", VALUE(is_master));
return NewObj<iouringEngine>(is_master) -> init();
// Creates and initializes an iouringEngine from `args`.
//
// The result is type-erased: it points at the MasterEventEngine base
// sub-object when `args.is_master` is set, otherwise at the
// CascadingEventEngine base sub-object. The inline wrappers in fd-events.h
// cast the void* straight back to the matching base type, so the upcast must
// happen here, before the static type is lost, for BOTH bases. Returning the
// iouringEngine* itself would only be correct if the chosen base happened to
// live at offset 0.
//
// NOTE(review): presumably null when init() fails -- confirm against NewObj.
// A null pointer stays null through either upcast.
void* new_iouring_event_engine(iouring_args args) {
    LOG_INFO("Init event engine: iouring ",
        make_named_value("is_master", args.is_master),
        make_named_value("setup_sqpoll", args.setup_sqpoll),
        make_named_value("setup_sq_aff", args.setup_sq_aff),
        make_named_value("sq_thread_cpu", args.sq_thread_cpu));
    auto uring = NewObj<iouringEngine>() -> init(args);
    if (args.is_master) {
        MasterEventEngine* m = uring;
        return m;
    }
    CascadingEventEngine* c = uring;
    return c;
}

MasterEventEngine* new_iouring_master_engine() {
return new_iouring(true);
}

CascadingEventEngine* new_iouring_cascading_engine() {
return new_iouring(false);
}

}
14 changes: 5 additions & 9 deletions io/test/test-iouring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ limitations under the License.

using namespace photon;

const auto IOURING_FLAGS = INIT_EVENT_IOURING | INIT_EVENT_IOURING_SQPOLL;

// Common parameters
bool stop_test = false;
uint64_t qps = 0;
Expand Down Expand Up @@ -221,7 +223,7 @@ static void do_io_test(IOTestType type) {
off_t max_offset = st_buf.st_size - max_io_size;
ASSERT_GT(max_offset, 0);

photon::WorkPool wp(FLAGS_vcpu_num, photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
WorkPool wp(FLAGS_vcpu_num, IOURING_FLAGS, INIT_IO_NONE);
std::vector<photon::thread*> join_threads;

#ifdef TEST_IOURING_REGISTER_FILES
Expand Down Expand Up @@ -340,14 +342,8 @@ TEST(perf, DISABLED_read) {
class event_engine : public testing::Test {
protected:
void SetUp() override {
GTEST_ASSERT_EQ(0, photon::init(photon::INIT_EVENT_DEFAULT,
photon::INIT_IO_NONE));
#ifdef PHOTON_URING
engine = (ci_ev_engine == photon::INIT_EVENT_EPOLL) ? photon::new_epoll_cascading_engine()
: photon::new_iouring_cascading_engine();
#else
engine = photon::new_default_cascading_engine();
#endif
GTEST_ASSERT_EQ(0, init(IOURING_FLAGS, INIT_IO_NONE));
engine = photon::new_epoll_cascading_engine();
}
void TearDown() override {
delete engine;
Expand Down
41 changes: 29 additions & 12 deletions photon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#include "photon.h"
#include <photon/photon.h>
#include <inttypes.h>

#include "io/fd-events.h"
Expand Down Expand Up @@ -44,12 +44,12 @@ using namespace net;
static bool reset_handle_registed = false;
static thread_local uint64_t g_event_engine = 0, g_io_engine = 0;

#define INIT_IO(name, prefix, ...) if (INIT_IO_##name & io_engine) { if (prefix##_init(__VA_ARGS__) < 0) return -1; }
#define FINI_IO(name, prefix) if (INIT_IO_##name & g_io_engine) { prefix##_fini(); }
#define INIT_IO(name, prefix, ...) if (INIT_IO_##name & io_engine) { if (prefix##_init(__VA_ARGS__) < 0) return -1; }
#define FINI_IO(name, prefix) if (INIT_IO_##name & g_io_engine) { prefix##_fini(); }

class Shift {
uint8_t _n;
public:
uint8_t _n;
constexpr Shift(uint64_t x) : _n(__builtin_ctz(x)) { }
operator uint64_t() { return 1UL << _n; }
};
Expand All @@ -62,6 +62,27 @@ static const Shift recommended_order[] = {
INIT_EVENT_KQUEUE, INIT_EVENT_SELECT};
#endif

// Translates the INIT_EVENT_IOURING_* bits in `flags` plus the io_uring
// related fields of `opt` into an iouring_args for a master engine.
inline iouring_args mkargs(uint64_t flags, const PhotonOptions& opt) {
    iouring_args args{};
    args.is_master      = true;
    args.setup_sqpoll   = static_cast<bool>(flags & INIT_EVENT_IOURING_SQPOLL);
    args.setup_sq_aff   = static_cast<bool>(flags & INIT_EVENT_IOURING_SQ_AFF);
    args.setup_iopoll   = static_cast<bool>(flags & INIT_EVENT_IOURING_IOPOLL);
    args.sq_thread_cpu  = opt.iouring_sq_thread_cpu;
    args.sq_thread_idle = opt.iouring_sq_thread_idle;
    return args;
}

// Creates the master event engine identified by `engine` and installs it on
// the current vCPU. When built with PHOTON_URING and the io_uring engine is
// selected, the engine is created with arguments derived from `flags` and
// `opt`; every other engine is created with no extra arguments.
// Returns the result of fd_events_init(): 0 on success, -1 on failure.
static int init_event_engine(uint64_t engine, uint64_t flags, const PhotonOptions& opt) {
#ifdef PHOTON_URING
    if (engine == INIT_EVENT_IOURING)
        return fd_events_init(new_iouring_master_engine(mkargs(flags, opt)));
#endif
    return fd_events_init(new_master_event_engine(engine));
}

int __photon_init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions& options) {
if (options.use_pooled_stack_allocator) {
use_pooled_stack_allocator();
Expand All @@ -78,18 +99,14 @@ int __photon_init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions
INIT_EVENT_IOURING | INIT_EVENT_KQUEUE |
INIT_EVENT_SELECT | INIT_EVENT_IOCP;
if (event_engine & ALL_ENGINES) {
bool ok = false;
for (auto x : recommended_order) {
if ((x & event_engine) && fd_events_init(x) == 0) {
ok = true;
break;
if ((x & event_engine) && init_event_engine(x, event_engine, options) == 0) {
goto next;
}
}
if (!ok) {
LOG_ERROR_RETURN(0, -1, "All master engines init failed");
}
LOG_ERROR_RETURN(0, -1, "All master engines init failed");
}

next:
if ((INIT_EVENT_SIGNAL & event_engine) && sync_signal_init() < 0)
return -1;

Expand Down
Loading

0 comments on commit 323f01f

Please sign in to comment.