From 46dfdd9167a59f38fa6d38aea28196f49de7c404 Mon Sep 17 00:00:00 2001
From: Wojciech Zyla
Date: Fri, 20 Dec 2024 12:14:01 +0100
Subject: [PATCH] [receiver/kafkareceiver] add topic_regex configuration option

update changelog
---
 .../feat_kafkareceiver-regex-topics.yaml      |  27 ++
 receiver/kafkareceiver/README.md              |   3 +-
 receiver/kafkareceiver/config.go              |   8 +-
 receiver/kafkareceiver/config_test.go         |  45 ++-
 receiver/kafkareceiver/kafka_receiver.go      | 289 +++++++++++++-
 receiver/kafkareceiver/kafka_receiver_test.go | 366 ++++++++++++++++++
 receiver/kafkareceiver/testdata/config.yaml   |  21 +
 7 files changed, 755 insertions(+), 4 deletions(-)
 create mode 100644 .chloggen/feat_kafkareceiver-regex-topics.yaml

diff --git a/.chloggen/feat_kafkareceiver-regex-topics.yaml b/.chloggen/feat_kafkareceiver-regex-topics.yaml
new file mode 100644
index 000000000000..056e9ddb2d57
--- /dev/null
+++ b/.chloggen/feat_kafkareceiver-regex-topics.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: 'kafkareceiver'
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Add a 'topic_regex' configuration option to the kafkareceiver. It allows subscribing to topics whose names match a regular expression."
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36909]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
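For orientation, a minimal collector configuration using the new option might look like the sketch below; the pipeline layout and the `debug` exporter are illustrative assumptions, not part of this patch:

```yaml
receivers:
  kafka:
    brokers:
      - localhost:9092
    # Subscribe to every topic whose name matches this pattern,
    # instead of naming a single topic.
    topic_regex: "^otlp_spans_.*"
    encoding: otlp_proto

exporters:
  debug: {}

service:
  pipelines:
    traces:
      receivers: [kafka]
      exporters: [debug]
```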
diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index b7b25c4eadb6..c1430994740a 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -28,7 +28,8 @@ The following settings can be optionally configured:
 - `brokers` (default = localhost:9092): The list of kafka brokers
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
 - `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
-  Only one telemetry type may be used for a given topic.
+  Only one telemetry type may be used for a given topic. Only one of `topic` or `topic_regex` may be set.
+- `topic_regex` (no default): A regular expression for subscribing to topics by name pattern. Only one of `topic` or `topic_regex` may be set.
 - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded.
   Available internal encodings:
   - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
   - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding.
diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go
index 5b25c7c7c5f3..3b13c0404e2a 100644
--- a/receiver/kafkareceiver/config.go
+++ b/receiver/kafkareceiver/config.go
@@ -4,6 +4,7 @@
 package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
 
 import (
+	"errors"
 	"time"
 
 	"go.opentelemetry.io/collector/component"
@@ -52,8 +53,10 @@ type Config struct {
 	SessionTimeout time.Duration `mapstructure:"session_timeout"`
 	// Heartbeat interval for the Kafka consumer
 	HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
-	// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
+	// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs). If topic_regex is set, this field must remain empty.
 	Topic string `mapstructure:"topic"`
+	// Name pattern of the kafka topics to consume from. If topic is set, this field must remain empty.
+	TopicRegex string `mapstructure:"topic_regex"`
 	// Encoding of the messages (default "otlp_proto")
 	Encoding string `mapstructure:"encoding"`
 	// The consumer group that receiver will be consuming messages from (default "otel-collector")
@@ -96,5 +99,8 @@ var _ component.Config = (*Config)(nil)
 
 // Validate checks the receiver configuration is valid
 func (cfg *Config) Validate() error {
+	if cfg.Topic != "" && cfg.TopicRegex != "" {
+		return errors.New("only one of 'topic' or 'topic_regex' can be used")
+	}
 	return nil
 }
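One caveat worth noting here (my addition, not part of the patch): `Validate` only enforces mutual exclusion and does not compile the pattern, and Go's `regexp` matching is unanchored, so a pattern like `logs[0-9]` matches any topic whose name merely contains such a substring. Anchoring the pattern restricts it to whole names. A small illustration:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Unanchored: true, because "logs1" occurs inside the name.
	fmt.Println(regexp.MustCompile(`logs[0-9]`).MatchString("app-logs1-dlq"))

	// Anchored: only whole names of the form "logs<digit>" match.
	fmt.Println(regexp.MustCompile(`^logs[0-9]$`).MatchString("app-logs1-dlq")) // false
	fmt.Println(regexp.MustCompile(`^logs[0-9]$`).MatchString("logs3"))         // true
}
```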
diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go
index b5e7faa1dc93..7e514e2dae7a 100644
--- a/receiver/kafkareceiver/config_test.go
+++ b/receiver/kafkareceiver/config_test.go
@@ -34,6 +34,7 @@ func TestLoadConfig(t *testing.T) {
 			id: component.NewIDWithName(metadata.Type, ""),
 			expected: &Config{
 				Topic: "spans",
+				TopicRegex: "",
 				Encoding: "otlp_proto",
 				Brokers: []string{"foo:123", "bar:456"},
 				ResolveCanonicalBootstrapServersOnly: true,
@@ -71,6 +72,44 @@
 			id: component.NewIDWithName(metadata.Type, "logs"),
 			expected: &Config{
 				Topic: "logs",
+				TopicRegex: "",
 				Encoding: "direct",
 				Brokers: []string{"coffee:123", "foobar:456"},
 				ClientID: "otel-collector",
 				GroupID: "otel-collector",
 				InitialOffset: "earliest",
 				SessionTimeout: 45 * time.Second,
 				HeartbeatInterval: 15 * time.Second,
 				Authentication: kafka.Authentication{
 					TLS: &configtls.ClientConfig{
 						Config: configtls.Config{
 							CAFile: "ca.pem",
 							CertFile: "cert.pem",
 							KeyFile: "key.pem",
 						},
 					},
 				},
 				Metadata: kafkaexporter.Metadata{
 					Full: true,
 					Retry: kafkaexporter.MetadataRetry{
 						Max: 10,
 						Backoff: time.Second * 5,
 					},
 				},
 				AutoCommit: AutoCommit{
 					Enable: true,
 					Interval: 1 * time.Second,
 				},
 				MinFetchSize: 1,
 				DefaultFetchSize: 1048576,
 				MaxFetchSize: 0,
 			},
 		},
+		{
+			id: component.NewIDWithName(metadata.Type, "topics_err"),
+			expected: &Config{
+				Topic: "logs",
+				TopicRegex: "logs[0-9]",
+				Encoding: "direct",
+				Brokers: []string{"coffee:123", "foobar:456"},
+				ClientID: "otel-collector",
+				GroupID: "otel-collector",
+				InitialOffset: "earliest",
+				SessionTimeout: 45 * time.Second,
+				HeartbeatInterval: 15 * time.Second,
+				Authentication: kafka.Authentication{
+					TLS: &configtls.ClientConfig{
+						Config: configtls.Config{
+							CAFile: "ca.pem",
+							CertFile: "cert.pem",
+							KeyFile: "key.pem",
+						},
+					},
+				},
+				Metadata: kafkaexporter.Metadata{
+					Full: true,
+					Retry: kafkaexporter.MetadataRetry{
+						Max: 10,
+						Backoff: time.Second * 5,
+					},
+				},
+				AutoCommit: AutoCommit{
+					Enable: true,
+					Interval: 1 * time.Second,
+				},
+				MinFetchSize: 1,
+				DefaultFetchSize: 1048576,
+				MaxFetchSize: 0,
+			},
+		},
@@ -114,7 +153,11 @@ func TestLoadConfig(t *testing.T) {
 			require.NoError(t, err)
 			require.NoError(t, sub.Unmarshal(cfg))
 
-			assert.NoError(t, component.ValidateConfig(cfg))
+			if tt.id.String() == "kafka/topics_err" {
+				assert.EqualError(t, component.ValidateConfig(cfg), "only one of 'topic' or 'topic_regex' can be used")
+			} else {
+				assert.NoError(t, component.ValidateConfig(cfg))
+			}
 			assert.Equal(t, tt.expected, cfg)
 		})
 	}
diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index 1ec2d5aca6e9..ac2b2abd3ee3 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -21,6 +21,8 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
+	"regexp"
+	"time"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
@@ -130,7 +132,7 @@ func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
 	}, nil
 }
 
-func createKafkaClient(ctx context.Context, config Config) (sarama.ConsumerGroup, error) {
+var createKafkaClient = func(ctx context.Context, config Config) (sarama.ConsumerGroup, error) {
 	saramaConfig := sarama.NewConfig()
 	saramaConfig.ClientID = config.ClientID
 	saramaConfig.Metadata.Full = config.Metadata.Full
@@ -162,6 +164,38 @@ func createKafkaClient(ctx context.Context, config Config) (sarama.ConsumerGroup, error) {
 	return sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig)
 }
 
+var createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) {
+	saramaConfig := sarama.NewConfig()
+	saramaConfig.ClientID = config.ClientID
+	saramaConfig.Metadata.Full = config.Metadata.Full
+	saramaConfig.Metadata.Retry.Max = config.Metadata.Retry.Max
+	saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+	saramaConfig.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
+	saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
+	saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout
+	saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval
+	saramaConfig.Consumer.Fetch.Min = config.MinFetchSize
+	saramaConfig.Consumer.Fetch.Default = config.DefaultFetchSize
+	saramaConfig.Consumer.Fetch.Max = config.MaxFetchSize
+
+	var err error
+	if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil {
+		return nil, err
+	}
+	if config.ResolveCanonicalBootstrapServersOnly {
+		saramaConfig.Net.ResolveCanonicalBootstrapServers = true
+	}
+	if config.ProtocolVersion != "" {
+		if saramaConfig.Version, err = sarama.ParseKafkaVersion(config.ProtocolVersion); err != nil {
+			return nil, err
+		}
+	}
+	if err := kafka.ConfigureAuthentication(ctx, config.Authentication, saramaConfig); err != nil {
+		return nil, err
+	}
+	return sarama.NewClusterAdmin(config.Brokers, saramaConfig)
+}
+
 func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	c.cancelConsumeLoop = cancel
@@ -212,6 +246,10 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
 			headers: c.headers,
 		}
 	}
+	if len(c.config.TopicRegex) != 0 {
+		c.consumeLoopWG.Add(1)
+		go c.checkNewTopics(ctx)
+	}
 	c.consumeLoopWG.Add(1)
 	go c.consumeLoop(ctx, consumerGroup)
 	<-consumerGroup.ready
@@ -235,6 +273,81 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
 	}
 }
 
+func (c *kafkaTracesConsumer) checkNewTopics(ctx context.Context) {
+	defer c.consumeLoopWG.Done()
+	var currentTopics []string
+
+	admin, err := createKafkaClusterAdmin(ctx, c.config)
+	if err != nil {
+		c.settings.Logger.Error("Failed to create Kafka cluster admin", zap.Error(err))
+		return
+	}
+	defer admin.Close()
+
+	// Compile the pattern once; an invalid pattern disables topic discovery.
+	reg, err := regexp.Compile(c.config.TopicRegex)
+	if err != nil {
+		c.settings.Logger.Error("Failed to compile topic_regex", zap.Error(err))
+		return
+	}
+
+	// Helper function to recreate the consumer group
+	recreateConsumerGroup := func(topics []string) error {
+		c.topics = topics
+		if c.consumerGroup != nil {
+			// Close the existing consumer group
+			c.settings.Logger.Info("Closing existing consumer group...")
+			if err := c.consumerGroup.Close(); err != nil {
+				return err
+			}
+		}
+		if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	// Periodically check for new topics
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
+			return
+		case <-ticker.C:
+			// Fetch the list of topics
+			topics, err := admin.ListTopics()
+			if err != nil {
+				c.settings.Logger.Error("Failed to list topics", zap.Error(err))
+				continue
+			}
+
+			// Keep only the topic names matching the configured pattern
+			var subTopics []string
+			for topic := range topics {
+				if reg.MatchString(topic) {
+					subTopics = append(subTopics, topic)
+				}
+			}
+
+			// Recreate the consumer group only when the matching set changed
+			if !equalStringSlices(currentTopics, subTopics) {
+				c.settings.Logger.Info("New topics detected, recreating consumer group...")
+				if err := recreateConsumerGroup(subTopics); err != nil {
+					c.settings.Logger.Error("Failed to recreate consumer group", zap.Error(err))
+					continue
+				}
+				currentTopics = subTopics
+			}
+		}
+	}
+}
+
 func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
 	if c.cancelConsumeLoop == nil {
 		return nil
@@ -320,6 +433,10 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) error {
 			headers: c.headers,
 		}
 	}
+	if len(c.config.TopicRegex) != 0 {
+		c.consumeLoopWG.Add(1)
+		go c.checkNewTopics(ctx)
+	}
 	c.consumeLoopWG.Add(1)
 	go c.consumeLoop(ctx, metricsConsumerGroup)
 	<-metricsConsumerGroup.ready
@@ -343,6 +460,81 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
 	}
 }
 
+func (c *kafkaMetricsConsumer) checkNewTopics(ctx context.Context) {
+	defer c.consumeLoopWG.Done()
+	var currentTopics []string
+
+	admin, err := createKafkaClusterAdmin(ctx, c.config)
+	if err != nil {
+		c.settings.Logger.Error("Failed to create Kafka cluster admin", zap.Error(err))
+		return
+	}
+	defer admin.Close()
+
+	// Compile the pattern once; an invalid pattern disables topic discovery.
+	reg, err := regexp.Compile(c.config.TopicRegex)
+	if err != nil {
+		c.settings.Logger.Error("Failed to compile topic_regex", zap.Error(err))
+		return
+	}
+
+	// Helper function to recreate the consumer group
+	recreateConsumerGroup := func(topics []string) error {
+		c.topics = topics
+		if c.consumerGroup != nil {
+			// Close the existing consumer group
+			c.settings.Logger.Info("Closing existing consumer group...")
+			if err := c.consumerGroup.Close(); err != nil {
+				return err
+			}
+		}
+		if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	// Periodically check for new topics
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
+			return
+		case <-ticker.C:
+			// Fetch the list of topics
+			topics, err := admin.ListTopics()
+			if err != nil {
+				c.settings.Logger.Error("Failed to list topics", zap.Error(err))
+				continue
+			}
+
+			// Keep only the topic names matching the configured pattern
+			var subTopics []string
+			for topic := range topics {
+				if reg.MatchString(topic) {
+					subTopics = append(subTopics, topic)
+				}
+			}
+
+			// Recreate the consumer group only when the matching set changed
+			if !equalStringSlices(currentTopics, subTopics) {
+				c.settings.Logger.Info("New topics detected, recreating consumer group...")
+				if err := recreateConsumerGroup(subTopics); err != nil {
+					c.settings.Logger.Error("Failed to recreate consumer group", zap.Error(err))
+					continue
+				}
+				currentTopics = subTopics
+			}
+		}
+	}
+}
+
 func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
 	if c.cancelConsumeLoop == nil {
 		return nil
@@ -431,6 +623,10 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error {
 			headers: c.headers,
 		}
 	}
+	if len(c.config.TopicRegex) != 0 {
+		c.consumeLoopWG.Add(1)
+		go c.checkNewTopics(ctx)
+	}
 	c.consumeLoopWG.Add(1)
 	go c.consumeLoop(ctx, logsConsumerGroup)
 	<-logsConsumerGroup.ready
@@ -454,6 +650,80 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
 	}
 }
 
+func (c *kafkaLogsConsumer) checkNewTopics(ctx context.Context) {
+	defer c.consumeLoopWG.Done()
+	var currentTopics []string
+
+	admin, err := createKafkaClusterAdmin(ctx, c.config)
+	if err != nil {
+		c.settings.Logger.Error("Failed to create Kafka cluster admin", zap.Error(err))
+		return
+	}
+	defer admin.Close()
+
+	// Compile the pattern once; an invalid pattern disables topic discovery.
+	reg, err := regexp.Compile(c.config.TopicRegex)
+	if err != nil {
+		c.settings.Logger.Error("Failed to compile topic_regex", zap.Error(err))
+		return
+	}
+
+	// Helper function to recreate the consumer group
+	recreateConsumerGroup := func(topics []string) error {
+		c.topics = topics
+		if c.consumerGroup != nil {
+			// Close the existing consumer group
+			c.settings.Logger.Info("Closing existing consumer group...")
+			if err := c.consumerGroup.Close(); err != nil {
+				return err
+			}
+		}
+		if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	// Periodically check for new topics
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
+			return
+		case <-ticker.C:
+			// Fetch the list of topics
+			topics, err := admin.ListTopics()
+			if err != nil {
+				c.settings.Logger.Error("Failed to list topics", zap.Error(err))
+				continue
+			}
+
+			// Keep only the topic names matching the configured pattern
+			var subTopics []string
+			for topic := range topics {
+				if reg.MatchString(topic) {
+					subTopics = append(subTopics, topic)
+				}
+			}
+
+			// Recreate the consumer group only when the matching set changed
+			if !equalStringSlices(currentTopics, subTopics) {
+				c.settings.Logger.Info("New topics detected, recreating consumer group...")
+				if err := recreateConsumerGroup(subTopics); err != nil {
+					c.settings.Logger.Error("Failed to recreate consumer group", zap.Error(err))
+					continue
+				}
+				currentTopics = subTopics
+			}
+		}
+	}
+}
+
 func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
 	if c.cancelConsumeLoop == nil {
 		return nil
@@ -792,3 +1062,20 @@ func encodingToComponentID(encoding string) (*component.ID, error) {
 	id := component.NewID(componentType)
 	return &id, nil
 }
+
+// equalStringSlices reports whether two string slices contain the same
+// elements regardless of order. It assumes the elements are unique, which
+// holds here because topic names are keys of a map.
+func equalStringSlices(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	aMap := make(map[string]struct{}, len(a))
+	for _, v := range a {
+		aMap[v] = struct{}{}
+	}
+	for _, v := range b {
+		if _, ok := aMap[v]; !ok {
+			return false
+		}
+	}
+	return true
+}
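A note on the design: `createKafkaClient` and `createKafkaClusterAdmin` are declared as package-level function variables rather than plain functions so the tests below can swap in mocks. A stripped-down sketch of that seam, with hypothetical names rather than the receiver's actual API:

```go
package main

import "fmt"

// listTopics stands in for createKafkaClusterAdmin: production code calls
// this package-level function variable, and a test can reassign it.
var listTopics = func(brokers []string) ([]string, error) {
	return nil, fmt.Errorf("no real cluster in this sketch")
}

func main() {
	// A test overrides the variable and restores the original when done.
	original := listTopics
	listTopics = func([]string) ([]string, error) {
		return []string{"topic1", "topic2"}, nil
	}
	defer func() { listTopics = original }()

	topics, err := listTopics([]string{"localhost:9092"})
	fmt.Println(topics, err) // [topic1 topic2] <nil>
}
```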
diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go
index e353974acd91..db830add4181 100644
--- a/receiver/kafkareceiver/kafka_receiver_test.go
+++ b/receiver/kafkareceiver/kafka_receiver_test.go
@@ -6,6 +6,7 @@ package kafkareceiver
 import (
 	"context"
 	"errors"
+	"github.com/stretchr/testify/mock"
 	"sync"
 	"testing"
 	"time"
@@ -394,6 +395,143 @@ func TestTracesReceiver_encoding_extension(t *testing.T) {
 	}, 10*time.Second, time.Millisecond*100)
 }
 
+type MockClusterAdmin struct {
+	mock.Mock
+	sarama.ClusterAdmin
+}
+
+func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) {
+	args := m.Called()
+	return args.Get(0).(map[string]sarama.TopicDetail), args.Error(1)
+}
+
+func (m *MockClusterAdmin) Close() error {
+	return m.Called().Error(0)
+}
+
+type MockKafkaConsumerGroup struct {
+	mock.Mock
+	sarama.ConsumerGroup
+}
+
+func (m *MockKafkaConsumerGroup) Close() error {
+	return m.Called().Error(0)
+}
+
+func TestTracesCheckNewTopics_ListTopicsError(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	topicRegexPattern := "^topic[0-9]$"
+
+	mockConsumerGroup := &MockKafkaConsumerGroup{}
+	mockAdmin := &MockClusterAdmin{}
+	mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{}, errors.New("list topics failed")).Once()
+	mockAdmin.On("Close").Return(nil).Once()
+
+	// Override the createKafkaClusterAdmin function to return the mock admin
+	originalCreateKafkaClusterAdmin := createKafkaClusterAdmin
+	createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) {
+		return mockAdmin, nil
+	}
+
+	wg := &sync.WaitGroup{}
+	set := receivertest.NewNopSettings()
+
+	consumer := &kafkaTracesConsumer{
+		config: Config{TopicRegex: topicRegexPattern},
+		consumerGroup: mockConsumerGroup,
+		topics: []string{},
+		settings: set,
+		consumeLoopWG: wg,
+	}
+
+	wg.Add(1)
+	go consumer.checkNewTopics(ctx)
+	time.Sleep(6 * time.Second)
+	assert.Equal(t, []string{}, consumer.topics)
+	cancel()
+	wg.Wait()
+	createKafkaClusterAdmin = originalCreateKafkaClusterAdmin
+	mockAdmin.AssertExpectations(t)
+}
+
+func TestTracesCheckNewTopics_TopicsChangeDetected(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	topicRegexPattern := "^topic[0-9]$"
+
+	mockAdmin := &MockClusterAdmin{}
+	// During first check new topics matching the regex are detected
+	mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{
+		"topic1": {},
+		"topic2": {},
+	}, nil).Once()
+
+	// During the second check only one new topic which doesn't match a regex is detected
+	mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{
+		"topic1": {},
+		"topic2": {},
+		"topics": {},
+	}, nil).Once()
+
+	// During the third check one topic matching a regex is detected
+	mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{
+		"topic1": {},
+		"topics": {},
+	}, nil).Once()
+	mockAdmin.On("Close").Return(nil).Once()
+
+	mockConsumerGroup := &MockKafkaConsumerGroup{}
+	// ConsumerGroup should be Closed and restarted two times. First time when new matching topics are
+	// detected and second time when matching topic is deleted.
+ mockConsumerGroup.On("Close").Return(nil).Twice() + + // Override the createKafkaClusterAdmin function + originalCreateKafkaClusterAdmin := createKafkaClusterAdmin + createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) { + return mockAdmin, nil + } + // Override the createKafkaClient function + originalCreateKafkaClient := createKafkaClient + createKafkaClient = func(ctx context.Context, config Config) (sarama.ConsumerGroup, error) { + return mockConsumerGroup, nil + } + + wg := &sync.WaitGroup{} + set := receivertest.NewNopSettings() + + consumer := &kafkaTracesConsumer{ + config: Config{TopicRegex: topicRegexPattern}, + consumerGroup: mockConsumerGroup, + topics: []string{}, + settings: set, + consumeLoopWG: wg, + } + + wg.Add(1) + go consumer.checkNewTopics(ctx) + + time.Sleep(7 * time.Second) + // Expect first detection of topics + assert.True(t, equalStringSlices([]string{"topic1", "topic2"}, consumer.topics)) + + time.Sleep(5 * time.Second) + // Expect second detection of topics + assert.True(t, equalStringSlices([]string{"topic1", "topic2"}, consumer.topics)) + + time.Sleep(5 * time.Second) + // Expect third detection of topics + assert.True(t, equalStringSlices([]string{"topic1"}, consumer.topics)) + + cancel() + wg.Wait() + createKafkaClusterAdmin = originalCreateKafkaClusterAdmin + createKafkaClient = originalCreateKafkaClient + + mockAdmin.AssertExpectations(t) + mockConsumerGroup.AssertExpectations(t) +} + func TestNewMetricsReceiver_version_err(t *testing.T) { c := Config{ Encoding: defaultEncoding, @@ -737,6 +875,120 @@ func TestMetricsReceiver_encoding_extension(t *testing.T) { }, 10*time.Second, time.Millisecond*100) } +func TestMetricsCheckNewTopics_ListTopicsError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + topicRegexPattern := "^topic[0-9]$" + + mockConsumerGroup := &MockKafkaConsumerGroup{} + mockAdmin := &MockClusterAdmin{} + mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{}, errors.New("list topics failed")).Once() + mockAdmin.On("Close").Return(nil).Once() + + // Override the createKafkaClusterAdmin function to return the mock admin + originalCreateKafkaClusterAdmin := createKafkaClusterAdmin + createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) { + return mockAdmin, nil + } + + wg := &sync.WaitGroup{} + set := receivertest.NewNopSettings() + + consumer := &kafkaMetricsConsumer{ + config: Config{TopicRegex: topicRegexPattern}, + consumerGroup: mockConsumerGroup, + topics: []string{}, + settings: set, + consumeLoopWG: wg, + } + + wg.Add(1) + go consumer.checkNewTopics(ctx) + time.Sleep(6 * time.Second) + assert.Equal(t, []string{}, consumer.topics) + cancel() + wg.Wait() + createKafkaClusterAdmin = originalCreateKafkaClusterAdmin + mockAdmin.AssertExpectations(t) +} + +func TestMetricsCheckNewTopics_TopicsChangeDetected(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + topicRegexPattern := "^topic[0-9]$" + + mockAdmin := &MockClusterAdmin{} + // During first check new topics matching the regex are detected + mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{ + "topic1": {}, + "topic2": {}, + }, nil).Once() + + // During the second check only one new topic which doesn't match a regex is detected + mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{ + "topic1": {}, + "topic2": {}, + "topics": {}, + }, nil).Once() + + // During the third check one 
topic matching a regex is detected
+	mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{
+		"topic1": {},
+		"topics": {},
+	}, nil).Once()
+	mockAdmin.On("Close").Return(nil).Once()
+
+	mockConsumerGroup := &MockKafkaConsumerGroup{}
+	// ConsumerGroup should be Closed and restarted two times. First time when new matching topics are
+	// detected and second time when matching topic is deleted.
+	mockConsumerGroup.On("Close").Return(nil).Twice()
+
+	// Override the createKafkaClusterAdmin function
+	originalCreateKafkaClusterAdmin := createKafkaClusterAdmin
+	createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) {
+		return mockAdmin, nil
+	}
+	// Override the createKafkaClient function
+	originalCreateKafkaClient := createKafkaClient
+	createKafkaClient = func(ctx context.Context, config Config) (sarama.ConsumerGroup, error) {
+		return mockConsumerGroup, nil
+	}
+
+	wg := &sync.WaitGroup{}
+	set := receivertest.NewNopSettings()
+
+	consumer := &kafkaMetricsConsumer{
+		config: Config{TopicRegex: topicRegexPattern},
+		consumerGroup: mockConsumerGroup,
+		topics: []string{},
+		settings: set,
+		consumeLoopWG: wg,
+	}
+
+	wg.Add(1)
+	go consumer.checkNewTopics(ctx)
+
+	time.Sleep(7 * time.Second)
+	// Expect first detection of topics
+	assert.True(t, equalStringSlices([]string{"topic1", "topic2"}, consumer.topics))
+
+	time.Sleep(5 * time.Second)
+	// Expect second detection of topics
+	assert.True(t, equalStringSlices([]string{"topic1", "topic2"}, consumer.topics))
+
+	time.Sleep(5 * time.Second)
+	// Expect third detection of topics
+	assert.True(t, equalStringSlices([]string{"topic1"}, consumer.topics))
+
+	cancel()
+	wg.Wait()
+
+	createKafkaClusterAdmin = originalCreateKafkaClusterAdmin
+	createKafkaClient = originalCreateKafkaClient
+	mockAdmin.AssertExpectations(t)
+	mockConsumerGroup.AssertExpectations(t)
+}
+
 func TestNewLogsReceiver_version_err(t *testing.T) {
 	c := Config{
 		Encoding: defaultEncoding,
@@ -1220,6 +1472,120 @@ func TestLogsReceiver_encoding_extension(t *testing.T) {
 	}, 10*time.Second, time.Millisecond*100)
 }
 
+func TestLogsCheckNewTopics_ListTopicsError(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	topicRegexPattern := "^topic[0-9]$"
+
+	mockConsumerGroup := &MockKafkaConsumerGroup{}
+	mockAdmin := &MockClusterAdmin{}
+	mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{}, errors.New("list topics failed")).Once()
+	mockAdmin.On("Close").Return(nil).Once()
+
+	// Override the createKafkaClusterAdmin function to return the mock admin
+	originalCreateKafkaClusterAdmin := createKafkaClusterAdmin
+	createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) {
+		return mockAdmin, nil
+	}
+
+	wg := &sync.WaitGroup{}
+	set := receivertest.NewNopSettings()
+
+	consumer := &kafkaLogsConsumer{
+		config: Config{TopicRegex: topicRegexPattern},
+		consumerGroup: mockConsumerGroup,
+		topics: []string{},
+		settings: set,
+		consumeLoopWG: wg,
+	}
+
+	wg.Add(1)
+	go consumer.checkNewTopics(ctx)
+	time.Sleep(6 * time.Second)
+	assert.Equal(t, []string{}, consumer.topics)
+	cancel()
+	wg.Wait()
+	createKafkaClusterAdmin = originalCreateKafkaClusterAdmin
+	mockAdmin.AssertExpectations(t)
+}
+
+func TestLogsCheckNewTopics_TopicsChangeDetected(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	topicRegexPattern := "^topic[0-9]$"
+
+	mockAdmin := &MockClusterAdmin{}
+	// During first check new topics matching the regex 
are detected + mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{ + "topic1": {}, + "topic2": {}, + }, nil).Once() + + // During the second check only one new topic which doesn't match a regex is detected + mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{ + "topic1": {}, + "topic2": {}, + "topics": {}, + }, nil).Once() + + // During the third check one topic matching a regex is detected + mockAdmin.On("ListTopics").Return(map[string]sarama.TopicDetail{ + "topic1": {}, + "topics": {}, + }, nil).Once() + mockAdmin.On("Close").Return(nil).Once() + + mockConsumerGroup := &MockKafkaConsumerGroup{} + // ConsumerGroup should be Closed and restarted two times. First time when new matching topics are + // detected and second time when matching topic is deleted. + mockConsumerGroup.On("Close").Return(nil).Twice() + + // Override the createKafkaClusterAdmin function + originalCreateKafkaClusterAdmin := createKafkaClusterAdmin + createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) { + return mockAdmin, nil + } + // Override the createKafkaClient function + originalCreateKafkaClient := createKafkaClient + createKafkaClient = func(ctx context.Context, config Config) (sarama.ConsumerGroup, error) { + return mockConsumerGroup, nil + } + + wg := &sync.WaitGroup{} + set := receivertest.NewNopSettings() + + consumer := &kafkaLogsConsumer{ + config: Config{TopicRegex: topicRegexPattern}, + consumerGroup: mockConsumerGroup, + topics: []string{}, + settings: set, + consumeLoopWG: wg, + } + + wg.Add(1) + go consumer.checkNewTopics(ctx) + + time.Sleep(7 * time.Second) + // Expect first detection of topics + assert.True(t, equalStringSlices([]string{"topic1", "topic2"}, consumer.topics)) + + time.Sleep(5 * time.Second) + // Expect second detection of topics + assert.True(t, equalStringSlices([]string{"topic1", "topic2"}, consumer.topics)) + + time.Sleep(5 * time.Second) + // Expect third detection of topics + assert.True(t, equalStringSlices([]string{"topic1"}, consumer.topics)) + + cancel() + wg.Wait() + + createKafkaClusterAdmin = originalCreateKafkaClusterAdmin + createKafkaClient = originalCreateKafkaClient + mockAdmin.AssertExpectations(t) + mockConsumerGroup.AssertExpectations(t) +} + func TestToSaramaInitialOffset_earliest(t *testing.T) { saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest) diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index a0a744764602..45bb5c015513 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -35,3 +35,24 @@ kafka/logs: retry: max: 10 backoff: 5s +kafka/topics_err: + topic: logs + topic_regex: "logs[0-9]" + session_timeout: 45s + heartbeat_interval: 15s + encoding: direct + brokers: + - "coffee:123" + - "foobar:456" + client_id: otel-collector + group_id: otel-collector + initial_offset: earliest + auth: + tls: + ca_file: ca.pem + cert_file: cert.pem + key_file: key.pem + metadata: + retry: + max: 10 + backoff: 5s
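The `kafka/topics_err` entry above is intentionally invalid and exists only to exercise the new validation error. For contrast, a valid regex-based entry would omit `topic` entirely; a minimal sketch (the entry name is illustrative):

```yaml
kafka/regex_only:
  topic_regex: "logs[0-9]"
  brokers:
    - "coffee:123"
    - "foobar:456"
  group_id: otel-collector
```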