Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: plugin development flow #25704

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele

# Use jemalloc as the default allocator.
jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"]
system-py = ["influxdb3_write/system-py"]
system-py = ["influxdb3_write/system-py", "influxdb3_server/system-py"]

[dev-dependencies]
# Core Crates
Expand Down
21 changes: 21 additions & 0 deletions influxdb3/src/commands/plugin_test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::error::Error;

pub mod wal;

#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
#[clap(subcommand)]
command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
/// Test a plugin triggered by WAL writes
Wal(wal::Config),
}

pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
match config.command {
Command::Wal(config) => wal::command(config).await,
}
}
68 changes: 68 additions & 0 deletions influxdb3/src/commands/plugin_test/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::commands::common::InfluxDb3Config;
use influxdb3_client::plugin_test::WalPluginTestRequest;
use secrecy::ExposeSecret;
use std::error::Error;

#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

#[clap(flatten)]
wal_plugin_test: WalPluginTest,
}

#[derive(Debug, clap::Parser)]
pub struct WalPluginTest {
/// The name of the plugin, which should match its file name on the server <plugin-dir>/<name>.py
#[clap(short = 'n', long = "name")]
pub name: String,
/// If given, pass this line protocol as input
#[clap(long = "lp")]
pub input_lp: Option<String>,
/// If given, pass this file of LP as input from on the server <plugin-dir>/<name>_test/<input-file>
#[clap(long = "file")]
pub input_file: Option<String>,
/// If given, save the output to this file on the server in <plugin-dir>/<name>_test/<save-output-to-file>
#[clap(long = "save-output-to-file")]
pub save_output_to_file: Option<String>,
/// If given, validate the output against this file on the server in <plugin-dir>/<name>_test/<validate-output-file>
#[clap(long = "validate-output-file")]
pub validate_output_file: Option<String>,
}

impl Into<WalPluginTestRequest> for WalPluginTest {
fn into(self) -> WalPluginTestRequest {
WalPluginTestRequest {
name: self.name,
input_lp: self.input_lp,
input_file: self.input_file,
save_output_to_file: self.save_output_to_file,
validate_output_file: self.validate_output_file,
}
}
}

pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
let InfluxDb3Config {
host_url,
auth_token,
..
} = config.influxdb3_config;

let wal_plugin_test_request: WalPluginTestRequest = config.wal_plugin_test.into();

let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}
let resonse = client.wal_plugin_test(wal_plugin_test_request).await?;

// pretty print the response
println!(
"RESPONSE:\n{}",
serde_json::to_string_pretty(&resonse).expect("serialize wal plugin test response as JSON")
);

Ok(())
}
10 changes: 9 additions & 1 deletion influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{num::NonZeroUsize, sync::Arc};
use std::{path::Path, str::FromStr};
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::time::Instant;
Expand Down Expand Up @@ -287,6 +290,10 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// The local directory that has python plugins and their test files.
#[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)]
pub plugin_dir: Option<PathBuf>,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -481,6 +488,7 @@ pub async fn command(config: Config) -> Result<()> {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
plugin_dir: config.plugin_dir,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
10 changes: 10 additions & 0 deletions influxdb3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod commands {
pub mod last_cache;
pub mod manage;
pub mod meta_cache;
pub mod plugin_test;
pub mod processing_engine;
pub mod query;
pub mod serve;
Expand Down Expand Up @@ -105,6 +106,9 @@ enum Command {

/// Manage table (delete only for the moment)
Table(commands::manage::table::Config),

/// Test Python plugins for processing WAL writes, persistence Snapshots, requests, or scheduled tasks.
PluginTest(commands::plugin_test::Config),
}

fn main() -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -187,6 +191,12 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::PluginTest(config)) => {
if let Err(e) = commands::plugin_test::command(config).await {
eprintln!("Plugin Test command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
}
});

Expand Down
1 change: 1 addition & 0 deletions influxdb3_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes.workspace = true
reqwest.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
url.workspace = true

Expand Down
32 changes: 32 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub mod plugin_test;

use std::{
collections::HashMap, fmt::Display, num::NonZeroUsize, string::FromUtf8Error, time::Duration,
};

use crate::plugin_test::{WalPluginTestRequest, WalPluginTestResponse};
use bytes::Bytes;
use iox_query_params::StatementParam;
use reqwest::{Body, IntoUrl, Method, StatusCode};
Expand Down Expand Up @@ -697,6 +700,35 @@ impl Client {
}
}

/// Make a request to the `POST /api/v3/plugin_test/wal` API
pub async fn wal_plugin_test(
&self,
wal_plugin_test_request: WalPluginTestRequest,
) -> Result<WalPluginTestResponse> {
let api_path = "/api/v3/plugin_test/wal";

let url = self.base_url.join(api_path)?;

let mut req = self.http_client.post(url).json(&wal_plugin_test_request);

if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::POST, api_path, src))?;

if resp.status().is_success() {
resp.json().await.map_err(Error::Json)
} else {
Err(Error::ApiError {
code: resp.status(),
message: resp.text().await.map_err(Error::Text)?,
})
}
}

/// Send a `/ping` request to the target `influxdb3` server to check its
/// status and gather `version` and `revision` information
pub async fn ping(&self) -> Result<PingResponse> {
Expand Down
22 changes: 22 additions & 0 deletions influxdb3_client/src/plugin_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! Request structs for the /api/v3/plugin_test API

use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Request definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestRequest {
pub name: String,
pub input_lp: Option<String>,
pub input_file: Option<String>,
pub save_output_to_file: Option<String>,
pub validate_output_file: Option<String>,
}

/// Response definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestResponse {
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}
1 change: 1 addition & 0 deletions influxdb3_py_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ influxdb3_catalog = {path = "../influxdb3_catalog"}
async-trait.workspace = true
schema.workspace = true
parking_lot.workspace = true
log = "0.4.22"

[dependencies.pyo3]
version = "0.23.3"
Expand Down
Loading
Loading