diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index f6d19687ceb9..08ac7e8273d1 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -61,7 +61,7 @@ fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: pub struct CachedBloomFilterIndexBlobReader { file_id: FileId, column_id: ColumnId, - file_size: u64, + blob_size: u64, inner: R, cache: BloomFilterIndexCacheRef, } @@ -71,14 +71,14 @@ impl CachedBloomFilterIndexBlobReader { pub fn new( file_id: FileId, column_id: ColumnId, - file_size: u64, + blob_size: u64, inner: R, cache: BloomFilterIndexCacheRef, ) -> Self { Self { file_id, column_id, - file_size, + blob_size, inner, cache, } @@ -92,7 +92,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl self.cache .get_or_load( (self.file_id, self.column_id), - self.file_size, + self.blob_size, offset, size, move |ranges| async move { inner.read_vec(&ranges).await }, diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 3c399a37cc82..53929c467a98 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -58,17 +58,17 @@ fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 { /// Inverted index blob reader with cache. pub struct CachedInvertedIndexBlobReader { file_id: FileId, - file_size: u64, + blob_size: u64, inner: R, cache: InvertedIndexCacheRef, } impl CachedInvertedIndexBlobReader { /// Creates a new inverted index blob reader with cache. 
- pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self { + pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self { Self { file_id, - file_size, + blob_size, inner, cache, } @@ -82,7 +82,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead self.cache .get_or_load( self.file_id, - self.file_size, + self.blob_size, offset, size, move |ranges| async move { inner.read_vec(&ranges).await }, diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 3476ec097243..1e7533a7044c 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -33,18 +33,18 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, RegionId}; -use super::INDEX_BLOB_TYPE; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::bloom_filter_index::{ BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, }; use crate::error::{ - ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu, + ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, }; use crate::metrics::INDEX_APPLY_ELAPSED; use crate::row_converter::SortField; use crate::sst::file::FileId; +use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; use crate::sst::index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::TYPE_BLOOM_FILTER_INDEX; @@ -118,28 +118,21 @@ impl BloomFilterIndexApplier { .start_timer(); for (column_id, predicates) in &self.filters { - let mut blob = match self.cached_blob_reader(file_id, *column_id).await { - Ok(Some(puffin_reader)) => puffin_reader, - other => { - if let Err(err) = other { - warn!(err; "An unexpected error occurred while reading 
the cached index file. Fallback to remote index file.") - } - self.remote_blob_reader(file_id, *column_id, file_size_hint) - .await? - } + let mut blob = match self + .blob_reader(file_id, *column_id, file_size_hint) + .await? + { + Some(blob) => blob, + None => continue, }; // Create appropriate reader based on whether we have caching enabled if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache { - let file_size = if let Some(file_size) = file_size_hint { - file_size - } else { - blob.metadata().await.context(MetadataSnafu)?.content_length - }; + let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; let reader = CachedBloomFilterIndexBlobReader::new( file_id, *column_id, - file_size, + blob_size, BloomFilterReaderImpl::new(blob), bloom_filter_cache.clone(), ); @@ -157,6 +150,43 @@ impl BloomFilterIndexApplier { Ok(()) } + /// Creates a blob reader from the cached or remote index file. + /// + /// Returns `None` if the column does not have an index. + async fn blob_reader( + &self, + file_id: FileId, + column_id: ColumnId, + file_size_hint: Option, + ) -> Result> { + let reader = match self.cached_blob_reader(file_id, column_id).await { + Ok(Some(puffin_reader)) => puffin_reader, + other => { + if let Err(err) = other { + // Blob not found means no index for this column + if is_blob_not_found(&err) { + return Ok(None); + } + warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") + } + let res = self + .remote_blob_reader(file_id, column_id, file_size_hint) + .await; + if let Err(err) = res { + // Blob not found means no index for this column + if is_blob_not_found(&err) { + return Ok(None); + } + return Err(err); + } + + res? 
+ } + }; + + Ok(Some(reader)) + } + /// Creates a blob reader from the cached index file async fn cached_blob_reader( &self, @@ -242,6 +272,16 @@ impl BloomFilterIndexApplier { } } +fn is_blob_not_found(err: &Error) -> bool { + matches!( + err, + Error::PuffinBuildReader { + source: puffin::error::Error::BlobNotFound { .. }, + .. + } + ) +} + pub struct BloomFilterIndexApplierBuilder<'a> { region_dir: String, object_store: ObjectStore, diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 6ad116dae035..73785f3264a0 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -137,14 +137,10 @@ impl InvertedIndexApplier { }; if let Some(index_cache) = &self.inverted_index_cache { - let file_size = if let Some(file_size) = file_size_hint { - file_size - } else { - blob.metadata().await.context(MetadataSnafu)?.content_length - }; + let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; let mut index_reader = CachedInvertedIndexBlobReader::new( file_id, - file_size, + blob_size, InvertedIndexBlobReader::new(blob), index_cache.clone(), );