refactor: cleanup v3 write API and series key method on catalog
Store the series key column names on the TableDefinition in the catalog so
that looking up the series key by column name is more efficient

Remove the /api/v3/write API and related code/tests
hiltontj committed Dec 29, 2024
1 parent 03ea565 commit e4b937b
Showing 6 changed files with 34 additions and 653 deletions.
109 changes: 0 additions & 109 deletions influxdb3/tests/server/write.rs
@@ -1,118 +1,9 @@
use hyper::StatusCode;
use influxdb3_client::Precision;
use pretty_assertions::assert_eq;
use test_helpers::assert_contains;

use crate::TestServer;

#[tokio::test]
async fn api_v3_write() {
let server = TestServer::spawn().await;
let client = reqwest::Client::new();

let url = format!("{base}/api/v3/write", base = server.client_addr());
let params = &[("db", "foo")];

// Make a successful write:
assert!(client
.post(&url)
.query(params)
.body(
"\
cpu,region/us-east/host/a1 usage=42.0,temp=10 1234\n\
cpu,region/us-east/host/b1 usage=10.5,temp=18 1234\n\
cpu,region/us-west/host/a2 usage=88.0,temp=15 1234\n\
cpu,region/us-west/host/b2 usage=92.2,temp=14 1234\n\
",
)
.send()
.await
.expect("send write request")
.status()
.is_success());

// Query from the table written to:
let resp = server
.api_v3_query_sql(&[
("db", "foo"),
("q", "SELECT * FROM cpu"),
("format", "pretty"),
])
.await
.text()
.await
.expect("get body");

assert_eq!(
"\
+------+---------+------+---------------------+-------+\n\
| host | region | temp | time | usage |\n\
+------+---------+------+---------------------+-------+\n\
| a1 | us-east | 10.0 | 1970-01-01T00:20:34 | 42.0 |\n\
| a2 | us-west | 15.0 | 1970-01-01T00:20:34 | 88.0 |\n\
| b1 | us-east | 18.0 | 1970-01-01T00:20:34 | 10.5 |\n\
| b2 | us-west | 14.0 | 1970-01-01T00:20:34 | 92.2 |\n\
+------+---------+------+---------------------+-------+",
resp
);

// Test several failure modes:
struct TestCase {
body: &'static str,
response_contains: &'static str,
}

let test_cases = [
// No series key:
TestCase {
body: "cpu usage=10.0,temp=5 1235",
response_contains:
"write to table cpu was missing a series key, the series key contains [region, host]",
},
// Series key out-of-order:
TestCase {
body: "cpu,host/c1/region/ca-cent usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [host, region]",
},
// Series key with invalid member at end:
TestCase {
body: "cpu,region/ca-cent/host/c1/container/foo usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [region, host, container]",
},
// Series key with invalid member in middle:
TestCase {
body: "cpu,region/ca-cent/sub-region/toronto/host/c1 usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [region, sub-region, host]",
},
// Series key with invalid member at start:
TestCase {
body: "cpu,planet/earth/region/ca-cent/host/c1 usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [planet, region, host]",
},
];

for t in test_cases {
let resp = client
.post(&url)
.query(params)
.body(t.body)
.send()
.await
.expect("get response from server")
.text()
.await
.expect("parse response");

println!("RESPONSE:\n{resp}");

assert_contains!(resp, t.response_contains);
}
}

#[tokio::test]
async fn api_v1_write_request_parsing() {
let server = TestServer::spawn().await;
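With `/api/v3/write` gone, the slash-delimited series key syntax exercised by the deleted test is no longer part of the HTTP surface; writes go through the remaining endpoints with conventional line protocol. A minimal sketch (not from this commit; the base URL and database name are placeholders) of the equivalent write against the retained `/api/v3/write_lp` route:

```rust
// Sketch only: the same sample written through the retained
// /api/v3/write_lp endpoint using ordinary `tag=value` line protocol.
// The base URL and database name are placeholders.
async fn write_cpu_sample(
    client: &reqwest::Client,
    base: &str,
) -> reqwest::Result<reqwest::Response> {
    client
        .post(format!("{base}/api/v3/write_lp"))
        .query(&[("db", "foo")])
        .body("cpu,region=us-east,host=a1 usage=42.0,temp=10 1234")
        .send()
        .await
}
```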
31 changes: 20 additions & 11 deletions influxdb3_catalog/src/catalog.rs
@@ -823,6 +823,7 @@ pub struct TableDefinition {
pub columns: IndexMap<ColumnId, ColumnDefinition>,
pub column_map: BiHashMap<ColumnId, Arc<str>>,
pub series_key: Vec<ColumnId>,
pub series_key_names: Vec<Arc<str>>,
pub last_caches: HashMap<Arc<str>, LastCacheDefinition>,
pub meta_caches: HashMap<Arc<str>, MetaCacheDefinition>,
pub deleted: bool,
@@ -862,12 +863,18 @@ impl TableDefinition {
);
column_map.insert(*col_id, name.into());
}
schema_builder.with_series_key(series_key.clone().into_iter().map(|id| {
column_map
.get_by_left(&id)
// NOTE: should this be an error instead of panic?
.expect("invalid column id in series key definition")
}));
let series_key_names = series_key
.clone()
.into_iter()
.map(|id| {
column_map
.get_by_left(&id)
.cloned()
// NOTE: should this be an error instead of panic?
.expect("invalid column id in series key definition")
})
.collect::<Vec<Arc<str>>>();
schema_builder.with_series_key(&series_key_names);
let schema = schema_builder.build().expect("schema should be valid");

Ok(Self {
@@ -877,6 +884,7 @@
columns,
column_map,
series_key,
series_key_names,
last_caches: HashMap::new(),
meta_caches: HashMap::new(),
deleted: false,
@@ -1103,11 +1111,12 @@ impl TableDefinition {
.and_then(|id| self.columns.get(id).map(|def| (*id, def)))
}

pub fn series_key(&self) -> Vec<String> {
self.series_key
.iter()
.map(|k| self.column_id_to_name_unchecked(k).to_string())
.collect()
pub fn series_key_ids(&self) -> &[ColumnId] {
&self.series_key
}

pub fn series_key_names(&self) -> &[Arc<str>] {
&self.series_key_names
}
}

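Callers now get borrowed slices instead of the freshly allocated `Vec<String>` that the removed `series_key()` built on every call. A hypothetical validation helper (standalone, not part of this commit) sketches the by-name lookup the commit message describes, assuming `expected` comes from `series_key_names()`:

```rust
use std::sync::Arc;

// Hypothetical helper: with the names cached on the TableDefinition,
// checking an incoming series key is a borrowed slice comparison with
// no intermediate String allocations.
fn series_key_matches(expected: &[Arc<str>], received: &[&str]) -> bool {
    expected.len() == received.len()
        && expected
            .iter()
            .zip(received)
            .all(|(want, got)| want.as_ref() == *got)
}
```

This is the same by-name comparison reflected in the errors asserted by the removed test, e.g. `expected: [region, host], received: [host, region]`.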
45 changes: 13 additions & 32 deletions influxdb3_server/src/http.rs
@@ -451,21 +451,14 @@ where
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
let params: WriteParams = serde_urlencoded::from_str(query)?;
self.write_lp_inner(params, req, false, false).await
}

async fn write_v3(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
let params: WriteParams = serde_urlencoded::from_str(query)?;
self.write_lp_inner(params, req, false, true).await
self.write_lp_inner(params, req, false).await
}

async fn write_lp_inner(
&self,
params: WriteParams,
req: Request<Body>,
accept_rp: bool,
use_v3: bool,
) -> Result<Response<Body>> {
validate_db_name(&params.db, accept_rp)?;
info!("write_lp to {}", params.db);
@@ -477,27 +470,16 @@

let default_time = self.time_provider.now();

let result = if use_v3 {
self.write_buffer
.write_lp_v3(
database,
body,
default_time,
params.accept_partial,
params.precision,
)
.await?
} else {
self.write_buffer
.write_lp(
database,
body,
default_time,
params.accept_partial,
params.precision,
)
.await?
};
let result = self
.write_buffer
.write_lp(
database,
body,
default_time,
params.accept_partial,
params.precision,
)
.await?;

let num_lines = result.line_count;
let payload_size = body.len();
@@ -1547,17 +1529,16 @@ pub(crate) async fn route_request<T: TimeProvider>(
Err(e) => return Ok(legacy_write_error_to_response(e)),
};

http_server.write_lp_inner(params, req, true, false).await
http_server.write_lp_inner(params, req, true).await
}
(Method::POST, "/api/v2/write") => {
let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await {
Ok(p) => p.into(),
Err(e) => return Ok(legacy_write_error_to_response(e)),
};

http_server.write_lp_inner(params, req, false, false).await
http_server.write_lp_inner(params, req, false).await
}
(Method::POST, "/api/v3/write") => http_server.write_v3(req).await,
(Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await,
(Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await,
(Method::GET | Method::POST, "/api/v3/query_influxql") => {
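After this change the v1 route is the only call site passing `accept_rp = true` into `write_lp_inner`, since the legacy endpoint still carries a retention-policy parameter through `validate_db_name`. A client-side sketch of that endpoint (not from this commit; the `db` and `rp` values are placeholders):

```rust
// Sketch only: the legacy v1 write path, whose call site passes
// `accept_rp = true` to write_lp_inner. Database and retention policy
// names here are placeholders.
async fn write_v1(
    client: &reqwest::Client,
    base: &str,
) -> reqwest::Result<reqwest::Response> {
    client
        .post(format!("{base}/api/v1/write"))
        .query(&[("db", "foo"), ("rp", "autogen")])
        .body("cpu,host=a1 usage=42.0 1234")
        .send()
        .await
}
```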
10 changes: 0 additions & 10 deletions influxdb3_write/src/lib.rs
@@ -92,16 +92,6 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
precision: Precision,
) -> write_buffer::Result<BufferedWriteRequest>;

/// Write v3 line protocol
async fn write_lp_v3(
&self,
database: NamespaceName<'static>,
lp: &str,
ingest_time: Time,
accept_partial: bool,
precision: Precision,
) -> write_buffer::Result<BufferedWriteRequest>;

/// Returns the database schema provider
fn catalog(&self) -> Arc<Catalog>;

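With `write_lp_v3` removed, `Bufferer` exposes a single write entry point. A hypothetical caller generic over the trait (the helper itself is not part of this commit; `NamespaceName`, `Time`, and `Precision` are the crate's existing types, and the signature mirrors the retained `write_lp`):

```rust
// Hypothetical helper: code generic over `Bufferer` now has exactly one
// write path. Passing `accept_partial = true` keeps the valid lines even
// when some lines fail to parse.
async fn ingest<B: Bufferer>(
    buffer: &B,
    database: NamespaceName<'static>,
    lp: &str,
    ingest_time: Time,
    precision: Precision,
) -> write_buffer::Result<BufferedWriteRequest> {
    buffer
        .write_lp(database, lp, ingest_time, true, precision)
        .await
}
```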
54 changes: 0 additions & 54 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -317,48 +317,6 @@ impl WriteBufferImpl {
})
}

async fn write_lp_v3(
&self,
db_name: NamespaceName<'static>,
lp: &str,
ingest_time: Time,
accept_partial: bool,
precision: Precision,
) -> Result<BufferedWriteRequest> {
// validated lines will update the in-memory catalog, ensuring that all write operations
// past this point will be infallible
let result = WriteValidator::initialize(
db_name.clone(),
self.catalog(),
ingest_time.timestamp_nanos(),
)?
.v3_parse_lines_and_update_schema(lp, accept_partial, ingest_time, precision)?
.convert_lines_to_buffer(self.wal_config.gen1_duration);

// if there were catalog updates, ensure they get persisted to the wal, so they're
// replayed on restart
let mut ops = Vec::with_capacity(2);
if let Some(catalog_batch) = result.catalog_updates {
ops.push(WalOp::Catalog(catalog_batch));
}
ops.push(WalOp::Write(result.valid_data));

// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;

Ok(BufferedWriteRequest {
db_name,
invalid_lines: result.errors,
line_count: result.line_count,
field_count: result.field_count,
index_count: result.index_count,
})
}

fn get_table_chunks(
&self,
database_name: &str,
@@ -471,18 +429,6 @@ impl Bufferer for WriteBufferImpl {
.await
}

async fn write_lp_v3(
&self,
database: NamespaceName<'static>,
lp: &str,
ingest_time: Time,
accept_partial: bool,
precision: Precision,
) -> Result<BufferedWriteRequest> {
self.write_lp_v3(database, lp, ingest_time, accept_partial, precision)
.await
}

fn catalog(&self) -> Arc<Catalog> {
self.catalog()
}
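The deleted method's comments spell out the pipeline the retained `write_lp` shares: validate lines against the catalog, convert them to buffer format, then batch catalog and write ops into the WAL for replay on restart. A standalone distillation of the op-ordering step, with generic stand-ins (`C`, `W`) for the crate's catalog-batch and write-batch types:

```rust
// Standalone sketch with stand-in types (the real WalOp lives in the WAL
// crate): a catalog update, when present, is queued ahead of the write op,
// presumably so replay restores schema before the data that depends on it.
enum WalOp<C, W> {
    Catalog(C),
    Write(W),
}

fn wal_ops<C, W>(catalog_update: Option<C>, valid_data: W) -> Vec<WalOp<C, W>> {
    let mut ops = Vec::with_capacity(2);
    if let Some(batch) = catalog_update {
        ops.push(WalOp::Catalog(batch));
    }
    ops.push(WalOp::Write(valid_data));
    ops
}
```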