Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bloom-filter): skip applying for non-indexed columns #5246

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/mito2/src/cache/index/bloom_filter_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v:
pub struct CachedBloomFilterIndexBlobReader<R> {
file_id: FileId,
column_id: ColumnId,
file_size: u64,
blob_size: u64,
inner: R,
cache: BloomFilterIndexCacheRef,
}
Expand All @@ -71,14 +71,14 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
pub fn new(
file_id: FileId,
column_id: ColumnId,
file_size: u64,
blob_size: u64,
inner: R,
cache: BloomFilterIndexCacheRef,
) -> Self {
Self {
file_id,
column_id,
file_size,
blob_size,
inner,
cache,
}
Expand All @@ -92,7 +92,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
self.cache
.get_or_load(
(self.file_id, self.column_id),
self.file_size,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/cache/index/inverted_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
file_size: u64,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
}

impl<R> CachedInvertedIndexBlobReader<R> {
/// Creates a new inverted index blob reader with cache.
pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
Self {
file_id,
file_size,
blob_size,
inner,
cache,
}
Expand All @@ -82,7 +82,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
self.cache
.get_or_load(
self.file_id,
self.file_size,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
Expand Down
74 changes: 57 additions & 17 deletions src/mito2/src/sst/index/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, RegionId};

use super::INDEX_BLOB_TYPE;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
};
use crate::error::{
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu,
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Error, MetadataSnafu,
PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
Expand Down Expand Up @@ -118,28 +118,21 @@ impl BloomFilterIndexApplier {
.start_timer();

for (column_id, predicates) in &self.filters {
let mut blob = match self.cached_blob_reader(file_id, *column_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
self.remote_blob_reader(file_id, *column_id, file_size_hint)
.await?
}
let mut blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
.await?
{
Some(blob) => blob,
None => continue,
};

// Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let reader = CachedBloomFilterIndexBlobReader::new(
file_id,
*column_id,
file_size,
blob_size,
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
Expand All @@ -157,6 +150,43 @@ impl BloomFilterIndexApplier {
Ok(())
}

/// Creates a blob reader from the cached or remote index file.
///
/// Returus `None` if the column does not have an index.
async fn blob_reader(
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let reader = match self.cached_blob_reader(file_id, column_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
return Ok(None);
}
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
let res = self
.remote_blob_reader(file_id, column_id, file_size_hint)
.await;
if let Err(err) = res {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
return Ok(None);
}
return Err(err);
}

res?
}
};

Ok(Some(reader))
}

/// Creates a blob reader from the cached index file
async fn cached_blob_reader(
&self,
Expand Down Expand Up @@ -242,6 +272,16 @@ impl BloomFilterIndexApplier {
}
}

fn is_blob_not_found(err: &Error) -> bool {
matches!(
err,
Error::PuffinBuildReader {
source: puffin::error::Error::BlobNotFound { .. },
..
}
)
}

pub struct BloomFilterIndexApplierBuilder<'a> {
region_dir: String,
object_store: ObjectStore,
Expand Down
8 changes: 2 additions & 6 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,10 @@ impl InvertedIndexApplier {
};

if let Some(index_cache) = &self.inverted_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id,
file_size,
blob_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
);
Expand Down
Loading