Skip to content

Commit

Permalink
fix(bloom-filter): skip applying for non-indexed columns (#5246)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored and evenyag committed Jan 3, 2025
1 parent 5b42546 commit a22e8b4
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 31 deletions.
8 changes: 4 additions & 4 deletions src/mito2/src/cache/index/bloom_filter_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v:
pub struct CachedBloomFilterIndexBlobReader<R> {
file_id: FileId,
column_id: ColumnId,
file_size: u64,
blob_size: u64,
inner: R,
cache: BloomFilterIndexCacheRef,
}
Expand All @@ -71,14 +71,14 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
pub fn new(
file_id: FileId,
column_id: ColumnId,
file_size: u64,
blob_size: u64,
inner: R,
cache: BloomFilterIndexCacheRef,
) -> Self {
Self {
file_id,
column_id,
file_size,
blob_size,
inner,
cache,
}
Expand All @@ -92,7 +92,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
self.cache
.get_or_load(
(self.file_id, self.column_id),
self.file_size,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/cache/index/inverted_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
file_size: u64,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
}

impl<R> CachedInvertedIndexBlobReader<R> {
/// Creates a new inverted index blob reader with cache.
pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
Self {
file_id,
file_size,
blob_size,
inner,
cache,
}
Expand All @@ -82,7 +82,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
self.cache
.get_or_load(
self.file_id,
self.file_size,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
Expand Down
74 changes: 57 additions & 17 deletions src/mito2/src/sst/index/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, RegionId};

use super::INDEX_BLOB_TYPE;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
};
use crate::error::{
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu,
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Error, MetadataSnafu,
PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
Expand Down Expand Up @@ -118,28 +118,21 @@ impl BloomFilterIndexApplier {
.start_timer();

for (column_id, predicates) in &self.filters {
let mut blob = match self.cached_blob_reader(file_id, *column_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
self.remote_blob_reader(file_id, *column_id, file_size_hint)
.await?
}
let mut blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
.await?
{
Some(blob) => blob,
None => continue,
};

// Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let reader = CachedBloomFilterIndexBlobReader::new(
file_id,
*column_id,
file_size,
blob_size,
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
Expand All @@ -157,6 +150,43 @@ impl BloomFilterIndexApplier {
Ok(())
}

/// Creates a blob reader from the cached or remote index file.
///
/// Returus `None` if the column does not have an index.
async fn blob_reader(
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let reader = match self.cached_blob_reader(file_id, column_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
return Ok(None);
}
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
let res = self
.remote_blob_reader(file_id, column_id, file_size_hint)
.await;
if let Err(err) = res {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
return Ok(None);
}
return Err(err);
}

res?
}
};

Ok(Some(reader))
}

/// Creates a blob reader from the cached index file
async fn cached_blob_reader(
&self,
Expand Down Expand Up @@ -242,6 +272,16 @@ impl BloomFilterIndexApplier {
}
}

fn is_blob_not_found(err: &Error) -> bool {
matches!(
err,
Error::PuffinBuildReader {
source: puffin::error::Error::BlobNotFound { .. },
..
}
)
}

pub struct BloomFilterIndexApplierBuilder<'a> {
region_dir: String,
object_store: ObjectStore,
Expand Down
8 changes: 2 additions & 6 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,10 @@ impl InvertedIndexApplier {
};

if let Some(index_cache) = &self.inverted_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id,
file_size,
blob_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
);
Expand Down

0 comments on commit a22e8b4

Please sign in to comment.