diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 534fdd429e..17c983d551 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1301,7 +1301,7 @@ where .extend_connection_map(connection_map); if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries( inner.clone(), - &RefreshPolicy::Throttable, + &RefreshPolicy::NotThrottable, ) .await { diff --git a/glide-core/redis-rs/redis/src/cmd.rs b/glide-core/redis-rs/redis/src/cmd.rs index 92e8aea989..8ebe9cf9c7 100644 --- a/glide-core/redis-rs/redis/src/cmd.rs +++ b/glide-core/redis-rs/redis/src/cmd.rs @@ -11,6 +11,7 @@ use std::{fmt, io}; use crate::connection::ConnectionLike; use crate::pipeline::Pipeline; use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs}; +use telemetrylib::GlideSpan; /// An argument to a redis command #[derive(Clone)] @@ -30,6 +31,8 @@ pub struct Cmd { cursor: Option, // If it's true command's response won't be read from socket. Useful for Pub/Sub. no_response: bool, + /// The span associated with this command + span: Option, } /// Represents a redis iterator. @@ -321,6 +324,7 @@ impl Cmd { args: vec![], cursor: None, no_response: false, + span: None, } } @@ -331,6 +335,7 @@ impl Cmd { args: Vec::with_capacity(arg_count), cursor: None, no_response: false, + span: None, } } @@ -360,6 +365,16 @@ impl Cmd { self } + /// Associate a trackable span to the command. This allow tracking the lifetime + /// of the command. + /// + /// A span is used by an OpenTelemetry backend to track the lifetime of the command + #[inline] + pub fn with_span(&mut self, name: &str) -> &mut Cmd { + self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name)); + self + } + /// Works similar to `arg` but adds a cursor argument. This is always /// an integer and also flips the command implementation to support a /// different mode for the iterators where the iterator will ask for @@ -582,6 +597,12 @@ impl Cmd { pub fn is_no_response(&self) -> bool { self.no_response } + + /// Return this command span + #[inline] + pub fn span(&self) -> Option { + self.span.clone() + } } impl fmt::Debug for Cmd { diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 4896f83565..9d137d21bf 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -302,10 +302,15 @@ async fn send_command( mut client: Client, routing: Option, ) -> ClientUsageResult { - client + let child_span = cmd.span().map(|span| span.add_span("send_command")); + let res = client .send_command(&cmd, routing) .await - .map_err(|err| err.into()) + .map_err(|err| err.into()); + if let Some(child_span) = child_span { + child_span.end(); + } + res } // Parse the cluster scan command parameters from protobuf and send the command to redis-rs. diff --git a/glide-core/telemetry/Cargo.toml b/glide-core/telemetry/Cargo.toml index 73b9cb25ea..b6bd004274 100644 --- a/glide-core/telemetry/Cargo.toml +++ b/glide-core/telemetry/Cargo.toml @@ -9,3 +9,9 @@ authors = ["Valkey GLIDE Maintainers"] lazy_static = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" +chrono = "0" +futures-util = "0" +tokio = { version = "1", features = ["macros", "time"] } + +opentelemetry = "0" +opentelemetry_sdk = { version = "0", features = ["rt-tokio"] } diff --git a/glide-core/telemetry/src/lib.rs b/glide-core/telemetry/src/lib.rs index 886e43a2c8..f0a938f5e8 100644 --- a/glide-core/telemetry/src/lib.rs +++ b/glide-core/telemetry/src/lib.rs @@ -1,6 +1,11 @@ use lazy_static::lazy_static; use serde::Serialize; use std::sync::RwLock as StdRwLock; +mod open_telemetry; +mod open_telemetry_exporter_file; + +pub use open_telemetry::{GlideOpenTelemetry, GlideSpan}; +pub use open_telemetry_exporter_file::SpanExporterFile; #[derive(Default, Serialize)] #[allow(dead_code)] diff --git a/glide-core/telemetry/src/open_telemetry.rs b/glide-core/telemetry/src/open_telemetry.rs new file mode 100644 index 0000000000..eb61247bd5 --- /dev/null +++ b/glide-core/telemetry/src/open_telemetry.rs @@ -0,0 +1,359 @@ +use opentelemetry::global::ObjectSafeSpan; +use opentelemetry::trace::SpanKind; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::{global, trace::Tracer}; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::TracerProvider; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; + +const SPAN_WRITE_LOCK_ERR: &str = "Failed to get span write lock"; +const SPAN_READ_LOCK_ERR: &str = "Failed to get span read lock"; +const TRACE_SCOPE: &str = "valkey_glide"; + +pub enum GlideSpanStatus { + Ok, + Error(String), +} + +#[allow(dead_code)] +#[derive(Clone, Debug)] +/// Defines the method that exporter connects to the collector. It can be: +/// gRPC or HTTP. The third type (i.e. "File") defines an exporter that does not connect to a collector +/// instead, it writes the collected signals to files. +pub enum GlideOpenTelemetryTraceExporter { + /// Collector is listening on grpc + Grpc(String), + /// Collector is listening on http + Http(String), + /// No collector. Instead, write the traces collected to a file. The contained value "PathBuf" + /// points to the folder where the collected data should be placed. + File(PathBuf), +} + +#[derive(Clone, Debug)] +struct GlideSpanInner { + span: Arc>, +} + +impl GlideSpanInner { + /// Create new span with no parent. + pub fn new(name: &str) -> Self { + let tracer = global::tracer(TRACE_SCOPE); + let span = Arc::new(RwLock::new( + tracer + .span_builder(name.to_string()) + .with_kind(SpanKind::Client) + .start(&tracer), + )); + GlideSpanInner { span } + } + + /// Create new span as a child of `parent`. + pub fn new_with_parent(name: &str, parent: &GlideSpanInner) -> Self { + let parent_span_ctx = parent + .span + .read() + .expect(SPAN_READ_LOCK_ERR) + .span_context() + .clone(); + + let parent_context = + opentelemetry::Context::new().with_remote_span_context(parent_span_ctx); + + let tracer = global::tracer(TRACE_SCOPE); + let span = Arc::new(RwLock::new( + tracer + .span_builder(name.to_string()) + .with_kind(SpanKind::Client) + .start_with_context(&tracer, &parent_context), + )); + GlideSpanInner { span } + } + + /// Attach event with name and list of attributes to this span. + pub fn add_event(&self, name: &str, attributes: Option<&Vec<(&str, &str)>>) { + let attributes: Vec = if let Some(attributes) = attributes { + attributes + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())) + .collect() + } else { + Vec::::default() + }; + self.span + .write() + .expect(SPAN_WRITE_LOCK_ERR) + .add_event_with_timestamp( + name.to_string().into(), + std::time::SystemTime::now(), + attributes, + ); + } + + pub fn set_status(&self, status: GlideSpanStatus) { + match status { + GlideSpanStatus::Ok => self + .span + .write() + .expect(SPAN_WRITE_LOCK_ERR) + .set_status(opentelemetry::trace::Status::Ok), + GlideSpanStatus::Error(what) => { + self.span.write().expect(SPAN_WRITE_LOCK_ERR).set_status( + opentelemetry::trace::Status::Error { + description: what.into(), + }, + ) + } + } + } + + /// Create new span, add it as a child to this span and return it + pub fn add_span(&self, name: &str) -> GlideSpanInner { + let child = GlideSpanInner::new_with_parent(name, self); + { + let child_span = child.span.read().expect(SPAN_WRITE_LOCK_ERR); + self.span + .write() + .expect(SPAN_WRITE_LOCK_ERR) + .add_link(child_span.span_context().clone(), Vec::default()); + } + child + } + + /// Return the span ID + pub fn id(&self) -> String { + self.span + .read() + .expect(SPAN_READ_LOCK_ERR) + .span_context() + .span_id() + .to_string() + } + + /// Finishes the `Span`. + pub fn end(&self) { + self.span.write().expect(SPAN_READ_LOCK_ERR).end() + } +} + +#[derive(Clone, Debug)] +pub struct GlideSpan { + inner: GlideSpanInner, +} + +impl GlideSpan { + pub fn new(name: &str) -> Self { + GlideSpan { + inner: GlideSpanInner::new(name), + } + } + + /// Attach event with name to this span. + pub fn add_event(&self, name: &str) { + self.inner.add_event(name, None) + } + + /// Attach event with name and attributes to this span. + pub fn add_event_with_attributes(&self, name: &str, attributes: &Vec<(&str, &str)>) { + self.inner.add_event(name, Some(attributes)) + } + + pub fn set_status(&self, status: GlideSpanStatus) { + self.inner.set_status(status) + } + + /// Add child span to this span and return it + pub fn add_span(&self, name: &str) -> GlideSpan { + GlideSpan { + inner: self.inner.add_span(name), + } + } + + pub fn id(&self) -> String { + self.inner.id() + } + + /// Finishes the `Span`. + pub fn end(&self) { + self.inner.end() + } +} + +/// OpenTelemetry configuration object. Use `GlideOpenTelemetryConfigBuilder` to construct it: +/// +/// ```text +/// let config = GlideOpenTelemetryConfigBuilder::default() +/// .with_flush_interval(std::time::Duration::from_millis(100)) +/// .build(); +/// GlideOpenTelemetry::initialise(config); +/// ``` +pub struct GlideOpenTelemetryConfig { + /// Default delay interval between two consecutive exports. + span_flush_interval: std::time::Duration, + /// Determines the protocol between the collector and GLIDE + trace_exporter: GlideOpenTelemetryTraceExporter, +} + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct GlideOpenTelemetryConfigBuilder { + span_flush_interval: std::time::Duration, + trace_exporter: GlideOpenTelemetryTraceExporter, +} + +impl Default for GlideOpenTelemetryConfigBuilder { + fn default() -> Self { + GlideOpenTelemetryConfigBuilder { + span_flush_interval: std::time::Duration::from_millis(5_000), + trace_exporter: GlideOpenTelemetryTraceExporter::File(std::env::temp_dir()), + } + } +} + +#[allow(dead_code)] +impl GlideOpenTelemetryConfigBuilder { + pub fn with_flush_interval(mut self, duration: std::time::Duration) -> Self { + self.span_flush_interval = duration; + self + } + + pub fn with_trace_exporter(mut self, protocol: GlideOpenTelemetryTraceExporter) -> Self { + self.trace_exporter = protocol; + self + } + + pub fn build(self) -> GlideOpenTelemetryConfig { + GlideOpenTelemetryConfig { + span_flush_interval: self.span_flush_interval, + trace_exporter: self.trace_exporter, + } + } +} + +pub struct GlideOpenTelemetry {} + +/// Our interface to OpenTelemetry +impl GlideOpenTelemetry { + /// Initialise the open telemetry library with a file system exporter + /// + /// This method should be called once for the given **process** + pub fn initialise(config: GlideOpenTelemetryConfig) { + let trace_exporter = match config.trace_exporter { + GlideOpenTelemetryTraceExporter::File(p) => { + let exporter = crate::SpanExporterFile::new(p); + let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default() + .with_scheduled_delay(config.span_flush_interval) + .build(); + opentelemetry_sdk::trace::BatchSpanProcessor::builder( + exporter, + opentelemetry_sdk::runtime::Tokio, + ) + .with_batch_config(batch_config) + .build() + } + GlideOpenTelemetryTraceExporter::Http(_url) => { + todo!("HTTP protocol is not implemented yet!") + } + GlideOpenTelemetryTraceExporter::Grpc(_url) => { + todo!("GRPC protocol is not implemented yet!") + } + }; + + global::set_text_map_propagator(TraceContextPropagator::new()); + let provider = TracerProvider::builder() + .with_span_processor(trace_exporter) + .build(); + global::set_tracer_provider(provider); + } + + /// Create new span + pub fn new_span(name: &str) -> GlideSpan { + GlideSpan::new(name) + } + + /// Trigger a shutdown procedure flushing all remaining traces + pub fn shutdown() { + global::shutdown_tracer_provider(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + const SPANS_JSON: &str = "/tmp/spans.json"; + + fn string_property_to_u64(json: &serde_json::Value, prop: &str) -> u64 { + let s = json[prop].to_string().replace('"', ""); + s.parse::().unwrap() + } + + #[test] + fn test_span_json_exporter() { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(async { + let _ = std::fs::remove_file(SPANS_JSON); + let config = GlideOpenTelemetryConfigBuilder::default() + .with_flush_interval(std::time::Duration::from_millis(100)) + .with_trace_exporter(GlideOpenTelemetryTraceExporter::File(PathBuf::from("/tmp"))) + .build(); + GlideOpenTelemetry::initialise(config); + let span = GlideOpenTelemetry::new_span("Root_Span_1"); + span.add_event("Event1"); + span.set_status(GlideSpanStatus::Ok); + + let child1 = span.add_span("Network_Span"); + + // Simulate some work + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + child1.end(); + + // Simulate that the parent span is still doing some work + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + span.end(); + + let span = GlideOpenTelemetry::new_span("Root_Span_2"); + span.add_event("Event1"); + span.add_event("Event2"); + span.set_status(GlideSpanStatus::Ok); + drop(span); // writes the span + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // Read the file content + let file_content = std::fs::read_to_string(SPANS_JSON).unwrap(); + let lines: Vec<&str> = file_content.split('\n').collect(); + assert_eq!(lines.len(), 4); + + let span_json: serde_json::Value = serde_json::from_str(lines[0]).unwrap(); + assert_eq!(span_json["name"], "Network_Span"); + let network_span_id = span_json["span_id"].to_string(); + let network_span_start_time = string_property_to_u64(&span_json, "start_time"); + let network_span_end_time = string_property_to_u64(&span_json, "end_time"); + + // Because of the sleep above, the network span should be at least 100ms (units are microseconds) + assert!(network_span_end_time - network_span_start_time >= 100_000); + + let span_json: serde_json::Value = serde_json::from_str(lines[1]).unwrap(); + assert_eq!(span_json["name"], "Root_Span_1"); + assert_eq!(span_json["links"].as_array().unwrap().len(), 1); // we expect 1 child + let root_1_span_start_time = string_property_to_u64(&span_json, "start_time"); + let root_1_span_end_time = string_property_to_u64(&span_json, "end_time"); + + // The network span started *after* its parent + assert!(network_span_start_time >= root_1_span_start_time); + + // The parent span ends *after* the child span (by at least 100ms) + assert!(root_1_span_end_time - network_span_end_time >= 100_000); + + let child_span_id = span_json["links"][0]["span_id"].to_string(); + assert_eq!(child_span_id, network_span_id); + + let span_json: serde_json::Value = serde_json::from_str(lines[2]).unwrap(); + assert_eq!(span_json["name"], "Root_Span_2"); + }); + } +} diff --git a/glide-core/telemetry/src/open_telemetry_exporter_file.rs b/glide-core/telemetry/src/open_telemetry_exporter_file.rs new file mode 100644 index 0000000000..71282cccda --- /dev/null +++ b/glide-core/telemetry/src/open_telemetry_exporter_file.rs @@ -0,0 +1,194 @@ +use chrono::{DateTime, Utc}; +use core::fmt; +use futures_util::future::BoxFuture; +use opentelemetry::trace::TraceError; +use opentelemetry_sdk::export::{self, trace::ExportResult}; +use serde_json::{Map, Value}; +use std::fs::OpenOptions; +use std::io::Write; +use std::path::PathBuf; +use std::sync::atomic; + +use opentelemetry_sdk::resource::Resource; + +/// An OpenTelemetry exporter that writes Spans to a file on export. +pub struct SpanExporterFile { + resource: Resource, + is_shutdown: atomic::AtomicBool, + path: PathBuf, +} + +impl fmt::Debug for SpanExporterFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("SpanExporterFile") + } +} + +impl SpanExporterFile { + pub fn new(mut path: PathBuf) -> Self { + path.push("spans.json"); + SpanExporterFile { + resource: Resource::default(), + is_shutdown: atomic::AtomicBool::new(false), + path, + } + } +} + +macro_rules! file_writeln { + ($file:expr, $content:expr) => {{ + if let Err(e) = $file.write(format!("{}\n", $content).as_bytes()) { + return Box::pin(std::future::ready(Err(TraceError::from(format!( + "File write error. {e}", + ))))); + } + }}; +} + +impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporterFile { + /// Write Spans to JSON file + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + let Ok(mut data_file) = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + else { + return Box::pin(std::future::ready(Err(TraceError::from(format!( + "Unable to open exporter file: {} for append.", + self.path.display() + ))))); + }; + + let spans = to_jsons(batch); + for span in &spans { + if let Ok(s) = serde_json::to_string(&span) { + file_writeln!(data_file, s); + } + } + Box::pin(std::future::ready(Ok(()))) + } + + fn shutdown(&mut self) { + self.is_shutdown.store(true, atomic::Ordering::SeqCst); + } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } +} + +fn to_jsons(batch: Vec) -> Vec { + let mut spans = Vec::::new(); + for span in &batch { + let mut map = Map::new(); + map.insert( + "scope".to_string(), + Value::String(span.instrumentation_scope.name().to_string()), + ); + if let Some(version) = &span.instrumentation_scope.version() { + map.insert("version".to_string(), Value::String(version.to_string())); + } + if let Some(schema_url) = &span.instrumentation_scope.schema_url() { + map.insert( + "schema_url".to_string(), + Value::String(schema_url.to_string()), + ); + } + + let mut scope_attributes = Vec::::new(); + for kv in span.instrumentation_scope.attributes() { + let mut attr = Map::new(); + attr.insert(kv.key.to_string(), Value::String(kv.value.to_string())); + scope_attributes.push(Value::Object(attr)); + } + map.insert( + "scope_attributes".to_string(), + Value::Array(scope_attributes), + ); + map.insert("name".to_string(), Value::String(span.name.to_string())); + map.insert( + "span_id".to_string(), + Value::String(span.span_context.span_id().to_string()), + ); + map.insert( + "parent_span_id".to_string(), + Value::String(span.parent_span_id.to_string()), + ); + map.insert( + "trace_id".to_string(), + Value::String(span.span_context.trace_id().to_string()), + ); + map.insert( + "kind".to_string(), + Value::String(format!("{:?}", span.span_kind)), + ); + + let datetime: DateTime = span.start_time.into(); + map.insert( + "start_time".to_string(), + Value::String(datetime.timestamp_micros().to_string()), + ); + + let datetime: DateTime = span.end_time.into(); + map.insert( + "end_time".to_string(), + Value::String(datetime.timestamp_micros().to_string()), + ); + + map.insert( + "status".to_string(), + Value::String(format!("{:?}", span.status)), + ); + + // Add the span attributes + let mut span_attributes = Vec::::new(); + for kv in span.attributes.iter() { + let mut attr = Map::new(); + attr.insert(kv.key.to_string(), Value::String(kv.value.to_string())); + span_attributes.push(Value::Object(attr)); + } + map.insert("span_attributes".to_string(), Value::Array(span_attributes)); + + // Add span events + let mut events = Vec::::new(); + for event in span.events.iter() { + let mut evt = Map::new(); + evt.insert("name".to_string(), Value::String(event.name.to_string())); + let datetime: DateTime = event.timestamp.into(); + evt.insert( + "timestamp".to_string(), + Value::String(datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string()), + ); + + let mut event_attributes = Vec::::new(); + for kv in event.attributes.iter() { + let mut attr = Map::new(); + attr.insert(kv.key.to_string(), Value::String(kv.value.to_string())); + event_attributes.push(Value::Object(attr)); + } + evt.insert( + "event_attributes".to_string(), + Value::Array(event_attributes), + ); + events.push(Value::Object(evt)); + } + map.insert("events".to_string(), Value::Array(events)); + + let mut links = Vec::::new(); + for link in span.links.iter() { + let mut lk = Map::new(); + lk.insert( + "trace_id".to_string(), + Value::String(link.span_context.trace_id().to_string()), + ); + lk.insert( + "span_id".to_string(), + Value::String(link.span_context.span_id().to_string()), + ); + links.push(Value::Object(lk)); + } + map.insert("links".to_string(), Value::Array(links)); + spans.push(Value::Object(map)); + } + spans +} diff --git a/utils/get_licenses_from_ort.py b/utils/get_licenses_from_ort.py index 6b4b6cb60e..0ba84559e7 100644 --- a/utils/get_licenses_from_ort.py +++ b/utils/get_licenses_from_ort.py @@ -34,6 +34,7 @@ "BSD-3-Clause OR Apache-2.0", "ISC", "MIT", + "MPL-2.0", "Zlib", "MIT OR Unlicense", "PSF-2.0", @@ -42,7 +43,9 @@ # Packages with non-pre-approved licenses that received manual approval. APPROVED_PACKAGES = [ "PyPI::pathspec:0.12.1", - "PyPI::certifi:2023.11.17" + "PyPI::certifi:2023.11.17", + "Crate::ring:0.17.8", + "Maven:org.json:json:20231013" ] SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))