Skip to content

Commit

Permalink
Translate between different versions of prost-types
Browse files Browse the repository at this point in the history
This is absolutely awful, but do you know what was even more awful?
Trying to upgrade every prost packages and all the packages that use prost under the hood.
  • Loading branch information
goakley committed Oct 1, 2024
1 parent e967d63 commit 0339ad1
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 7 deletions.
1 change: 0 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,3 @@ fn main() {
// Emit the aforementioned stanzas.
tracker.emit_rerun_stanzas();
}

2 changes: 1 addition & 1 deletion src/sinks/gcp/bigquery/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use futures::FutureExt;
use http::Uri;
use indoc::indoc;
use tonic::transport::Channel;
use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use vector_lib::configurable::configurable_component;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
Expand Down
162 changes: 159 additions & 3 deletions src/sinks/gcp/bigquery/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use bytes::BytesMut;
use vector_lib::codecs::encoding::ProtobufSerializer;
use prost::Message;
use std::num::NonZeroUsize;
use tokio_util::codec::Encoder;
use vector_lib::request_metadata::RequestMetadata;
use vector_lib::codecs::encoding::ProtobufSerializer;
use vector_lib::event::Finalizable;
use vector_lib::request_metadata::RequestMetadata;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
use super::service::BigqueryRequest;
Expand Down Expand Up @@ -48,7 +48,9 @@ impl BigqueryRequestBuilder {
) -> (NonZeroUsize, proto::append_rows_request::ProtoData) {
let proto_data = proto::append_rows_request::ProtoData {
writer_schema: Some(proto::ProtoSchema {
proto_descriptor: Some(self.protobuf_serializer.descriptor_proto().clone()),
proto_descriptor: Some(translate_descriptor_proto(
self.protobuf_serializer.descriptor_proto().clone(),
)),
}),
rows: Some(proto::ProtoRows { serialized_rows }),
};
Expand Down Expand Up @@ -136,6 +138,160 @@ impl IncrementalRequestBuilder<Vec<Event>> for BigqueryRequestBuilder {
}
}

/// Convert from `prost_reflect::prost_types::DescriptorProto` to `prost_types::DescriptorProto`
///
/// Someone upgraded `prost_reflect` without upgrading the other prost crates,
/// so the `prost_types` version used by `prost_reflect` is newer than the version used by vector.
///
/// This function discards any `UninterpretedOption`s.
///
/// "Why don't you just upgrade `prost_types` to match the version used by `prost_reflect`?
/// Ha. Hahaha. Hahahahahahaha. My branches are littered with the corpses of such attempts.

Check failure

Code scanning / check-spelling

Unrecognized Spelling Error

Hahaha is not a recognized word. (unrecognized-spelling)

Check failure

Code scanning / check-spelling

Unrecognized Spelling Error

Hahahahahahaha is not a recognized word. (unrecognized-spelling)
fn translate_descriptor_proto(
old_descriptor: prost_reflect::prost_types::DescriptorProto,
) -> prost_types::DescriptorProto {
prost_types::DescriptorProto {
name: old_descriptor.name,
field: old_descriptor
.field
.into_iter()
.map(|field| prost_types::FieldDescriptorProto {
name: field.name,
number: field.number,
label: field.label,
r#type: field.r#type,
type_name: field.type_name,
extendee: field.extendee,
default_value: field.default_value,
oneof_index: field.oneof_index,
json_name: field.json_name,
options: field.options.map(|options| prost_types::FieldOptions {
ctype: options.ctype,
packed: options.packed,
jstype: options.jstype,
lazy: options.lazy,
deprecated: options.deprecated,
weak: options.weak,
uninterpreted_option: Default::default(),
}),
proto3_optional: field.proto3_optional,
})
.collect(),
extension: old_descriptor
.extension
.into_iter()
.map(|field| prost_types::FieldDescriptorProto {
name: field.name,
number: field.number,
label: field.label,
r#type: field.r#type,
type_name: field.type_name,
extendee: field.extendee,
default_value: field.default_value,
oneof_index: field.oneof_index,
json_name: field.json_name,
options: field.options.map(|options| prost_types::FieldOptions {
ctype: options.ctype,
packed: options.packed,
jstype: options.jstype,
lazy: options.lazy,
deprecated: options.deprecated,
weak: options.weak,
uninterpreted_option: Default::default(),
}),
proto3_optional: field.proto3_optional,
})
.collect(),
nested_type: old_descriptor
.nested_type
.into_iter()
.map(translate_descriptor_proto)
.collect(),
enum_type: old_descriptor
.enum_type
.into_iter()
.map(|enum_descriptor| prost_types::EnumDescriptorProto {
name: enum_descriptor.name,
value: enum_descriptor
.value
.into_iter()
.map(|value| prost_types::EnumValueDescriptorProto {
name: value.name,
number: value.number,
options: value.options.map(|options| prost_types::EnumValueOptions {
deprecated: options.deprecated,
uninterpreted_option: Default::default(),
}),
})
.collect(),
options: enum_descriptor
.options
.map(|options| prost_types::EnumOptions {
allow_alias: options.allow_alias,
deprecated: options.deprecated,
uninterpreted_option: Default::default(),
}),
reserved_range: enum_descriptor
.reserved_range
.into_iter()
.map(
|reserved_range| prost_types::enum_descriptor_proto::EnumReservedRange {
start: reserved_range.start,
end: reserved_range.end,
},
)
.collect(),
reserved_name: enum_descriptor.reserved_name,
})
.collect(),
extension_range: old_descriptor
.extension_range
.into_iter()
.map(
|extension_range| prost_types::descriptor_proto::ExtensionRange {
start: extension_range.start,
end: extension_range.end,
options: extension_range
.options
.map(|_| prost_types::ExtensionRangeOptions {
uninterpreted_option: Default::default(),
}),
},
)
.collect(),
oneof_decl: old_descriptor
.oneof_decl
.into_iter()
.map(|oneof| prost_types::OneofDescriptorProto {
name: oneof.name,
options: oneof.options.map(|_| prost_types::OneofOptions {
uninterpreted_option: Default::default(),
}),
})
.collect(),
options: old_descriptor
.options
.map(|options| prost_types::MessageOptions {
message_set_wire_format: options.message_set_wire_format,
no_standard_descriptor_accessor: options.no_standard_descriptor_accessor,
deprecated: options.deprecated,
map_entry: options.map_entry,
uninterpreted_option: Default::default(),
}),
reserved_range: old_descriptor
.reserved_range
.into_iter()
.map(
|reserved_range| prost_types::descriptor_proto::ReservedRange {
start: reserved_range.start,
end: reserved_range.end,
},
)
.collect(),
reserved_name: old_descriptor.reserved_name,
}
}

#[cfg(test)]
mod test {
use bytes::{BufMut, Bytes, BytesMut};
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/gcp/bigquery/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::{Request, Status};
use tower::Service;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::event::EventStatus;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::stream::DriverResponse;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
Expand Down Expand Up @@ -81,7 +81,9 @@ impl DriverResponse for BigqueryResponse {
// these errors can't be retried because the event payload is almost definitely bad
Ok(super::proto::third_party::google::rpc::Code::InvalidArgument)
| Ok(super::proto::third_party::google::rpc::Code::NotFound)
| Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => EventStatus::Rejected,
| Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => {
EventStatus::Rejected
}
// everything else can probably be retried
_ => EventStatus::Errored,
}
Expand Down

0 comments on commit 0339ad1

Please sign in to comment.