diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs
index ac49e96bea3..e0f6e5b4547 100644
--- a/influxdb3/tests/server/write.rs
+++ b/influxdb3/tests/server/write.rs
@@ -1,118 +1,9 @@
 use hyper::StatusCode;
 use influxdb3_client::Precision;
 use pretty_assertions::assert_eq;
-use test_helpers::assert_contains;
 
 use crate::TestServer;
 
-#[tokio::test]
-async fn api_v3_write() {
-    let server = TestServer::spawn().await;
-    let client = reqwest::Client::new();
-
-    let url = format!("{base}/api/v3/write", base = server.client_addr());
-    let params = &[("db", "foo")];
-
-    // Make a successful write:
-    assert!(client
-        .post(&url)
-        .query(params)
-        .body(
-            "\
-            cpu,region/us-east/host/a1 usage=42.0,temp=10 1234\n\
-            cpu,region/us-east/host/b1 usage=10.5,temp=18 1234\n\
-            cpu,region/us-west/host/a2 usage=88.0,temp=15 1234\n\
-            cpu,region/us-west/host/b2 usage=92.2,temp=14 1234\n\
-            ",
-        )
-        .send()
-        .await
-        .expect("send write request")
-        .status()
-        .is_success());
-
-    // Query from the table written to:
-    let resp = server
-        .api_v3_query_sql(&[
-            ("db", "foo"),
-            ("q", "SELECT * FROM cpu"),
-            ("format", "pretty"),
-        ])
-        .await
-        .text()
-        .await
-        .expect("get body");
-
-    assert_eq!(
-        "\
-        +------+---------+------+---------------------+-------+\n\
-        | host | region  | temp | time                | usage |\n\
-        +------+---------+------+---------------------+-------+\n\
-        | a1   | us-east | 10.0 | 1970-01-01T00:20:34 | 42.0  |\n\
-        | a2   | us-west | 15.0 | 1970-01-01T00:20:34 | 88.0  |\n\
-        | b1   | us-east | 18.0 | 1970-01-01T00:20:34 | 10.5  |\n\
-        | b2   | us-west | 14.0 | 1970-01-01T00:20:34 | 92.2  |\n\
-        +------+---------+------+---------------------+-------+",
-        resp
-    );
-
-    // Test several failure modes:
-    struct TestCase {
-        body: &'static str,
-        response_contains: &'static str,
-    }
-
-    let test_cases = [
-        // No series key:
-        TestCase {
-            body: "cpu usage=10.0,temp=5 1235",
-            response_contains:
-                "write to table cpu was missing a series key, the series key contains [region, host]",
-        },
-        // Series key out-of-order:
-        TestCase {
-            body: "cpu,host/c1/region/ca-cent usage=22.0,temp=6 1236",
-            response_contains: "write to table cpu had the incorrect series key, \
-                expected: [region, host], received: [host, region]",
-        },
-        // Series key with invalid member at end:
-        TestCase {
-            body: "cpu,region/ca-cent/host/c1/container/foo usage=22.0,temp=6 1236",
-            response_contains: "write to table cpu had the incorrect series key, \
-                expected: [region, host], received: [region, host, container]",
-        },
-        // Series key with invalid member in middle:
-        TestCase {
-            body: "cpu,region/ca-cent/sub-region/toronto/host/c1 usage=22.0,temp=6 1236",
-            response_contains: "write to table cpu had the incorrect series key, \
-                expected: [region, host], received: [region, sub-region, host]",
-        },
-        // Series key with invalid member at start:
-        TestCase {
-            body: "cpu,planet/earth/region/ca-cent/host/c1 usage=22.0,temp=6 1236",
-            response_contains: "write to table cpu had the incorrect series key, \
-                expected: [region, host], received: [planet, region, host]",
-        },
-    ];
-
-    for t in test_cases {
-        let resp = client
-            .post(&url)
-            .query(params)
-            .body(t.body)
-            .send()
-            .await
-            .expect("get response from server")
-            .text()
-            .await
-            .expect("parse response");
-
-        println!("RESPONSE:\n{resp}");
-
-        assert_contains!(resp, t.response_contains);
-    }
-}
-
 #[tokio::test]
 async fn api_v1_write_request_parsing() {
     let server = TestServer::spawn().await;
diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index 74c920a0748..da00824fa41 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -823,6 +823,7 @@ pub struct TableDefinition {
     pub columns: IndexMap<ColumnId, ColumnDefinition>,
     pub column_map: BiHashMap<ColumnId, Arc<str>>,
     pub series_key: Vec<ColumnId>,
+    pub series_key_names: Vec<Arc<str>>,
     pub last_caches: HashMap<Arc<str>, LastCacheDefinition>,
     pub meta_caches: HashMap<Arc<str>, MetaCacheDefinition>,
     pub deleted: bool,
@@ -862,12 +863,18 @@ impl TableDefinition {
             );
             column_map.insert(*col_id, name.into());
         }
-        schema_builder.with_series_key(series_key.clone().into_iter().map(|id| {
-            column_map
-                .get_by_left(&id)
-                // NOTE: should this be an error instead of panic?
-                .expect("invalid column id in series key definition")
-        }));
+        let series_key_names = series_key
+            .clone()
+            .into_iter()
+            .map(|id| {
+                column_map
+                    .get_by_left(&id)
+                    .cloned()
+                    // NOTE: should this be an error instead of panic?
+                    .expect("invalid column id in series key definition")
+            })
+            .collect::<Vec<Arc<str>>>();
+        schema_builder.with_series_key(&series_key_names);
         let schema = schema_builder.build().expect("schema should be valid");
 
         Ok(Self {
@@ -877,6 +884,7 @@
             columns,
             column_map,
             series_key,
+            series_key_names,
             last_caches: HashMap::new(),
             meta_caches: HashMap::new(),
             deleted: false,
@@ -1103,11 +1111,12 @@ impl TableDefinition {
             .and_then(|id| self.columns.get(id).map(|def| (*id, def)))
     }
 
-    pub fn series_key(&self) -> Vec<String> {
-        self.series_key
-            .iter()
-            .map(|k| self.column_id_to_name_unchecked(k).to_string())
-            .collect()
+    pub fn series_key_ids(&self) -> &[ColumnId] {
+        &self.series_key
+    }
+
+    pub fn series_key_names(&self) -> &[Arc<str>] {
+        &self.series_key_names
     }
 }
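The catalog change above precomputes the series key column names once, when a `TableDefinition` is constructed, instead of rebuilding owned `String`s from the column map on every `series_key()` call; callers then choose between `series_key_ids()` and `series_key_names()`, both returning borrowed slices. The following is a minimal, self-contained sketch of that caching pattern with simplified stand-in types (a plain `HashMap` rather than the actual `influxdb3_catalog` / `BiHashMap` definitions):

```rust
use std::{collections::HashMap, sync::Arc};

// Hypothetical, simplified stand-ins; the real TableDefinition has many more fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ColumnId(u32);

struct TableDef {
    column_names: HashMap<ColumnId, Arc<str>>,
    series_key: Vec<ColumnId>,
    // Cached at construction so name lookups don't allocate per call.
    series_key_names: Vec<Arc<str>>,
}

impl TableDef {
    fn new(column_names: HashMap<ColumnId, Arc<str>>, series_key: Vec<ColumnId>) -> Self {
        let series_key_names = series_key
            .iter()
            .map(|id| {
                Arc::clone(
                    column_names
                        .get(id)
                        .expect("invalid column id in series key definition"),
                )
            })
            .collect();
        Self {
            column_names,
            series_key,
            series_key_names,
        }
    }

    // Mirrors the two accessors added in this diff: ids and pre-computed names.
    fn series_key_ids(&self) -> &[ColumnId] {
        &self.series_key
    }

    fn series_key_names(&self) -> &[Arc<str>] {
        &self.series_key_names
    }
}

fn main() {
    let names: HashMap<ColumnId, Arc<str>> = [
        (ColumnId(0), Arc::from("region")),
        (ColumnId(1), Arc::from("host")),
    ]
    .into_iter()
    .collect();
    let table = TableDef::new(names, vec![ColumnId(0), ColumnId(1)]);
    assert_eq!(table.series_key_ids().len(), 2);
    assert_eq!(&*table.series_key_names()[0], "region");
}
```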
diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs
index 1aa4fc295f6..ef0ac007141 100644
--- a/influxdb3_server/src/http.rs
+++ b/influxdb3_server/src/http.rs
@@ -451,13 +451,7 @@ where
     async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
         let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
         let params: WriteParams = serde_urlencoded::from_str(query)?;
-        self.write_lp_inner(params, req, false, false).await
-    }
-
-    async fn write_v3(&self, req: Request<Body>) -> Result<Response<Body>> {
-        let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
-        let params: WriteParams = serde_urlencoded::from_str(query)?;
-        self.write_lp_inner(params, req, false, true).await
+        self.write_lp_inner(params, req, false).await
     }
 
     async fn write_lp_inner(
@@ -465,7 +459,6 @@ where
         params: WriteParams,
         req: Request<Body>,
         accept_rp: bool,
-        use_v3: bool,
     ) -> Result<Response<Body>> {
         validate_db_name(&params.db, accept_rp)?;
         info!("write_lp to {}", params.db);
@@ -477,27 +470,16 @@
 
         let default_time = self.time_provider.now();
 
-        let result = if use_v3 {
-            self.write_buffer
-                .write_lp_v3(
-                    database,
-                    body,
-                    default_time,
-                    params.accept_partial,
-                    params.precision,
-                )
-                .await?
-        } else {
-            self.write_buffer
-                .write_lp(
-                    database,
-                    body,
-                    default_time,
-                    params.accept_partial,
-                    params.precision,
-                )
-                .await?
-        };
+        let result = self
+            .write_buffer
+            .write_lp(
+                database,
+                body,
+                default_time,
+                params.accept_partial,
+                params.precision,
+            )
+            .await?;
 
         let num_lines = result.line_count;
         let payload_size = body.len();
@@ -1547,7 +1529,7 @@ pub(crate) async fn route_request(
                 Err(e) => return Ok(legacy_write_error_to_response(e)),
             };
 
-            http_server.write_lp_inner(params, req, true, false).await
+            http_server.write_lp_inner(params, req, true).await
         }
         (Method::POST, "/api/v2/write") => {
            let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await {
@@ -1555,9 +1537,8 @@
                 Err(e) => return Ok(legacy_write_error_to_response(e)),
             };
 
-            http_server.write_lp_inner(params, req, false, false).await
+            http_server.write_lp_inner(params, req, false).await
         }
-        (Method::POST, "/api/v3/write") => http_server.write_v3(req).await,
         (Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await,
         (Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await,
         (Method::GET | Method::POST, "/api/v3/query_influxql") => {
diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index 19a8c8a13be..823cdb3a6dd 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -92,16 +92,6 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
         precision: Precision,
     ) -> write_buffer::Result<BufferedWriteRequest>;
 
-    /// Write v3 line protocol
-    async fn write_lp_v3(
-        &self,
-        database: NamespaceName<'static>,
-        lp: &str,
-        ingest_time: Time,
-        accept_partial: bool,
-        precision: Precision,
-    ) -> write_buffer::Result<BufferedWriteRequest>;
-
     /// Returns the database schema provider
     fn catalog(&self) -> Arc<Catalog>;
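With the `/api/v3/write` route, the `write_v3` handler, and the `use_v3` flag gone, every write endpoint funnels into a single inner handler, and the only remaining per-route distinction is whether legacy retention-policy style database names are accepted. A dependency-free sketch of that dispatch shape (hypothetical types and a stand-in name check, not the real hyper-based router in `influxdb3_server`):

```rust
// Hypothetical sketch of the remaining write dispatch after this diff.
#[derive(Debug)]
enum WriteRoute {
    V1Write,   // /write            (accepts db/rp style names)
    V2Write,   // /api/v2/write
    V3WriteLp, // /api/v3/write_lp
}

fn write_lp_inner(db: &str, body: &str, accept_rp: bool) -> Result<usize, String> {
    // Stand-in for validate_db_name: reject rp-style names unless allowed.
    if !accept_rp && db.contains('/') {
        return Err(format!("invalid database name: {db}"));
    }
    // Counting lines stands in for parsing and validating line protocol.
    Ok(body.lines().count())
}

fn route(route: WriteRoute, db: &str, body: &str) -> Result<usize, String> {
    match route {
        WriteRoute::V1Write => write_lp_inner(db, body, true),
        WriteRoute::V2Write => write_lp_inner(db, body, false),
        WriteRoute::V3WriteLp => write_lp_inner(db, body, false),
    }
}

fn main() {
    let lp = "cpu,host=a usage=0.5 1234";
    assert_eq!(route(WriteRoute::V3WriteLp, "foo", lp), Ok(1));
    assert_eq!(route(WriteRoute::V1Write, "foo/autogen", lp), Ok(1));
    assert!(route(WriteRoute::V2Write, "foo/autogen", lp).is_err());
}
```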
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index 7a8535d0997..b2d0160a226 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -317,48 +317,6 @@ impl WriteBufferImpl {
         })
     }
 
-    async fn write_lp_v3(
-        &self,
-        db_name: NamespaceName<'static>,
-        lp: &str,
-        ingest_time: Time,
-        accept_partial: bool,
-        precision: Precision,
-    ) -> Result<BufferedWriteRequest> {
-        // validated lines will update the in-memory catalog, ensuring that all write operations
-        // past this point will be infallible
-        let result = WriteValidator::initialize(
-            db_name.clone(),
-            self.catalog(),
-            ingest_time.timestamp_nanos(),
-        )?
-        .v3_parse_lines_and_update_schema(lp, accept_partial, ingest_time, precision)?
-        .convert_lines_to_buffer(self.wal_config.gen1_duration);
-
-        // if there were catalog updates, ensure they get persisted to the wal, so they're
-        // replayed on restart
-        let mut ops = Vec::with_capacity(2);
-        if let Some(catalog_batch) = result.catalog_updates {
-            ops.push(WalOp::Catalog(catalog_batch));
-        }
-        ops.push(WalOp::Write(result.valid_data));
-
-        // write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
-        // whatever the configured wal flush interval is set to) the buffer is flushed and all the
-        // data is persisted into a single wal file in the configured object store. Then the
-        // contents are sent to the configured notifier, which in this case is the queryable buffer.
-        // Thus, after this returns, the data is both durable and queryable.
-        self.wal.write_ops(ops).await?;
-
-        Ok(BufferedWriteRequest {
-            db_name,
-            invalid_lines: result.errors,
-            line_count: result.line_count,
-            field_count: result.field_count,
-            index_count: result.index_count,
-        })
-    }
-
     fn get_table_chunks(
         &self,
         database_name: &str,
@@ -471,18 +429,6 @@ impl Bufferer for WriteBufferImpl {
             .await
     }
 
-    async fn write_lp_v3(
-        &self,
-        database: NamespaceName<'static>,
-        lp: &str,
-        ingest_time: Time,
-        accept_partial: bool,
-        precision: Precision,
-    ) -> Result<BufferedWriteRequest> {
-        self.write_lp_v3(database, lp, ingest_time, accept_partial, precision)
-            .await
-    }
-
     fn catalog(&self) -> Arc<Catalog> {
         self.catalog()
     }
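The removed `write_lp_v3` followed the same validate-then-WAL pattern that the surviving `write_lp` still uses: an optional catalog batch is pushed ahead of the write batch so that schema changes are replayed on restart before the data that depends on them. A simplified sketch of that batching step, using stand-in types rather than the `influxdb3_wal` definitions:

```rust
// Simplified stand-ins for the WAL op types and the validator output.
#[derive(Debug)]
enum WalOp {
    Catalog(String), // stands in for a CatalogBatch
    Write(String),   // stands in for a WriteBatch
}

struct ValidatedWrite {
    catalog_updates: Option<String>,
    valid_data: String,
}

fn wal_ops_for(result: ValidatedWrite) -> Vec<WalOp> {
    // At most two ops per write request: an optional catalog update that must be
    // durable (so it is replayed on restart) followed by the data itself.
    let mut ops = Vec::with_capacity(2);
    if let Some(catalog_batch) = result.catalog_updates {
        ops.push(WalOp::Catalog(catalog_batch));
    }
    ops.push(WalOp::Write(result.valid_data));
    ops
}

fn main() {
    let ops = wal_ops_for(ValidatedWrite {
        catalog_updates: Some("new table: cpu".into()),
        valid_data: "cpu rows".into(),
    });
    assert_eq!(ops.len(), 2);
    assert!(matches!(ops[0], WalOp::Catalog(_)));
}
```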
diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs
index f99b68f1318..afbfd860437 100644
--- a/influxdb3_write/src/write_buffer/validator.rs
+++ b/influxdb3_write/src/write_buffer/validator.rs
@@ -12,7 +12,7 @@ use influxdb3_wal::{
     CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDefinition, Gen1Duration,
     OrderedCatalogBatch, Row, TableChunks, WriteBatch,
 };
-use influxdb_line_protocol::{parse_lines, v3, ParsedLine};
+use influxdb_line_protocol::{parse_lines, ParsedLine};
 use iox_time::Time;
 use schema::{InfluxColumnType, TIME_COLUMN_NAME};
@@ -70,90 +70,6 @@ impl WriteValidator {
         })
     }
 
-    /// Parse the incoming lines of line protocol using the v3 parser and update
-    /// the [`DatabaseSchema`] if:
-    ///
-    /// * A new table is being added
-    /// * New fields, or tags are being added to an existing table
-    ///
-    /// # Implementation Note
-    ///
-    /// If this function succeeds, then the catalog will receive an update, so
-    /// steps following this should be infallible.
-    pub fn v3_parse_lines_and_update_schema(
-        self,
-        lp: &str,
-        accept_partial: bool,
-        ingest_time: Time,
-        precision: Precision,
-    ) -> Result<WriteValidator<LinesParsed>> {
-        let mut errors = vec![];
-        let mut lp_lines = lp.lines();
-        let mut lines = vec![];
-        let mut bytes = 0;
-        let mut catalog_updates = vec![];
-        let mut schema = Cow::Borrowed(self.state.db_schema.as_ref());
-
-        for (line_idx, maybe_line) in v3::parse_lines(lp).enumerate() {
-            let (qualified_line, catalog_op) = match maybe_line
-                .map_err(|e| WriteLineError {
-                    original_line: lp_lines.next().unwrap().to_string(),
-                    line_number: line_idx + 1,
-                    error_message: e.to_string(),
-                })
-                .and_then(|line| {
-                    let raw_line = lp_lines.next().unwrap();
-                    validate_and_qualify_v3_line(
-                        &mut schema,
-                        line_idx,
-                        line,
-                        raw_line,
-                        ingest_time,
-                        precision,
-                    )
-                    .inspect(|_| bytes += raw_line.len() as u64)
-                }) {
-                Ok((qualified_line, catalog_ops)) => (qualified_line, catalog_ops),
-                Err(error) => {
-                    if accept_partial {
-                        errors.push(error);
-                    } else {
-                        return Err(Error::ParseError(error));
-                    }
-                    continue;
-                }
-            };
-
-            if let Some(op) = catalog_op {
-                catalog_updates.push(op);
-            }
-
-            lines.push(qualified_line);
-        }
-
-        let catalog_batch = if catalog_updates.is_empty() {
-            None
-        } else {
-            let catalog_batch = CatalogBatch {
-                database_id: self.state.db_schema.id,
-                database_name: Arc::clone(&self.state.db_schema.name),
-                time_ns: self.state.time_now_ns,
-                ops: catalog_updates,
-            };
-            self.state.catalog.apply_catalog_batch(catalog_batch)?
-        };
-
-        Ok(WriteValidator {
-            state: LinesParsed {
-                catalog: self.state,
-                lines,
-                bytes,
-                catalog_batch,
-                errors,
-            },
-        })
-    }
-
     /// Parse the incoming lines of line protocol using the v1 parser and update
     /// the [`DatabaseSchema`] if:
     ///
     /// * A new table is being added
     /// * New fields, or tags are being added to an existing table
@@ -240,278 +156,6 @@ impl WriteValidator {
 /// Type alias for storing new columns added by a write
 type ColumnTracker = Vec<(ColumnId, Arc<str>, InfluxColumnType)>;
 
-/// Validate an individual line of v3 line protocol and update the database
-/// schema
-///
-/// The [`DatabaseSchema`] will be updated if the line is being written to a new table, or if
-/// the line contains new fields. Note that for v3, the series key members must be consistent,
-/// and therefore new tag columns will never be added after the first write.
-///
-/// This errors if the write is being performed against a v1 table, i.e., one that does not have
-/// a series key.
-fn validate_and_qualify_v3_line(
-    db_schema: &mut Cow<'_, DatabaseSchema>,
-    line_number: usize,
-    line: v3::ParsedLine,
-    raw_line: &str,
-    ingest_time: Time,
-    precision: Precision,
-) -> Result<(QualifiedLine, Option<CatalogOp>), WriteLineError> {
-    let mut catalog_op = None;
-    let table_name = line.series.measurement.as_str();
-    let mut fields = Vec::with_capacity(line.column_count());
-    let mut index_count = 0;
-    let mut field_count = 0;
-    let qualified = if let Some(table_def) = db_schema.table_definition(table_name) {
-        let table_id = table_def.table_id;
-        // TODO: may be faster to compare using table def/column IDs than comparing with schema:
-        match (table_def.series_key(), &line.series.series_key) {
-            (s, Some(l)) => {
-                let l = l.iter().map(|sk| sk.0.as_str()).collect::<Vec<&str>>();
-                if s != l {
-                    return Err(WriteLineError {
-                        original_line: raw_line.to_string(),
-                        line_number,
-                        error_message: format!(
-                            "write to table {table_name} had the incorrect series key, \
-                            expected: [{expected}], received: [{received}]",
-                            table_name = table_def.table_name,
-                            expected = s.join(", "),
-                            received = l.join(", "),
-                        ),
-                    });
-                }
-            }
-            (s, None) => {
-                if !s.is_empty() {
-                    return Err(WriteLineError {
-                        original_line: raw_line.to_string(),
-                        line_number,
-                        error_message: format!(
-                            "write to table {table_name} was missing a series key, the series key \
-                            contains [{key_members}]",
-                            table_name = table_def.table_name,
-                            key_members = s.join(", "),
-                        ),
-                    });
-                }
-            }
-        }
-
-        let mut columns = ColumnTracker::with_capacity(line.column_count() + 1);
-
-        // qualify the series key members:
-        if let Some(sk) = &line.series.series_key {
-            for (key, val) in sk.iter() {
-                let col_id =
-                    table_def
-                        .column_name_to_id(key.as_str())
-                        .ok_or_else(|| WriteLineError {
-                            original_line: raw_line.to_string(),
-                            line_number,
-                            error_message: format!(
-                                "write contained invalid series key column ({key})\
-                                that does not exist in the catalog table definition"
-                            ),
-                        })?;
-                fields.push(Field::new(col_id, val));
-                index_count += 1;
-            }
-        }
-
-        // qualify the fields:
-        for (field_name, field_val) in line.field_set.iter() {
-            if let Some((col_id, col_def)) = table_def.column_id_and_definition(field_name.as_str())
-            {
-                let field_col_type = influx_column_type_from_field_value(field_val);
-                let existing_col_type = col_def.data_type;
-                if field_col_type != existing_col_type {
-                    let field_name = field_name.to_string();
-                    return Err(WriteLineError {
-                        original_line: raw_line.to_string(),
-                        line_number: line_number + 1,
-                        error_message: format!(
-                            "invalid field value in line protocol for field '{field_name}' on line \
-                            {line_number}: expected type {expected}, but got {got}",
-                            expected = existing_col_type,
-                            got = field_col_type,
-                        ),
-                    });
-                }
-                fields.push(Field::new(col_id, field_val));
-            } else {
-                let col_id = ColumnId::new();
-                columns.push((
-                    col_id,
-                    Arc::from(field_name.as_str()),
-                    influx_column_type_from_field_value(field_val),
-                ));
-                fields.push(Field::new(col_id, field_val));
-            }
-            field_count += 1;
-        }
-
-        // qualify the timestamp:
-        let time_col_id = table_def
-            .column_name_to_id(TIME_COLUMN_NAME)
-            .unwrap_or_else(|| {
-                let col_id = ColumnId::new();
-                columns.push((
-                    col_id,
-                    Arc::from(TIME_COLUMN_NAME),
-                    InfluxColumnType::Timestamp,
-                ));
-                col_id
-            });
-        let timestamp_ns = line
-            .timestamp
-            .map(|ts| apply_precision_to_timestamp(precision, ts))
-            .unwrap_or(ingest_time.timestamp_nanos());
-        fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
-
-        // if we have new columns defined, add them to the db_schema table so that subsequent lines
-        // won't try to add the same definitions. Collect these additions into a catalog op, which
-        // will be applied to the catalog with any other ops after all lines in the write request
-        // have been parsed and validated.
-        if !columns.is_empty() {
-            let database_name = Arc::clone(&db_schema.name);
-            let database_id = db_schema.id;
-            let db_schema = db_schema.to_mut();
-            let mut new_table_def = db_schema
-                .tables
-                .get_mut(&table_id)
-                .unwrap()
-                .as_ref()
-                .clone();
-
-            let mut field_definitions = Vec::with_capacity(columns.len());
-            for (id, name, influx_type) in columns.iter() {
-                field_definitions.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
-            }
-            catalog_op = Some(CatalogOp::AddFields(FieldAdditions {
-                database_id,
-                database_name,
-                table_id: new_table_def.table_id,
-                table_name: Arc::clone(&new_table_def.table_name),
-                field_definitions,
-            }));
-
-            new_table_def
-                .add_columns(columns)
-                .map_err(|e| WriteLineError {
-                    original_line: raw_line.to_string(),
-                    line_number: line_number + 1,
-                    error_message: e.to_string(),
-                })?;
-            db_schema
-                .insert_table(table_id, Arc::new(new_table_def))
-                .map_err(|e| WriteLineError {
-                    original_line: raw_line.to_string(),
-                    line_number: line_number + 1,
-                    error_message: e.to_string(),
-                })?;
-        }
-        QualifiedLine {
-            table_id,
-            row: Row {
-                time: timestamp_ns,
-                fields,
-            },
-            index_count,
-            field_count,
-        }
-    } else {
-        let table_id = TableId::new();
-        let mut columns = Vec::new();
-        let mut key = Vec::new();
-        if let Some(series_key) = &line.series.series_key {
-            for (sk, sv) in series_key.iter() {
-                let col_id = ColumnId::new();
-                key.push(col_id);
-                columns.push((col_id, Arc::from(sk.as_str()), InfluxColumnType::Tag));
-                fields.push(Field::new(col_id, sv));
-                index_count += 1;
-            }
-        }
-        for (field_name, field_val) in line.field_set.iter() {
-            let col_id = ColumnId::new();
-            columns.push((
-                col_id,
-                Arc::from(field_name.as_str()),
-                influx_column_type_from_field_value(field_val),
-            ));
-            fields.push(Field::new(col_id, field_val));
-            field_count += 1;
-        }
-        // Always add time last on new table:
-        let time_col_id = ColumnId::new();
-        columns.push((
-            time_col_id,
-            Arc::from(TIME_COLUMN_NAME),
-            InfluxColumnType::Timestamp,
-        ));
-        let timestamp_ns = line
-            .timestamp
-            .map(|ts| apply_precision_to_timestamp(precision, ts))
-            .unwrap_or(ingest_time.timestamp_nanos());
-        fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
-
-        let table_name = table_name.into();
-
-        let mut field_definitions = Vec::with_capacity(columns.len());
-        for (id, name, influx_type) in &columns {
-            field_definitions.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
-        }
-
-        let table = TableDefinition::new(table_id, Arc::clone(&table_name), columns, key.clone())
-            .map_err(|e| WriteLineError {
-                original_line: raw_line.to_string(),
-                line_number: line_number + 1,
-                error_message: e.to_string(),
-            })?;
-
-        let table_definition_op = CatalogOp::CreateTable(influxdb3_wal::TableDefinition {
-            table_id,
-            database_id: db_schema.id,
-            database_name: Arc::clone(&db_schema.name),
-            table_name: Arc::clone(&table_name),
-            field_definitions,
-            key,
-        });
-        catalog_op = Some(table_definition_op);
-
-        let db_schema = db_schema.to_mut();
-        db_schema
-            .insert_table(table_id, Arc::new(table))
-            .map_err(|e| WriteLineError {
-                original_line: raw_line.to_string(),
-                line_number: line_number + 1,
-                error_message: e.to_string(),
-            })?
-            .map_or_else(
-                || Ok(()),
-                |_| {
-                    Err(WriteLineError {
-                        original_line: raw_line.to_string(),
-                        line_number: line_number + 1,
-                        error_message: "unexpected overwrite of existing table".to_string(),
-                    })
-                },
-            )?;
-        QualifiedLine {
-            table_id,
-            row: Row {
-                time: timestamp_ns,
-                fields,
-            },
-            index_count,
-            field_count,
-        }
-    };
-
-    Ok((qualified, catalog_op))
-}
-
 /// Validate a line of line protocol against the given schema definition
 ///
 /// This is for scenarios where a write comes in for a table that exists, but may have
@@ -947,84 +591,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[test]
-    fn write_validator_v3() -> Result<(), Error> {
-        let host_id = Arc::from("sample-host-id");
-        let instance_id = Arc::from("sample-instance-id");
-        let namespace = NamespaceName::new("test").unwrap();
-        let catalog = Arc::new(Catalog::new(host_id, instance_id));
-        let result = WriteValidator::initialize(namespace.clone(), Arc::clone(&catalog), 0)
-            .unwrap()
-            .v3_parse_lines_and_update_schema(
-                "cpu,tag_a/foo val1=\"bar\" 1234",
-                false,
-                Time::from_timestamp_nanos(0),
-                Precision::Auto,
-            )
-            .unwrap()
-            .convert_lines_to_buffer(Gen1Duration::new_5m());
-        assert_eq!(result.line_count, 1);
-        assert_eq!(result.field_count, 1);
-        assert_eq!(result.index_count, 1);
-        assert!(result.errors.is_empty());
-        assert_eq!(result.valid_data.database_name.as_ref(), namespace.as_str());
-        // cpu table
-        let batch = result
-            .valid_data
-            .table_chunks
-            .get(&TableId::from(0))
-            .unwrap();
-        assert_eq!(batch.row_count(), 1);
-
-        // Validate another write, the result should be very similar, but now the catalog
-        // has the table/columns added, so it will excercise a different code path:
-        let result = WriteValidator::initialize(namespace.clone(), Arc::clone(&catalog), 0)
-            .unwrap()
-            .v3_parse_lines_and_update_schema(
-                "cpu,tag_a/foo val1=\"bar\" 1235",
-                false,
-                Time::from_timestamp_nanos(0),
-                Precision::Auto,
-            )
-            .unwrap()
-            .convert_lines_to_buffer(Gen1Duration::new_5m());
-        println!("result: {result:?}");
-        assert_eq!(result.line_count, 1);
-        assert_eq!(result.field_count, 1);
-        assert_eq!(result.index_count, 1);
-        assert!(result.errors.is_empty());
-
-        // Validate another write, this time adding a new field:
-        let result = WriteValidator::initialize(namespace.clone(), Arc::clone(&catalog), 0)
-            .unwrap()
-            .v3_parse_lines_and_update_schema(
-                "cpu,tag_a/foo val1=\"bar\",val2=false 1236",
-                false,
-                Time::from_timestamp_nanos(0),
-                Precision::Auto,
-            )
-            .unwrap()
-            .convert_lines_to_buffer(Gen1Duration::new_5m());
-
-        println!("result: {result:?}");
-        assert_eq!(result.line_count, 1);
-        assert_eq!(result.field_count, 2);
-        assert_eq!(result.index_count, 1);
-        assert!(result.errors.is_empty());
-
-        // Validate another write, this time failing when adding a new tag:
-        match WriteValidator::initialize(namespace.clone(), catalog, 0)
-            .unwrap()
-            .v3_parse_lines_and_update_schema(
-                "cpu,tag_a/foo/tag_b/baz val1=\"bar\",val2=false 1236",
-                false,
-                Time::from_timestamp_nanos(0),
-                Precision::Auto,
-            ) {
-            Err(Error::ParseError(WriteLineError { error_message, .. })) => {
-                assert_eq!("write to table cpu had the incorrect series key, expected: [tag_a], received: [tag_a, tag_b]", error_message);
-            }
-            Ok(_) | Err(_) => panic!("Validator should have failed on new tag"),
-        }
-        Ok(())
-    }
 }
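The series-key enforcement exercised by the deleted `api_v3_write` and `write_validator_v3` tests lived in the removed `validate_and_qualify_v3_line`: a write to an existing table had to carry exactly the table's series key, in order, or it was rejected. A stand-alone sketch of that check (the error strings mirror the messages asserted in the deleted tests; the function itself is a simplified illustration, not the removed code):

```rust
// Sketch of the series-key comparison the removed v3 validator performed.
fn check_series_key(
    table_name: &str,
    expected: &[&str],
    received: Option<&[&str]>,
) -> Result<(), String> {
    match received {
        // A series key was supplied but does not match the table's key exactly.
        Some(received) if received != expected => Err(format!(
            "write to table {table_name} had the incorrect series key, \
             expected: [{}], received: [{}]",
            expected.join(", "),
            received.join(", "),
        )),
        // No series key was supplied for a table that requires one.
        None if !expected.is_empty() => Err(format!(
            "write to table {table_name} was missing a series key, the series key \
             contains [{}]",
            expected.join(", "),
        )),
        _ => Ok(()),
    }
}

fn main() {
    let expected = ["region", "host"];
    assert!(check_series_key("cpu", &expected, Some(["region", "host"].as_slice())).is_ok());
    assert!(check_series_key("cpu", &expected, Some(["host", "region"].as_slice())).is_err());
    assert!(check_series_key("cpu", &expected, None).is_err());
}
```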