Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cli arg to specify max parquet fanout #25714

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 7 additions & 44 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use influxdb3_cache::{
meta_cache::MetaCacheProvider,
parquet_cache::create_cached_obj_store_and_oracle,
};
use influxdb3_clap_blocks::tokio::TokioDatafusionConfig;
use influxdb3_clap_blocks::{datafusion::IoxQueryDatafusionConfig, tokio::TokioDatafusionConfig};
use influxdb3_process::{
build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID,
};
Expand All @@ -36,8 +36,8 @@ use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{num::NonZeroUsize, sync::Arc};
use std::{path::Path, str::FromStr};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::time::Instant;
Expand Down Expand Up @@ -112,6 +112,10 @@ pub struct Config {
#[clap(flatten)]
pub(crate) tokio_datafusion_config: TokioDatafusionConfig,

/// iox_query extended DataFusion config
#[clap(flatten)]
pub(crate) iox_query_datafusion_config: IoxQueryDatafusionConfig,

/// Maximum size of HTTP requests.
#[clap(
long = "max-http-request-size",
Expand Down Expand Up @@ -152,16 +156,6 @@ pub struct Config {
)]
pub exec_mem_pool_bytes: MemorySize,

/// DataFusion config.
#[clap(
long = "datafusion-config",
env = "INFLUXDB_IOX_DATAFUSION_CONFIG",
default_value = "",
value_parser = parse_datafusion_config,
action
)]
pub datafusion_config: HashMap<String, String>,

/// bearer token to be set for requests
#[clap(long = "bearer-token", env = "INFLUXDB3_BEARER_TOKEN", action)]
pub bearer_token: Option<String>,
Expand Down Expand Up @@ -514,7 +508,7 @@ pub async fn command(config: Config) -> Result<()> {
write_buffer: Arc::clone(&write_buffer),
exec: Arc::clone(&exec),
metrics: Arc::clone(&metrics),
datafusion_config: Arc::new(config.datafusion_config),
datafusion_config: Arc::new(config.iox_query_datafusion_config.build()),
query_log_size: config.query_log_size,
telemetry_store: Arc::clone(&telemetry_store),
sys_events_store: Arc::clone(&sys_events_store),
Expand Down Expand Up @@ -572,34 +566,3 @@ async fn setup_telemetry_store(
)
.await
}

fn parse_datafusion_config(
s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let s = s.trim();
if s.is_empty() {
return Ok(HashMap::with_capacity(0));
}

let mut out = HashMap::new();
for part in s.split(',') {
let kv = part.trim().splitn(2, ':').collect::<Vec<_>>();
match kv.as_slice() {
[key, value] => {
let key_owned = key.trim().to_owned();
let value_owned = value.trim().to_owned();
let existed = out.insert(key_owned, value_owned).is_some();
if existed {
return Err(format!("key '{key}' passed multiple times").into());
}
}
_ => {
return Err(
format!("Invalid key value pair - expected 'KEY:VALUE' got '{s}'").into(),
);
}
}
}

Ok(out)
}
3 changes: 3 additions & 0 deletions influxdb3_clap_blocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ license.workspace = true

[dependencies]
# core crate dependencies
iox_query.workspace = true
observability_deps.workspace = true

# crates.io dependencies
clap.workspace = true
datafusion.workspace = true
humantime.workspace = true
libc.workspace = true
paste.workspace = true
tokio.workspace = true

[dev-dependencies]
futures.workspace = true
test-log.workspace = true

[lints]
workspace = true
125 changes: 125 additions & 0 deletions influxdb3_clap_blocks/src/datafusion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::collections::HashMap;

use datafusion::config::ConfigExtension;
use iox_query::config::IoxConfigExt;

