diff --git a/.github/template/template.yml b/.github/template/template.yml index 3f3c58d..afd9e4e 100644 --- a/.github/template/template.yml +++ b/.github/template/template.yml @@ -3,9 +3,9 @@ name: on: env: - RUST_TOOLCHAIN: nightly-2022-04-09 + RUST_TOOLCHAIN: nightly-2022-06-09 CARGO_TERM_COLOR: always - CACHE_KEY_SUFFIX: 20220514 + CACHE_KEY_SUFFIX: 20220614 jobs: misc-check: diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3c30e16..f0509f1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,9 +11,9 @@ on: branches: [main] workflow_dispatch: env: - RUST_TOOLCHAIN: nightly-2022-04-09 + RUST_TOOLCHAIN: nightly-2022-06-09 CARGO_TERM_COLOR: always - CACHE_KEY_SUFFIX: 20220514 + CACHE_KEY_SUFFIX: 20220614 jobs: misc-check: name: misc check diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index d3d0419..2575a75 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -10,9 +10,9 @@ on: pull_request: branches: [main] env: - RUST_TOOLCHAIN: nightly-2022-04-09 + RUST_TOOLCHAIN: nightly-2022-06-09 CARGO_TERM_COLOR: always - CACHE_KEY_SUFFIX: 20220514 + CACHE_KEY_SUFFIX: 20220614 jobs: misc-check: name: misc check diff --git a/rust-toolchain b/rust-toolchain index 4d6f714..7e041bc 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-04-09 +nightly-2022-06-09 diff --git a/storage/src/file_cache/buffer.rs b/storage/src/file_cache/buffer.rs index 7b10b87..7490e34 100644 --- a/storage/src/file_cache/buffer.rs +++ b/storage/src/file_cache/buffer.rs @@ -34,6 +34,12 @@ impl } } + pub fn with_size(len: usize) -> Self { + let mut ret = Self::with_capacity(len); + ret.resize(len); + ret + } + #[inline(always)] pub fn align(&self) -> usize { ALIGN @@ -255,6 +261,16 @@ impl Drop } } +unsafe impl Send + for AlignedBuffer +{ +} + +unsafe impl Sync + for AlignedBuffer +{ +} + #[cfg(test)] mod tests { diff --git a/storage/src/file_cache/dio_file.rs b/storage/src/file_cache/dio_file.rs new file mode 100644 index 0000000..5cc129e --- /dev/null +++ b/storage/src/file_cache/dio_file.rs @@ -0,0 +1,190 @@ +use std::fs::{File, OpenOptions}; +use std::os::unix::prelude::{AsRawFd, FileExt, OpenOptionsExt}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use super::buffer::AlignedBuffer; +use super::error::Result; +use super::fs::LOGICAL_BLOCK_SIZE; + +// Get logical block size by ioctl(2) BLKSSZGET or shell command `blockdev --getss`, see open(2) man +// page for more details. +const SMOOTH_GROWTH_SIZE: usize = 64 * 1024 * 1024; // 64 MiB +const DEFAULT_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB + +const FSTAT_BLOCK_SIZE: usize = 512; + +pub type DioBuffer = AlignedBuffer; + +/// [`DioFile`] is a wrapper of a O_DIRECT sparse file. +/// +/// Buffers of I/O requests to [`DioFile`] must be aligned to the logical block size, and buffer +/// sizes must be a multiple of `DioFileOptions.block_size`. +pub struct DioFile { + file: File, + + block_size: usize, + + cursor: AtomicUsize, +} + +impl DioFile { + /// NOTE: `block_size` must be a multiple of target file system block size. + /// + /// Hint: use `FsInfo.block_size` as `block_size`. + pub fn open(path: impl AsRef, block_size: usize, create: bool) -> Result { + assert_eq!(block_size % LOGICAL_BLOCK_SIZE, 0); + + let mut opts = OpenOptions::new(); + opts.create(create); + opts.read(true); + opts.write(true); + opts.custom_flags(libc::O_DIRECT); + + let file = opts.open(path.as_ref())?; + let cursor = AtomicUsize::new(file.metadata()?.len() as usize); + + Ok(Self { + file, + block_size, + cursor, + }) + } + + /// Append data to the cache file. + /// + /// Given `buf` must be aligned to the logical block size, and the size of `buf` must be a + /// multiple of block size. + /// + /// Returns the block idx of the written data. + /// + /// # Panics + /// + /// * Panic if given `buf` is not aligned to the logical block size or the size of `buf` is not + /// multiple of block size. + pub fn append(&self, buf: &[u8]) -> Result { + assert_eq!(buf.len() % self.block_size, 0); + let cursor = self.cursor.fetch_add(buf.len(), Ordering::SeqCst) as u64; + self.file.write_all_at(buf, cursor)?; + Ok(cursor / self.block_size as u64) + } + + /// Write data at the given `offset`. + /// + /// Written position must not exceed the file end position. + /// + /// Given `buf` must be aligned to the logical block size, and the size of `buf` must be a + /// multiple of block size. + /// + /// # Panics + /// + /// * Panic if given `buf` is not aligned to the logical block size or the size of `buf` is not + /// multiple of block size. + pub fn write_at(&self, buf: &[u8], block_offset: u64) -> Result<()> { + assert_eq!(buf.len() % self.block_size, 0); + let offset = block_offset * self.block_size as u64; + let cursor = self.cursor.load(Ordering::Acquire); + assert!( + offset as usize + buf.len() <= cursor, + "offset + len: {}, cursor: {}", + offset as usize + buf.len(), + cursor + ); + self.file.write_all_at(buf, offset)?; + Ok(()) + } + + /// Read data by blocks. + pub fn read(&self, block_offset: u64, block_len: usize) -> Result { + let offset = block_offset * self.block_size as u64; + let len = block_len * self.block_size; + let mut buf = DioBuffer::with_size(len); + self.file.read_exact_at(&mut buf[..], offset)?; + Ok(buf) + } + + /// Reclaim disk space by blocks. + pub fn reclaim(&self, block_offset: u64, block_len: usize) -> Result<()> { + let fd = self.file.as_raw_fd(); + let mode = nix::fcntl::FallocateFlags::FALLOC_FL_PUNCH_HOLE + | nix::fcntl::FallocateFlags::FALLOC_FL_KEEP_SIZE; + let offset = block_offset * self.block_size as u64; + let len = block_len * self.block_size; + nix::fcntl::fallocate(fd, mode, offset as i64, len as i64)?; + Ok(()) + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Actually occupied disk space. + pub fn len(&self) -> usize { + let fd = self.file.as_raw_fd(); + let stat = nix::sys::stat::fstat(fd).unwrap(); + stat.st_blocks as usize * FSTAT_BLOCK_SIZE + } + + /// Length of the sparse file, including the sizes of holes. + pub fn length(&self) -> usize { + self.cursor.load(Ordering::Acquire) + } + + pub fn sync_data(&self) -> Result<()> { + self.file.sync_data()?; + Ok(()) + } + + pub fn sync_all(&self) -> Result<()> { + self.file.sync_all()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use test_log::test; + + use super::*; + use crate::file_cache::fs::fs_info; + + #[test] + fn test_dio_file() { + let dir = tempfile::tempdir().unwrap(); + + let fs_info = fs_info(dir.path()).unwrap(); + let bs = fs_info.block_size; + + let cf = + DioFile::open(dir.path().join("test-dio-file-1"), fs_info.block_size, true).unwrap(); + assert_eq!(cf.len(), 0); + + let mut buf = DioBuffer::default(); + buf.append(&vec![b'x'; bs * 10]); + buf.align_up_to(fs_info.block_size); + assert_eq!(buf.len(), bs * 10); + + cf.append(&buf[..]).unwrap(); + assert_eq!(cf.len(), bs * 10); + + assert_eq!(&cf.read(0, 2).unwrap()[..], &buf[0..2 * fs_info.block_size]); + + cf.reclaim(0, 2).unwrap(); + assert_eq!(cf.len(), bs * 8); + assert_eq!(&cf.read(0, 2).unwrap()[..], &vec![0; bs * 2]); + + let mut buf2 = DioBuffer::default(); + buf2.append(&vec![b'z'; bs * 2]); + buf2.align_up_to(fs_info.block_size); + cf.write_at(&buf2[..], 8).unwrap(); + assert_eq!(cf.len(), bs * 8); + assert_eq!(&cf.read(8, 2).unwrap()[..], &buf2[..]); + + // Test rewrite holes. + assert_eq!(&cf.read(0, 2).unwrap()[..], &vec![0; bs * 2]); + cf.write_at(&buf[0..bs * 2], 0).unwrap(); + assert_eq!(&cf.read(0, 2).unwrap()[..], &buf[0..2 * fs_info.block_size]); + assert_eq!(cf.len(), bs * 10); + } +} diff --git a/storage/src/file_cache/error.rs b/storage/src/file_cache/error.rs index 9006805..4731819 100644 --- a/storage/src/file_cache/error.rs +++ b/storage/src/file_cache/error.rs @@ -11,7 +11,7 @@ pub enum Error { #[error("magic file not found")] MagicFileNotFound, #[error("invalid version")] - InvalidVersion(u32), + InvalidVersion(u64), #[error("cache file full")] Full, #[error("unsupported fs: [super block magic: {0}]")] diff --git a/storage/src/file_cache/file.rs b/storage/src/file_cache/file.rs index 5f6e26c..9a66215 100644 --- a/storage/src/file_cache/file.rs +++ b/storage/src/file_cache/file.rs @@ -1,148 +1,333 @@ -use std::fs::{File, OpenOptions}; -use std::os::unix::prelude::{AsRawFd, FileExt, OpenOptionsExt}; -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::io::Read; +use std::ops::Range; +use std::os::unix::prelude::FileExt; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use bitvec::prelude::*; +use bytes::{Buf, BufMut}; +use parking_lot::RwLock; + +use super::dio_file::{DioBuffer, DioFile}; +use super::error::{Error, Result}; +use super::fs::{FsInfo, LOGICAL_BLOCK_SIZE}; + +const MAGIC: &[u8; 16] = b"runkv-cache-file"; +const VERSION: u64 = 1; + +const META_SLOT_ALIGN_SIZE: usize = LOGICAL_BLOCK_SIZE; +const HEADER_PROBE_SIZE: usize = 16 + 8 + 8 + 8; + +/// Each [`Index`] uses 20B. +const SLOT_INDEX_SIZE: usize = 20; + +#[derive(Default, Debug)] +pub struct SlotIndex { + /// sst id + pub sst: u64, + /// sst block idx + pub idx: u32, + /// block offset after cache file header and meta + pub block_offset: u32, + /// length by bytes + pub len: u32, +} + +impl SlotIndex { + #[inline(always)] + fn aligned_len(&self) -> u32 { + Self::align_up(self.len) + } + + #[inline(always)] + fn align_up(v: u32) -> u32 { + (v + META_SLOT_ALIGN_SIZE as u32 - 1) & !(META_SLOT_ALIGN_SIZE as u32 - 1) + } +} + +struct CacheFileMeta { + /// fixed total slots count + blocks: usize, + /// block size + block_size: usize, + + /// dio buffer + buffer: DioBuffer, -use super::buffer::AlignedBuffer; -use super::error::Result; -use super::fs::FsInfo; + /// valid slots bitmap + valid: BitVec, + /// dirty blocks bitmap + dirty: BitVec, +} -// Get logical block size by ioctl(2) BLKSSZGET or shell command `blockdev --getss`, see open(2) man -// page for more details. -const LOGICAL_BLOCK_SIZE: usize = 512; -const SMOOTH_GROWTH_SIZE: usize = 64 * 1024 * 1024; // 64 MiB -const DEFAULT_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB +impl CacheFileMeta { + fn with_buffer(blocks: usize, block_size: usize, buffer: DioBuffer) -> Self { + assert_eq!(buffer.len() % block_size, 0); + assert_eq!(buffer.len() / block_size, blocks); + + let slots = Self::total_slots(blocks, block_size); + + let dirty = bitvec![usize,Lsb0;0;blocks]; + let mut valid = bitvec![usize,Lsb0;0;slots]; + + for idx in 0..slots { + if (&buffer[Self::slot_index_range(blocks, block_size, idx)]).get_u64() != 0 { + valid.set(idx, true); + } + } + + Self { + blocks, + buffer, + block_size, + valid, + dirty, + } + } -const FSTAT_BLOCK_SIZE: usize = 512; + fn get(&self, idx: usize) -> Option { + if idx >= self.slots() { + panic!("out of range: [slots: {}] [given: {}]", self.slots(), idx); + } + + let buf = &mut &self.buffer[self.range(idx)]; + + let sst = buf.get_u64(); + if sst == 0 { + return None; + } + let idx = buf.get_u32(); + let block_offset = buf.get_u32(); + let len = buf.get_u32(); + Some(SlotIndex { + sst, + idx, + block_offset, + len, + }) + } -pub type CacheFileBuffer = - AlignedBuffer; + fn set(&mut self, idx: usize, index: &SlotIndex) { + if idx >= self.slots() { + panic!("out of range: [slots: {}] [given: {}]", self.slots(), idx); + } + + let range = self.range(idx); + let mut buf = &mut self.buffer[range]; + buf.put_u64(index.sst); + buf.put_u32(index.idx); + buf.put_u32(index.block_offset); + buf.put_u32(index.len); + + let block = self.block(idx); + self.valid.set(idx, index.sst != 0); + self.dirty.set(block, true); + } + + fn flush(&mut self, dfile: &DioFile) -> Result<()> { + for (block, mut dirty) in self.dirty.iter_mut().enumerate() { + if !*dirty { + continue; + } + let offset = block * self.block_size; + dfile.write_at( + &self.buffer[offset..offset + self.block_size], + (offset + /* header segment */ self.block_size) as u64, + )?; + *dirty = false; + } + Ok(()) + } + + fn is_slot_valid(&self, idx: usize) -> bool { + if idx >= self.slots() { + panic!("out of range: [slots: {}] [given: {}]", self.slots(), idx); + } + *self.valid.get(idx).unwrap() + } + + /// Return if all slots are already used. + fn is_full(&self) -> bool { + self.valid.all() + } + + fn pick_unused_slot(&self) -> Option { + self.valid.first_zero() + } + + #[inline(always)] + fn slots(&self) -> usize { + Self::total_slots(self.blocks, self.block_size) + } + + #[inline(always)] + fn range(&self, idx: usize) -> Range { + Self::slot_index_range(self.blocks, self.block_size, idx) + } + + #[inline(always)] + fn block(&self, idx: usize) -> usize { + idx / (self.block_size / SLOT_INDEX_SIZE) + } + + #[inline(always)] + fn total_slots(blocks: usize, block_size: usize) -> usize { + block_size / SLOT_INDEX_SIZE * blocks + } + + #[inline(always)] + fn slot_index_range(blocks: usize, block_size: usize, idx: usize) -> Range { + let slots = Self::total_slots(blocks, block_size); + + if idx >= slots { + panic!("out of range: [slots: {}] [given: {}]", slots, idx); + } + + let slots_per_block = block_size / SLOT_INDEX_SIZE; + let offset = idx / slots_per_block * block_size + idx % slots_per_block * SLOT_INDEX_SIZE; + offset..offset + SLOT_INDEX_SIZE + } +} pub struct CacheFileOptions { /// Cache file id. pub id: u64, /// Cache file directory. pub dir: String, - /// Cache file block size, which must be a multiple of `fs_info.block_size` and - /// `LOGICAL_BLOCK_SIZE`. + /// Block count of the meta segment. Only used when creating new cache file. + pub meta_blocks: usize, + /// Cache file block size, which must be a multiple of target file system block size + /// and `LOGICAL_BLOCK_SIZE`. pub block_size: usize, - /// File system info. - pub fs_info: FsInfo, } -/// [`CacheFile`] is a wrapper of a O_DIRECT file. -/// I/O requests to [`CacheFile`] are aligned to file system block size. pub struct CacheFile { id: u64, - - file: File, - block_size: usize, - _fs_info: FsInfo, - cursor: AtomicUsize, + meta: CacheFileMeta, + dfile: DioFile, } -impl std::fmt::Debug for CacheFile { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("CacheFile").field("id", &self.id).finish() +impl CacheFile { + pub fn create(options: CacheFileOptions) -> Result { + let path = PathBuf::from(options.dir).join(Self::filename(options.id)); + if std::fs::metadata(&path).is_ok() { + return Err(Error::Other(format!( + "cache file {} already exists: {:?}", + options.id, path + ))); + } + + let dfile = DioFile::open(&path, options.block_size, true)?; + + // Write header segment. + let mut buf = DioBuffer::with_size(options.block_size); + buf.append(MAGIC); + buf.append(&VERSION.to_be_bytes()); + buf.append(&options.meta_blocks.to_be_bytes()); + buf.append(&options.block_size.to_be_bytes()); + dfile.append(&buf[..])?; + + // Write empty meta segment. + let buf = DioBuffer::with_size(options.block_size * options.meta_blocks); + dfile.append(&buf[..])?; + + let meta = CacheFileMeta::with_buffer(options.meta_blocks, options.block_size, buf); + + Ok(Self { + id: options.id, + block_size: options.block_size, + meta, + dfile, + }) } -} -impl CacheFile { - pub fn open(options: CacheFileOptions) -> Result { - assert_eq!(options.block_size % LOGICAL_BLOCK_SIZE, 0); - assert_eq!(options.block_size % options.fs_info.block_size, 0); + pub fn open(dir: impl AsRef, id: u64) -> Result { + let path = dir.as_ref().join(Self::filename(id)); + + // Probe header. + let tempf = std::fs::File::open(&path)?; + let mut buf = vec![0; HEADER_PROBE_SIZE]; + tempf.read_exact_at(&mut buf, 0)?; + drop(tempf); + + let mut cursor = 0; + + if &buf[cursor..cursor + MAGIC.len()] != MAGIC { + return Err(Error::MagicNotMatch); + } + cursor += MAGIC.len(); + + let version = (&buf[cursor..cursor + 8]).get_u64(); + if version != VERSION { + return Err(Error::InvalidVersion(version)); + } + cursor += 8; + + let meta_blocks = (&buf[cursor..cursor + 8]).get_u64() as usize; + cursor += 8; + + let block_size = (&buf[cursor..cursor + 8]).get_u64() as usize; + cursor += 8; - let mut opts = OpenOptions::new(); - opts.create(true); - opts.read(true); - opts.write(true); - opts.custom_flags(libc::O_DIRECT); + assert_eq!(cursor, buf.len()); - let file = opts.open(PathBuf::from(options.dir).join(Self::filename(options.id)))?; - let cursor = AtomicUsize::new(file.metadata()?.len() as usize); + // FIXME: Check if cache file block size is a multiple of target file system block size. + // If not, mark it as read-only. Delete the file after all its slots are invalid. + + // Open with O_DIRECT and read meta. + let dfile = DioFile::open(&path, block_size, false)?; + let buffer = dfile.read(1, meta_blocks)?; + let meta = CacheFileMeta::with_buffer(meta_blocks, block_size, buffer); Ok(Self { - id: options.id, - file, - block_size: options.block_size, - _fs_info: options.fs_info, - cursor, + id, + block_size, + meta, + dfile, }) } - /// Append data to the cache file. - /// - /// Given `buf` must be aligned to the logical block size, and the size of `buf` must be - /// multiple of block size. - /// - /// # Panics - /// - /// * Panic if given `buf` is not aligned to the logical block size or the size of `buf` is not - /// multiple of block size. - pub fn append(&self, buf: &[u8]) -> Result<()> { - assert_eq!(buf.len() % self.block_size, 0); - let cursor = self.cursor.fetch_add(buf.len(), Ordering::SeqCst) as u64; - self.file.write_all_at(buf, cursor)?; + /// NOTE: `data` buffer must be aligned. + pub fn append(&mut self, sst: u64, idx: u32, data: &[u8]) -> Result<()> { + let slot_idx = self.meta.pick_unused_slot().ok_or(Error::Full)?; + let block_offset = self.dfile.append(data)? as u32; + let index = SlotIndex { + sst, + idx, + block_offset, + len: data.len() as u32, + }; + self.meta.set(slot_idx, &index); Ok(()) } - /// Write data at the given `offset`. - /// - /// Written position must not exceed the file end position. - /// - /// Given `buf` must be aligned to the logical block size, and the size of `buf` must be - /// multiple of block size. - /// - /// # Panics - /// - /// * Panic if given `buf` is not aligned to the logical block size or the size of `buf` is not - /// multiple of block size. - pub fn write_at(&self, buf: &[u8], block_offset: u64) -> Result<()> { - assert_eq!(buf.len() % self.block_size, 0); - let offset = block_offset * self.block_size as u64; - let cursor = self.cursor.load(Ordering::Acquire); - assert!( - offset as usize + buf.len() <= cursor, - "offset + len: {}, cursor: {}", - offset as usize + buf.len(), - cursor - ); - self.file.write_all_at(buf, offset)?; - Ok(()) + pub fn get(&self, idx: usize) -> Result> { + let index = match self.meta.get(idx) { + Some(index) => index, + None => return Ok(None), + }; + let mut buf = self.dfile.read( + index.block_offset as u64, + index.aligned_len() as usize / self.block_size, + )?; + buf.resize(index.len as usize); + Ok(Some(buf)) } - /// Read data by blocks. - pub fn read(&self, block_offset: u64, block_len: usize) -> Result { - let offset = block_offset * self.block_size as u64; - let len = block_len * self.block_size; - let mut buf = CacheFileBuffer::with_capacity(len); - buf.resize(len); - self.file.read_exact_at(&mut buf[..], offset)?; - Ok(buf) - } - - /// Reclaim disk space by blocks. - pub fn reclaim(&self, block_offset: u64, block_len: usize) -> Result<()> { - let fd = self.file.as_raw_fd(); - let mode = nix::fcntl::FallocateFlags::FALLOC_FL_PUNCH_HOLE - | nix::fcntl::FallocateFlags::FALLOC_FL_KEEP_SIZE; - let offset = block_offset * self.block_size as u64; - let len = block_len * self.block_size; - nix::fcntl::fallocate(fd, mode, offset as i64, len as i64)?; - Ok(()) + pub fn invalidate(&mut self, idx: usize) { + self.meta.set(idx, &SlotIndex::default()); } - pub fn is_empty(&self) -> bool { - self.len() == 0 + pub fn flush(&mut self) -> Result<()> { + self.meta.flush(&self.dfile)?; + Ok(()) } - /// Actually occupied disk space. + #[allow(clippy::len_without_is_empty)] pub fn len(&self) -> usize { - let fd = self.file.as_raw_fd(); - let stat = nix::sys::stat::fstat(fd).unwrap(); - stat.st_blocks as usize * FSTAT_BLOCK_SIZE + self.dfile.len() + /* header & meta */ (1 + self.meta.blocks) * self.block_size } pub fn filename(id: u64) -> String { @@ -156,49 +341,4 @@ mod tests { use test_log::test; use super::*; - use crate::file_cache::fs::fs_info; - - #[test] - fn test_cache_file() { - let dir = tempfile::tempdir().unwrap(); - - let fs_info = fs_info(dir.path()).unwrap(); - let bs = fs_info.block_size; - - let cf = CacheFile::open(CacheFileOptions { - id: 1, - dir: dir.path().to_str().unwrap().to_string(), - block_size: fs_info.block_size, - fs_info, - }) - .unwrap(); - assert_eq!(cf.len(), 0); - - let mut buf = CacheFileBuffer::default(); - buf.append(&vec![b'x'; bs * 10]); - buf.align_up_to(fs_info.block_size); - assert_eq!(buf.len(), bs * 10); - - cf.append(&buf[..]).unwrap(); - assert_eq!(cf.len(), bs * 10); - - assert_eq!(&cf.read(0, 2).unwrap()[..], &buf[0..2 * fs_info.block_size]); - - cf.reclaim(0, 2).unwrap(); - assert_eq!(cf.len(), bs * 8); - assert_eq!(&cf.read(0, 2).unwrap()[..], &vec![0; bs * 2]); - - let mut buf2 = CacheFileBuffer::default(); - buf2.append(&vec![b'z'; bs * 2]); - buf2.align_up_to(fs_info.block_size); - cf.write_at(&buf2[..], 8).unwrap(); - assert_eq!(cf.len(), bs * 8); - assert_eq!(&cf.read(8, 2).unwrap()[..], &buf2[..]); - - // Test rewrite holes. - assert_eq!(&cf.read(0, 2).unwrap()[..], &vec![0; bs * 2]); - cf.write_at(&buf[0..bs * 2], 0).unwrap(); - assert_eq!(&cf.read(0, 2).unwrap()[..], &buf[0..2 * fs_info.block_size]); - assert_eq!(cf.len(), bs * 10); - } } diff --git a/storage/src/file_cache/file_old.rs b/storage/src/file_cache/file_old.rs deleted file mode 100644 index bc40952..0000000 --- a/storage/src/file_cache/file_old.rs +++ /dev/null @@ -1,642 +0,0 @@ -use std::collections::BTreeMap; -use std::os::unix::prelude::{FileExt, OpenOptionsExt}; -use std::path::Path; - -use bitvec::prelude::*; -use bitvec::slice::BitSlice; -use bytes::{Buf, BufMut}; -use rangemap::RangeSet; - -use super::buffer::AlignedBuffer; -use super::error::{Error, Result}; - -const MAGIC: &[u8] = b"runkv"; -const VERSION: u32 = 1; - -// TODO: Get logical block size by ioctl(2) BLKSSZGET, see open(2) for details. -const LOGICAL_BLOCK_SIZE: usize = 512; -const SMOOTH_GROWTH_SIZE: usize = 64 * 1024 * 1024; // 64 MiB -const DEFAULT_BUFFER_SIZE: usize = 128 * 1024; // 128 KiB - -/// Each [`CacheFileSlot`] uses 24B. -const SLOT_META_SIZE: usize = 20; -const SLOT_METAS_PER_LOGICAL_BLOCK: usize = LOGICAL_BLOCK_SIZE / SLOT_META_SIZE; - -pub type CacheFileBuffer = - AlignedBuffer; - -#[derive(PartialEq, Eq, Debug)] -struct CacheFileSlot { - /// sst id - sst: u64, - /// sst block index - idx: u32, - /// Logical block offset after cache file header and cache file meta. - block_offset: u32, - /// length by bytes - len: u32, -} - -impl CacheFileSlot { - #[inline(always)] - fn aligned_len(&self) -> u32 { - Self::align_up(self.len) - } - - #[inline(always)] - fn block_len(&self) -> u32 { - self.aligned_len() / LOGICAL_BLOCK_SIZE as u32 - } - - #[inline(always)] - fn align_up(v: u32) -> u32 { - (v + LOGICAL_BLOCK_SIZE as u32 - 1) & !(LOGICAL_BLOCK_SIZE as u32 - 1) - } - - #[inline(always)] - fn align_down(v: u32) -> u32 { - v & !(LOGICAL_BLOCK_SIZE as u32 - 1) - } -} - -struct CacheFileMeta { - slots: u64, - buffer: CacheFileBuffer, - valid: BitVec, - dirty: BitVec, -} - -impl CacheFileMeta { - fn with_buffer(slots: u64, buffer: CacheFileBuffer) -> Self { - let blocks = Self::blocks(slots); - let capacity = blocks * LOGICAL_BLOCK_SIZE; - - assert_eq!(buffer.capacity(), capacity); - assert_eq!(buffer.len(), capacity); - - let dirty = bitvec![usize, Lsb0; 0; Self::blocks(slots)]; - let mut valid = bitvec![usize, Lsb0; 0; Self::blocks(slots)]; - - for (slot, cursor) in (0..buffer.len()).step_by(SLOT_META_SIZE).enumerate() { - if (&buffer[cursor..cursor + 8]).get_u64() != 0 { - valid.set(slot, true); - } - } - - Self { - slots, - buffer, - valid, - dirty, - } - } - - fn slots(&self) -> u64 { - self.slots - } - - fn is_slot_valid(&self, slot: u64) -> bool { - if slot >= self.slots { - panic!("out of range: [slots: {}] [given: {}]", self.slots, slot); - } - *self.valid.get(slot as usize).unwrap() - } - - /// Return used slot count. - fn used_slots(&self) -> u64 { - self.valid.count_ones() as u64 - } - - /// Return unused slot count. - fn unused_slots(&self) -> u64 { - self.valid.count_zeros() as u64 - } - - /// Return if all slots are already used. - fn is_full(&self) -> bool { - self.valid.all() - } - - fn is_used(&self, slot: u64) -> bool { - if slot >= self.slots { - panic!("out of range: [slots: {}] [given: {}]", self.slots, slot); - } - let offset = Self::range(slot).start; - let buf = &mut &self.buffer[offset..offset + 8]; - buf.get_u64() != 0 - } - - fn first_unused_slot(&self) -> Option { - self.valid.first_zero().map(|slot| slot as u64) - } - - fn get(&self, slot: u64) -> CacheFileSlot { - if slot >= self.slots { - panic!("out of range: [slots: {}] [given: {}]", self.slots, slot); - } - let buf = &mut &self.buffer[Self::range(slot)]; - let sst = buf.get_u64(); - let idx = buf.get_u32(); - let offset = buf.get_u32(); - let len = buf.get_u32(); - CacheFileSlot { - sst, - idx, - block_offset: offset, - len, - } - } - - fn set(&mut self, slot: u64, data: &CacheFileSlot) { - if slot >= self.slots { - panic!("out of range: [slots: {}] [given: {}]", self.slots, slot); - } - let mut buf = &mut self.buffer[Self::range(slot)]; - buf.put_u64(data.sst); - buf.put_u32(data.idx); - buf.put_u32(data.block_offset); - buf.put_u32(data.len); - if data.sst != 0 { - self.valid.set(slot as usize, true); - } - self.dirty.set(Self::block(slot), true); - } - - fn flush(&mut self, file: &std::fs::File) -> Result<()> { - let mut flushed = -1isize; - for (block, mut dirty) in self.dirty.iter_mut().enumerate() { - if *dirty { - if block as isize != flushed { - let offset = block * LOGICAL_BLOCK_SIZE; - file.write_all_at( - &self.buffer[offset..offset + LOGICAL_BLOCK_SIZE], - offset as u64, - )?; - flushed = block as isize; - } - *dirty = false; - } - } - Ok(()) - } - - #[inline(always)] - fn blocks(slots: u64) -> usize { - (slots as usize + SLOT_METAS_PER_LOGICAL_BLOCK - 1) / SLOT_METAS_PER_LOGICAL_BLOCK - } - - #[inline(always)] - fn range(slot: u64) -> core::ops::Range { - let offset = slot as usize / SLOT_METAS_PER_LOGICAL_BLOCK * LOGICAL_BLOCK_SIZE - + slot as usize % SLOT_METAS_PER_LOGICAL_BLOCK * SLOT_META_SIZE; - offset..offset + SLOT_META_SIZE - } - - #[inline(always)] - fn block(slot: u64) -> usize { - slot as usize / SLOT_METAS_PER_LOGICAL_BLOCK - } -} - -pub struct CacheFile { - file: std::fs::File, - meta: CacheFileMeta, - - /// total slots - slots: u64, - /// cache data offset - offset: u64, - /// total cache data blocks - blocks: usize, - /// unused block ranges - fragmentations: RangeSet, - /// { (sst id, block idx) -> slot } - mapping: BTreeMap<(u64, u32), u64>, -} - -impl CacheFile { - pub fn create(dir: impl AsRef, id: u64, slots: u64) -> Result { - let path = dir.as_ref().join(format!("cache-{:08}", id)); - let mut options = std::fs::OpenOptions::new(); - options.create(true); - options.read(true); - options.write(true); - options.custom_flags(libc::O_DIRECT); - let file = options.open(path)?; - - Self::write_header(&file, slots)?; - - let meta_blocks = CacheFileMeta::blocks(slots); - let meta_buffer = { - let capacity = meta_blocks * LOGICAL_BLOCK_SIZE; - let mut buffer = CacheFileBuffer::with_capacity(capacity); - buffer.resize(capacity); - buffer - }; - file.write_all_at(&meta_buffer[..], LOGICAL_BLOCK_SIZE as u64)?; - let meta = CacheFileMeta::with_buffer(slots, meta_buffer); - - let offset = ((meta_blocks + 1) * LOGICAL_BLOCK_SIZE) as u64; - let fragmentations = RangeSet::new(); - let mapping = BTreeMap::default(); - - Ok(Self { - slots, - file, - meta, - offset, - blocks: 0, - fragmentations, - mapping, - }) - } - - pub fn open(dir: impl AsRef, id: u64) -> Result { - let path = dir.as_ref().join(format!("cache-{:08}", id)); - let mut options = std::fs::OpenOptions::new(); - options.read(true); - options.write(true); - options.custom_flags(libc::O_DIRECT); - let file = options.open(path)?; - - let slots = Self::read_header(&file)?; - - let meta_blocks = CacheFileMeta::blocks(slots); - let mut meta_buffer = { - let capacity = meta_blocks * LOGICAL_BLOCK_SIZE; - let mut buffer = CacheFileBuffer::with_capacity(capacity); - buffer.resize(capacity); - buffer - }; - file.read_exact_at(&mut meta_buffer[..], LOGICAL_BLOCK_SIZE as u64)?; - let meta = CacheFileMeta::with_buffer(slots, meta_buffer); - - let offset = ((meta_blocks + 1) * LOGICAL_BLOCK_SIZE) as u64; - let blocks = (file.metadata()?.len() as usize + LOGICAL_BLOCK_SIZE - 1) - / LOGICAL_BLOCK_SIZE - - meta_blocks - - 1; - - let mut mapping = BTreeMap::default(); - let mut fragmentations = RangeSet::new(); - if blocks > 0 { - fragmentations.insert(0..blocks); - } - for slot in 0..slots { - let s = meta.get(slot); - if s.sst != 0 { - fragmentations.remove( - s.block_offset as usize..s.block_offset as usize + s.block_len() as usize, - ); - } - mapping.insert((s.sst, s.idx), slot); - } - - Ok(Self { - slots, - file, - meta, - offset, - blocks, - fragmentations, - mapping, - }) - } - - pub fn insert(&mut self, sst: u64, idx: u32, buffer: &CacheFileBuffer) -> Result<()> { - // TODO: Use first-fit policy here, optimize this later based on the real workload. - - let slot = self.meta.first_unused_slot().ok_or(Error::Full)?; - let mut offset = None; - let len = buffer.alignments(); - - // Find a fragmentation that fits the given buffer. - for gap in self.fragmentations.gaps(&(0..self.blocks)) { - if gap.end - gap.start >= len { - offset = Some(gap.start); - self.fragmentations.remove(gap.start..gap.start + len); - break; - } - } - - // Append to the end of the file if there is no fit fragmentations. - if offset.is_none() { - offset = Some(self.blocks); - self.blocks += len; - } - - let slot_meta = CacheFileSlot { - sst, - idx, - block_offset: offset.unwrap() as u32, - len: buffer.len() as u32, - }; - self.meta.set(slot, &slot_meta); - self.write_at(buffer, slot_meta.block_offset)?; - self.mapping.insert((sst, idx), slot); - - Ok(()) - } - - pub fn invalidate(&mut self) -> Result<()> { - todo!() - } - - #[inline(always)] - pub fn slots(&self) -> u64 { - self.slots - } - - pub fn flush_meta(&mut self) -> Result<()> { - self.meta.flush(&self.file) - } - - pub fn sync_data(&self) -> Result<()> { - self.file.sync_data()?; - Ok(()) - } - - pub fn sync_all(&self) -> Result<()> { - self.file.sync_all()?; - Ok(()) - } -} - -impl CacheFile { - fn write_header(file: &std::fs::File, slots: u64) -> Result<()> { - let mut buf = CacheFileBuffer::with_capacity(LOGICAL_BLOCK_SIZE); - buf.resize(LOGICAL_BLOCK_SIZE); - - let mut cursor = 0; - - (&mut buf[cursor..MAGIC.len()]).put_slice(MAGIC); - cursor += MAGIC.len(); - - (&mut buf[cursor..cursor + 4]).put_u32(VERSION); - cursor += 4; - - (&mut buf[cursor..cursor + 8]).put_u64(slots); - cursor += 8; - - assert!( - cursor < LOGICAL_BLOCK_SIZE, - "cursor={}, logical block size={}", - cursor, - LOGICAL_BLOCK_SIZE - ); - - file.write_all_at(&buf[..], 0)?; - - Ok(()) - } - - fn read_header(file: &std::fs::File) -> Result { - let mut buf = CacheFileBuffer::with_capacity(LOGICAL_BLOCK_SIZE); - buf.resize(LOGICAL_BLOCK_SIZE); - file.read_exact_at(&mut buf[..], 0)?; - - let mut cursor = 0; - - if MAGIC != &buf[cursor..cursor + MAGIC.len()] { - return Err(Error::MagicNotMatch); - } - cursor += MAGIC.len(); - - let version = (&buf[cursor..cursor + 4]).get_u32(); - if version != VERSION { - return Err(Error::InvalidVersion(version)); - } - cursor += 4; - - let slots = (&buf[cursor..cursor + 8]).get_u64(); - - Ok(slots) - } - - /// Write data from [`buffer`] at the given block (header and meta blocks excluded). - fn write_at(&self, buffer: &CacheFileBuffer, block_at: u32) -> Result<()> { - self.file.write_all_at( - &buffer[..], - self.offset + block_at as u64 * LOGICAL_BLOCK_SIZE as u64, - )?; - Ok(()) - } - - /// Read data from file at the given block and length, returns a new [`CacheFileBuffer`]. - fn read_at_to_buffer(&self, block_at: u32, block_len: u32) -> Result { - let capacity = block_len as usize * LOGICAL_BLOCK_SIZE; - let mut buffer = CacheFileBuffer::with_capacity(capacity); - buffer.resize(capacity); - debug_assert_eq!(buffer.capacity(), capacity); - debug_assert_eq!(buffer.len(), capacity); - self.file.read_exact_at( - &mut buffer[..], - self.offset + block_at as u64 * LOGICAL_BLOCK_SIZE as u64, - )?; - Ok(buffer) - } -} - -#[cfg(test)] -mod tests { - - use std::io::{IoSlice, Seek, SeekFrom, Write}; - use std::os::unix::prelude::{AsRawFd, FileExt}; - use std::time::Instant; - - use nix::sys::uio::pwrite; - use nix::unistd::write; - use test_log::test; - - use super::*; - - fn new_meta_buffer(slots: u64) -> CacheFileBuffer { - let capacity = CacheFileMeta::blocks(slots) * LOGICAL_BLOCK_SIZE; - let mut buffer = CacheFileBuffer::with_capacity(capacity); - buffer.resize(capacity); - buffer - } - - #[test] - fn test_slot_meta_ranges() { - let meta = CacheFileMeta::with_buffer(4096, new_meta_buffer(4096)); - let mut range = 0..0; - for i in 0..4096 { - let r = CacheFileMeta::range(i); - if i > 0 { - assert!( - r.start >= range.end, - "r.start={}, range.end={}", - r.start, - range.end - ); - } - assert_eq!( - r.start / LOGICAL_BLOCK_SIZE, - (r.end - 1) / LOGICAL_BLOCK_SIZE - ); - assert_eq!(r.start / LOGICAL_BLOCK_SIZE, CacheFileMeta::block(i)); - range = r; - } - } - - #[test] - fn test_cache_file_meta() { - let tempdir = tempfile::tempdir().unwrap(); - let mut options = std::fs::OpenOptions::new(); - options.create(true); - options.read(true); - options.write(true); - options.custom_flags(libc::O_DIRECT); - let file = options.open(tempdir.path().join("test-file-001")).unwrap(); - - let mut meta0 = CacheFileMeta::with_buffer(4096, new_meta_buffer(4096)); - file.write_all_at(&meta0.buffer[..], 0).unwrap(); - - for slot in 0..meta0.slots() { - assert_eq!( - meta0.get(slot), - CacheFileSlot { - sst: 0, - idx: 0, - block_offset: 0, - len: 0 - } - ); - } - meta0.flush(&file).unwrap(); - file.sync_all().unwrap(); - - let data = CacheFileSlot { - sst: 1, - idx: 1, - block_offset: 1, - len: 1, - }; - meta0.set(0, &data); - assert_eq!(meta0.get(0), data); - - meta0.flush(&file).unwrap(); - file.sync_all().unwrap(); - let mut buffer1 = new_meta_buffer(4096); - file.read_exact_at(&mut buffer1[..], 0).unwrap(); - let meta1 = CacheFileMeta::with_buffer(4096, buffer1); - assert_eq!(meta1.get(0), data); - assert!(meta1.valid.get(0).unwrap()); - } - - #[test] - fn test_cache_file() { - let tempdir = tempfile::tempdir().unwrap(); - - let cf = CacheFile::create(tempdir.path(), 1, 4096).unwrap(); - cf.sync_all().unwrap(); - drop(cf); - - let cf = CacheFile::open(tempdir.path(), 1).unwrap(); - assert_eq!(cf.slots(), 4096); - } - - // TODO: REMOVE ME!! - #[test] - fn test_dio() { - let tempdir = tempfile::tempdir().unwrap(); - let mut options = std::fs::OpenOptions::new(); - options.create(true); - options.read(true); - options.write(true); - options.custom_flags(libc::O_DIRECT); - let file = options.open(tempdir.path().join("test-file-001")).unwrap(); - - let mut buf = CacheFileBuffer::default(); - buf.append(&[b'x'; 4096]); - file.write_all_at(&buf[0..4096], 0).unwrap(); - - buf.write_at(&[b'a'; 512], 0); - nix::sys::uio::pwritev( - file.as_raw_fd(), - &[ - IoSlice::new(&buf[0..512]), - IoSlice::new(&buf[0..512]), - IoSlice::new(&buf[0..512]), - IoSlice::new(&buf[0..512]), - ], - 0, - ) - .unwrap(); - - let mut read = CacheFileBuffer::default(); - read.resize(2048); - file.read_exact_at(&mut read[0..2048], 0).unwrap(); - assert_eq!(read[0..2048], [b'a'; 2048]); - } - - #[test] - fn test_dio_drop_buffer() { - const SIZE: usize = 1024 * 1024 * 1024; - - let tempdir = tempfile::tempdir().unwrap(); - - // Buffered I/O - - let mut options = std::fs::OpenOptions::new(); - options.create(true); - options.read(true); - options.write(true); - let file = options.open(tempdir.path().join("test-file-001")).unwrap(); - - let mut buf = CacheFileBuffer::with_capacity(SIZE); - buf.append(&[b'x'; SIZE]); - - let start = Instant::now(); - file.write_all_at(&buf[..], 0).unwrap(); - println!("write: {:?}", start.elapsed()); - drop(buf); - file.sync_data().unwrap(); - println!("sync: {:?}", start.elapsed()); - file.sync_data().unwrap(); - println!("sync: {:?}", start.elapsed()); - - // Direct I/O - - let mut options = std::fs::OpenOptions::new(); - options.create(true); - options.read(true); - options.write(true); - options.custom_flags(libc::O_DIRECT); - let file = options.open(tempdir.path().join("test-file-002")).unwrap(); - - let mut buf = CacheFileBuffer::with_capacity(SIZE); - buf.append(&[b'x'; SIZE]); - - let start = Instant::now(); - file.write_all_at(&buf[..], 0).unwrap(); - println!("write: {:?}", start.elapsed()); - drop(buf); - file.sync_data().unwrap(); - println!("sync: {:?}", start.elapsed()); - file.sync_data().unwrap(); - println!("sync: {:?}", start.elapsed()); - - // Sync Direct I/O - - let mut options = std::fs::OpenOptions::new(); - options.create(true); - options.read(true); - options.write(true); - options.custom_flags(libc::O_DIRECT | libc::O_SYNC); - let file = options.open(tempdir.path().join("test-file-003")).unwrap(); - - let mut buf = CacheFileBuffer::with_capacity(SIZE); - buf.append(&[b'x'; SIZE]); - - let start = Instant::now(); - file.write_all_at(&buf[..], 0).unwrap(); - println!("write: {:?}", start.elapsed()); - drop(buf); - file.sync_data().unwrap(); - println!("sync: {:?}", start.elapsed()); - file.sync_data().unwrap(); - println!("sync: {:?}", start.elapsed()); - } -} diff --git a/storage/src/file_cache/fs.rs b/storage/src/file_cache/fs.rs index d01ea35..f76090b 100644 --- a/storage/src/file_cache/fs.rs +++ b/storage/src/file_cache/fs.rs @@ -2,6 +2,8 @@ use std::path::Path; use super::error::{Error, Result}; +pub const LOGICAL_BLOCK_SIZE: usize = 512; + #[derive(Clone, Copy, Debug)] pub enum FsType { Ext4, diff --git a/storage/src/file_cache/judge.rs b/storage/src/file_cache/judge.rs index 0b77d5b..26f9c37 100644 --- a/storage/src/file_cache/judge.rs +++ b/storage/src/file_cache/judge.rs @@ -15,6 +15,9 @@ pub struct DefaultJudge {} impl Judge for DefaultJudge { fn judge(&self, _stats: &Stats, _level: u64, _sst: u64, _block_idx: u32, _len: u32) -> bool { - todo!() + // TODO: IMPL ME!!! + // TODO: IMPL ME!!! + // TODO: IMPL ME!!! + true } } diff --git a/storage/src/file_cache/manager.rs b/storage/src/file_cache/manager.rs index 7d73457..fecf095 100644 --- a/storage/src/file_cache/manager.rs +++ b/storage/src/file_cache/manager.rs @@ -17,15 +17,13 @@ struct IndexKey { struct Index { /// cache file id cache_file_id: u64, - /// logical block offset after header and meta segment - logical_block_offset: u32, - /// length in bytes - len: u32, + /// slot idx + slot_idx: u64, } pub struct FileCacheOptions { pub node: u64, - pub path: String, + pub dir: String, pub capacity: usize, pub judge: J, } @@ -33,7 +31,7 @@ pub struct FileCacheOptions { #[derive(Clone)] pub struct FileCacheManager { node: u64, - path: String, + dir: String, capacity: usize, _judge: J, @@ -50,19 +48,19 @@ impl std::fmt::Debug for FileCacheManager { impl FileCacheManager { pub async fn open(options: FileCacheOptions) -> Result { - let exists = tokio::fs::metadata(&options.path).await.is_ok(); - let magic_file_path = PathBuf::from(&options.path).join(MAGIC_FILENAME); + let exists = tokio::fs::metadata(&options.dir).await.is_ok(); + let magic_file_path = PathBuf::from(&options.dir).join(MAGIC_FILENAME); let indices = if exists { if tokio::fs::metadata(&magic_file_path).await.is_err() { return Err(Error::MagicFileNotFound); } ShardedHashMap::new(64) } else { - tokio::fs::create_dir_all(&options.path).await?; + tokio::fs::create_dir_all(&options.dir).await?; tokio::fs::File::create(&magic_file_path).await?; let indices = ShardedHashMap::new(64); - let mut r = tokio::fs::read_dir(&options.path).await?; + let mut r = tokio::fs::read_dir(&options.dir).await?; while let Some(entry) = r.next_entry().await? { let raw_filename = entry.file_name(); let _id: u64 = match raw_filename @@ -83,7 +81,7 @@ impl FileCacheManager { Ok(Self { node: options.node, - path: options.path, + dir: options.dir, capacity: options.capacity, _judge: options.judge, _indices: indices, diff --git a/storage/src/file_cache/mod.rs b/storage/src/file_cache/mod.rs index 940bed5..839832d 100644 --- a/storage/src/file_cache/mod.rs +++ b/storage/src/file_cache/mod.rs @@ -1,10 +1,10 @@ pub mod buffer; +pub mod dio_file; pub mod error; -pub mod file; #[allow(dead_code)] #[allow(unused_imports)] #[allow(unused_variables)] -pub mod file_old; +pub mod file; pub mod fs; pub mod judge; #[allow(dead_code)] diff --git a/storage/src/lsm_tree/iterator/merge_iterator.rs b/storage/src/lsm_tree/iterator/merge_iterator.rs index 89d062b..d9d5021 100644 --- a/storage/src/lsm_tree/iterator/merge_iterator.rs +++ b/storage/src/lsm_tree/iterator/merge_iterator.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use super::{BoxedIterator, Iterator, Seek}; use crate::Result; -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Eq, Debug)] enum Direction { Forward, Backward, diff --git a/storage/src/utils/bloom.rs b/storage/src/utils/bloom.rs index 565df3b..a896508 100644 --- a/storage/src/utils/bloom.rs +++ b/storage/src/utils/bloom.rs @@ -13,7 +13,7 @@ pub trait BitSliceMut { fn set_bit(&mut self, idx: usize, val: bool); } -impl<'a, T: AsRef<[u8]>> BitSlice for T { +impl> BitSlice for T { fn get_bit(&self, idx: usize) -> bool { let pos = idx / 8; let offset = idx % 8; @@ -25,7 +25,7 @@ impl<'a, T: AsRef<[u8]>> BitSlice for T { } } -impl<'a, T: AsMut<[u8]>> BitSliceMut for T { +impl> BitSliceMut for T { fn set_bit(&mut self, idx: usize, val: bool) { let pos = idx / 8; let offset = idx % 8; diff --git a/wheel/src/components/fsm.rs b/wheel/src/components/fsm.rs index 63d42e5..06cc206 100644 --- a/wheel/src/components/fsm.rs +++ b/wheel/src/components/fsm.rs @@ -310,6 +310,7 @@ impl Fsm for ObjectLsmTreeFsm { None => { #[cfg(feature = "deadlock")] tracing::info!("{} load enter", self.raft_node); + #[allow(clippy::let_and_return)] let index = self.load_index(AVAILABLE_INDEX_KEY).await?; #[cfg(feature = "deadlock")] tracing::info!("{} load exit", self.raft_node); diff --git a/wheel/src/components/lsm_tree.rs b/wheel/src/components/lsm_tree.rs index 888fa69..3765cd8 100644 --- a/wheel/src/components/lsm_tree.rs +++ b/wheel/src/components/lsm_tree.rs @@ -110,14 +110,11 @@ impl ObjectStoreLsmTreeCore { } // Pick overlap ssts. - let levels = { - let levels = self - .version_manager - .pick_overlap_ssts_by_key(0..self.version_manager.levels(), key) - .await - .unwrap(); - levels - }; + let levels = self + .version_manager + .pick_overlap_ssts_by_key(0..self.version_manager.levels(), key) + .await + .unwrap(); trace!("find key {:?} in ssts:\n{:?}", key, levels);