Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename IO backends to support Linux platforms without io_uring #628

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@ name = "limbo_core"
path = "lib.rs"

[features]
default = ["fs", "json", "uuid"]
default = ["fs", "json", "uuid", "io_uring"]
fs = []
json = [
"dep:jsonb",
"dep:pest",
"dep:pest_derive",
]
uuid = ["dep:uuid"]
io_uring = ["dep:io-uring"]

[target.'cfg(target_os = "linux")'.dependencies]
io-uring = "0.6.1"
io-uring = { version = "0.6.1", optional = true }

[target.'cfg(target_os = "macos")'.dependencies]
[target.'cfg(target_family = "unix")'.dependencies]
polling = "3.7.2"
rustix = "0.38.34"

Expand Down
6 changes: 3 additions & 3 deletions core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ pub enum LimboError {
EnvVarError(#[from] std::env::VarError),
#[error("I/O error: {0}")]
IOError(#[from] std::io::Error),
#[cfg(target_os = "linux")]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[error("I/O error: {0}")]
LinuxIOError(String),
UringIOError(String),
#[error("Locking error: {0}")]
LockingError(String),
#[cfg(target_os = "macos")]
#[cfg(target_family = "unix")]
#[error("I/O error: {0}")]
RustixIOError(#[from] rustix::io::Errno),
#[error("Parse error: {0}")]
Expand Down
40 changes: 20 additions & 20 deletions core/io/linux.rs → core/io/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ const MAX_IOVECS: usize = 128;
const SQPOLL_IDLE: u32 = 1000;

#[derive(Debug, Error)]
enum LinuxIOError {
enum UringIOError {
IOUringCQError(i32),
}

// Implement the Display trait to customize error messages
impl fmt::Display for LinuxIOError {
impl fmt::Display for UringIOError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LinuxIOError::IOUringCQError(code) => write!(
UringIOError::IOUringCQError(code) => write!(
f,
"IOUring completion queue error occurred with code {}",
code
Expand All @@ -31,8 +31,8 @@ impl fmt::Display for LinuxIOError {
}
}

pub struct LinuxIO {
inner: Rc<RefCell<InnerLinuxIO>>,
pub struct UringIO {
inner: Rc<RefCell<InnerUringIO>>,
}

struct WrappedIOUring {
Expand All @@ -42,13 +42,13 @@ struct WrappedIOUring {
key: u64,
}

struct InnerLinuxIO {
struct InnerUringIO {
ring: WrappedIOUring,
iovecs: [iovec; MAX_IOVECS],
next_iovec: usize,
}

impl LinuxIO {
impl UringIO {
pub fn new() -> Result<Self> {
let ring = match io_uring::IoUring::builder()
.setup_sqpoll(SQPOLL_IDLE)
Expand All @@ -57,7 +57,7 @@ impl LinuxIO {
Ok(ring) => ring,
Err(_) => io_uring::IoUring::new(MAX_IOVECS as u32)?,
};
let inner = InnerLinuxIO {
let inner = InnerUringIO {
ring: WrappedIOUring {
ring,
pending_ops: 0,
Expand All @@ -76,7 +76,7 @@ impl LinuxIO {
}
}

impl InnerLinuxIO {
impl InnerUringIO {
pub fn get_iovec(&mut self, buf: *const u8, len: usize) -> &iovec {
let iovec = &mut self.iovecs[self.next_iovec];
iovec.iov_base = buf as *mut std::ffi::c_void;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl WrappedIOUring {
}
}

impl IO for LinuxIO {
impl IO for UringIO {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
Expand All @@ -142,14 +142,14 @@ impl IO for LinuxIO {
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
};
}
let linux_file = Rc::new(LinuxFile {
let uring_file = Rc::new(UringFile {
io: self.inner.clone(),
file,
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
linux_file.lock_file(true)?;
uring_file.lock_file(true)?;
}
Ok(linux_file)
Ok(uring_file)
}

fn run_once(&self) -> Result<()> {
Expand All @@ -165,9 +165,9 @@ impl IO for LinuxIO {
while let Some(cqe) = ring.get_completion() {
let result = cqe.result();
if result < 0 {
return Err(LimboError::LinuxIOError(format!(
return Err(LimboError::UringIOError(format!(
"{} cqe: {:?}",
LinuxIOError::IOUringCQError(result),
UringIOError::IOUringCQError(result),
cqe
)));
}
Expand All @@ -191,12 +191,12 @@ impl IO for LinuxIO {
}
}

pub struct LinuxFile {
io: Rc<RefCell<InnerLinuxIO>>,
pub struct UringFile {
io: Rc<RefCell<InnerUringIO>>,
file: std::fs::File,
}

impl File for LinuxFile {
impl File for UringFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.as_raw_fd();
let flock = flock {
Expand Down Expand Up @@ -306,7 +306,7 @@ impl File for LinuxFile {
}
}

impl Drop for LinuxFile {
impl Drop for UringFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
Expand All @@ -319,6 +319,6 @@ mod tests {

#[test]
fn test_multiple_processes_cannot_open_file() {
common::tests::test_multiple_processes_cannot_open_file(LinuxIO::new);
common::tests::test_multiple_processes_cannot_open_file(UringIO::new);
}
}
12 changes: 6 additions & 6 deletions core/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,14 @@ impl Buffer {
}

cfg_block! {
#[cfg(target_os = "linux")] {
mod linux;
pub use linux::LinuxIO as PlatformIO;
#[cfg(all(target_os = "linux", feature = "io_uring"))] {
mod io_uring;
pub use io_uring::UringIO as PlatformIO;
}

#[cfg(target_os = "macos")] {
mod darwin;
pub use darwin::DarwinIO as PlatformIO;
#[cfg(any(all(target_os = "linux",not(feature = "io_uring")), target_os = "macos"))] {
mod unix;
pub use unix::UnixIO as PlatformIO;
}

#[cfg(target_os = "windows")] {
Expand Down
20 changes: 10 additions & 10 deletions core/io/darwin.rs → core/io/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use std::collections::HashMap;
use std::io::{Read, Seek, Write};
use std::rc::Rc;

pub struct DarwinIO {
pub struct UnixIO {
poller: Rc<RefCell<Poller>>,
events: Rc<RefCell<Events>>,
callbacks: Rc<RefCell<HashMap<usize, CompletionCallback>>>,
}

impl DarwinIO {
impl UnixIO {
pub fn new() -> Result<Self> {
Ok(Self {
poller: Rc::new(RefCell::new(Poller::new()?)),
Expand All @@ -30,7 +30,7 @@ impl DarwinIO {
}
}

impl IO for DarwinIO {
impl IO for UnixIO {
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
Expand All @@ -40,15 +40,15 @@ impl IO for DarwinIO {
.create(matches!(flags, OpenFlags::Create))
.open(path)?;

let darwin_file = Rc::new(DarwinFile {
let unix_file = Rc::new(UnixFile {
file: Rc::new(RefCell::new(file)),
poller: self.poller.clone(),
callbacks: self.callbacks.clone(),
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
darwin_file.lock_file(true)?;
unix_file.lock_file(true)?;
}
Ok(darwin_file)
Ok(unix_file)
}

fn run_once(&self) -> Result<()> {
Expand Down Expand Up @@ -127,13 +127,13 @@ enum CompletionCallback {
),
}

pub struct DarwinFile {
pub struct UnixFile {
file: Rc<RefCell<std::fs::File>>,
poller: Rc<RefCell<polling::Poller>>,
callbacks: Rc<RefCell<HashMap<usize, CompletionCallback>>>,
}

impl File for DarwinFile {
impl File for UnixFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.borrow().as_raw_fd();
let flock = flock {
Expand Down Expand Up @@ -279,7 +279,7 @@ impl File for DarwinFile {
}
}

impl Drop for DarwinFile {
impl Drop for UnixFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
Expand All @@ -291,6 +291,6 @@ mod tests {

#[test]
fn test_multiple_processes_cannot_open_file() {
common::tests::test_multiple_processes_cannot_open_file(DarwinIO::new);
common::tests::test_multiple_processes_cannot_open_file(UnixIO::new);
}
}
Loading