/// Extends the standard [`HashMap`] based DataFusion config option in the CLI with specific
/// options (along with defaults) for InfluxDB 3 OSS/Pro. This is intended for customization of
/// options that are defined in the `iox_query` crate, e.g., those defined in [`IoxConfigExt`]
/// that are relevant to the monolithinc versions of InfluxDB 3.
#[derive(Debug, clap::Parser, Clone)]
pub struct IoxQueryDatafusionConfig {
/// When multiple parquet files are required in a sorted way (e.g. for de-duplication), we have
/// two options:
///
/// 1. **In-mem sorting:** Put them into `datafusion.target_partitions` DataFusion partitions.
/// This limits the fan-out, but requires that we potentially chain multiple parquet files into
/// a single DataFusion partition. Since chaining sorted data does NOT automatically result in
/// sorted data (e.g. AB-AB is not sorted), we need to preform an in-memory sort using
/// `SortExec` afterwards. This is expensive.
/// 2. **Fan-out:** Instead of chaining files within DataFusion partitions, we can accept a
/// fan-out beyond `target_partitions`. This prevents in-memory sorting but may result in OOMs
/// (out-of-memory) if the fan-out is too large.
///
/// We try to pick option 2 up to a certain number of files, which is configured by this
/// setting.
#[clap(
long = "datafusion-max-parquet-fanout",
env = "INFLUXDB3_DATAFUSION_MAX_PARQUET_FANOUT",
default_value = "1000",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking if 1000 is a good default value, I understand this depends on the size of the files but given it can result in OOM just wanted to double check 1000 is still good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to call this out. I copied the comment from IOx/core to preserve the context it provided. I think we may need to tune this a bit, or it could be possible to base the default on the system memory, and how we allocate memory in different modes in pro.

As it stands, with the low default of 40, we are getting OOMs with the fallback, i.e., non-fanout, query plan, so we should know soon if increasing this much makes the problem worse or not. Based on https://github.com/influxdata/influxdb_pro/issues/308#issuecomment-2562955195, this default may be a bit low/out-dated (perhaps the way the DataFusion plan handles fanout is different than when the default was decided). There are some distributed clusters in IOx setting this to 800 as per https://github.com/influxdata/influxdb_pro/issues/308#issuecomment-2563245404.

We'll see how this goes - at the minimum, I got the env vars switched from INFLUXDB_IOX_ to INFLUXDB3_ 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have misunderstood the docs for this setting, I interpreted it as, the higher this number the more files it tries to fan-out, which leads to OOMs. If we don't fan-out then it leads to doing expensive in memory sorting (guessing without running into OOMs?).

Copy link
Contributor Author

@hiltontj hiltontj Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(guessing without running into OOMs?)

Unfortunately, though, it is OOM'ing without the fanout, while not OOM'ing with the fanout, so we may need to update this doc comment (see https://github.com/influxdata/influxdb_pro/issues/205#issuecomment-2565377397)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the memory sort is going to OOM, unless you set a memory limit on DF, but in that case it just means that the query will get killed and return a resource exhaustion error. The only way around that I can think of is if spill to disk is enabled, but that's not really much better either.

I think the fanout setting should effectively be ignored (i.e. set to whatever the max of the type is). Resorting the data is always going to be more expensive and completely unnecessary in our case.

If DF allocates an arrow buffer for each input file, then you'd have that size * num of files. The Arrow buffer could be quite large if there are very wide tables and depending on the size of that buffer. I think one way to counter this would be to make sure that the pre-allocated buffer is limited in size or scaled down depending on the number of input files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, there are two things here that I can create issues for:

  • Set fanout to whatever the maximum is by default and could even just remove the setting entirely (this is pretty easy)
  • Scale the buffer size in the compactor based on the number of input files and estimated row size from table schema (probably a bit harder 😄)

action
)]
pub max_parquet_fanout: usize,

/// Provide custom configuration to DataFusion as a comma-separated list of key:value pairs.
///
/// # Example
/// ```text
/// --datafusion-config "datafusion.key1:value1, datafusion.key2:value2"
/// ```
#[clap(
long = "datafusion-config",
env = "INFLUXDB3_DATAFUSION_CONFIG",
default_value = "",
value_parser = parse_datafusion_config,
action
)]
pub datafusion_config: HashMap<String, String>,
}

impl IoxQueryDatafusionConfig {
/// Build a [`HashMap`] to be used as the DataFusion config for the query executor
///
/// This takes the provided `--datafusion-config` and extends it with options available on this
/// [`IoxQueryDatafusionConfig`] struct. Note, any IOx extension parameters that are defined
/// in the `datafusion_config` will be overridden by the provided values or their default. For
/// example, if the user provides:
/// ```
/// --datafusion-config "iox.max_arquet_fanout:50"
/// ```
/// This will be overridden with with the default value for `max_parquet_fanout` of `1000`, or
/// with the value provided for the `--datafusion-max-parquet-fanout` argument.
pub fn build(mut self) -> HashMap<String, String> {
self.datafusion_config.insert(
format!("{prefix}.max_parquet_fanout", prefix = IoxConfigExt::PREFIX),
self.max_parquet_fanout.to_string(),
);
self.datafusion_config
}
}

fn parse_datafusion_config(
s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let s = s.trim();
if s.is_empty() {
return Ok(HashMap::with_capacity(0));
}

let mut out = HashMap::new();
for part in s.split(',') {
let kv = part.trim().splitn(2, ':').collect::<Vec<_>>();
match kv.as_slice() {
[key, value] => {
let key_owned = key.trim().to_owned();
let value_owned = value.trim().to_owned();
let existed = out.insert(key_owned, value_owned).is_some();
if existed {
return Err(format!("key '{key}' passed multiple times").into());
}
}
_ => {
return Err(
format!("Invalid key value pair - expected 'KEY:VALUE' got '{s}'").into(),
);
}
}
}

Ok(out)
}

#[cfg(test)]
mod tests {
use clap::Parser;
use iox_query::{config::IoxConfigExt, exec::Executor};

use super::IoxQueryDatafusionConfig;

#[test_log::test]
fn max_parquet_fanout() {
let datafusion_config =
IoxQueryDatafusionConfig::parse_from(["", "--datafusion-max-parquet-fanout", "5"])
.build();
let exec = Executor::new_testing();
let mut session_config = exec.new_session_config();
for (k, v) in &datafusion_config {
session_config = session_config.with_config_option(k, v);
}
let ctx = session_config.build();
let inner_ctx = ctx.inner().state();
let config = inner_ctx.config();
let iox_config_ext = config.options().extensions.get::<IoxConfigExt>().unwrap();
assert_eq!(5, iox_config_ext.max_parquet_fanout);
}
}
1 change: 1 addition & 0 deletions influxdb3_clap_blocks/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Configuration options for the `influxdb3` CLI which uses the `clap` crate

pub mod datafusion;
pub mod tokio;
Loading