From af21ce7ff8834d6089f288a52e88a928e7ddad60 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 6 Sep 2024 08:55:39 -0700 Subject: [PATCH] [internal/otelarrow] Fix test flake (for 34719) (#34889) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** Restore a skipped test now that the nature of the problem is understood. The problem was mostly addressed in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/34794, which left the test disabled. The test had been flaky because, while testing for an out-of-memory condition, it could fail due to a timeout or for some other reason. To make the test more reliable, the test now waits until at least one ArrowTraces span has been received by both components. After one span is available, it checks that the expected log messages are present on both sides. **Link to tracking Issue:** Fixes #34719. **Testing:** ✅ --------- Co-authored-by: Curtis Robert Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- internal/otelarrow/test/e2e_test.go | 57 +++++++++++++++++++---------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index 85783b00361a..597bc511bb08 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -35,7 +35,6 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest/observer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -83,14 +82,18 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err return tc.sink.ConsumeTraces(ctx, td) } -func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) { +func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, 
*tracetest.InMemoryExporter) { tset := componenttest.NewNopTelemetrySettings() core, obslogs := observer.New(zapcore.InfoLevel) exp := tracetest.NewInMemoryExporter() - tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) + // Note: To debug any of the logs-based assertions in this test, uncomment + // the following line: + // + // tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) + tset.Logger = zap.New(core) tset.TracerProvider = trace.NewTracerProvider(trace.WithSyncer(exp)) return tset, obslogs, exp @@ -329,6 +332,9 @@ func logSigs(obs *observer.ObservedLogs) (map[string]int, []string) { for _, f := range rl.Context { attrs = append(attrs, f.Key) + // One way we can see memory limit errors is through the + // OTel-Arrow common "arrow stream error" message, which both + // sides will log. if rl.Message == "arrow stream error" && f.Key == "message" { msgs = append(msgs, f.String) } @@ -346,7 +352,11 @@ var limitRegexp = regexp.MustCompile(`memory limit exceeded`) func countMemoryLimitErrors(msgs []string) (cnt int) { for _, msg := range msgs { - if limitRegexp.MatchString(msg) { + // The memory errors are expected from the receiver, + // so whether these print on the exporter or receiver, + // the message will contain "otel-arrow decode" from + // the receiver. + if limitRegexp.MatchString(msg) && strings.Contains(msg, "otel-arrow decode") { cnt++ } } @@ -357,12 +367,12 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, eSigs, eMsgs := logSigs(testCon.expLogs) rSigs, rMsgs := logSigs(testCon.recvLogs) - // Test for arrow stream errors. - require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eSigs) + // Test for arrow receiver stream errors on both sides. 
+ require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs) require.Less(t, 0, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs) - // Ensure the errors include memory limit errors. - + // Ensure both side's error logs include memory limit errors + // one way or another. require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs) require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs) @@ -374,7 +384,9 @@ func consumerSuccess(t *testing.T, err error) { } func consumerFailure(t *testing.T, err error) { - require.Error(t, err) + if err == nil { + return + } // there should be no permanent errors anywhere in this test. require.False(t, consumererror.IsPermanent(err), @@ -414,32 +426,37 @@ func TestIntegrationTracesSimple(t *testing.T) { } func TestIntegrationMemoryLimited(t *testing.T) { - // This test is flaky, it only shows on Windows. This will be - // addressed in a separate PR. - t.Skip("test flake disabled") - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // until 10 threads can write 100 spans + // until exporter and receiver finish at least one ArrowTraces span. 
params := testParams{ threadCount: 10, requestUntil: func(test *testConsumer) bool { - cnt := 0 - for _, span := range test.expSpans.GetSpans() { - if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" { - cnt++ + cf := func(spans tracetest.SpanStubs) (cnt int) { + for _, span := range spans { + if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" { + cnt++ + } } + return } - return cnt == 0 || test.sentSpans.Load() < 100 - + rcnt := cf(test.recvSpans.GetSpans()) + ecnt := cf(test.expSpans.GetSpans()) + return ecnt == 0 || rcnt == 0 }, } testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) { rcfg.Arrow.MemoryLimitMiB = 1 ecfg.Arrow.NumStreams = 10 + // Shorten timeouts for this test, because we intend + // for it to fail and don't want to wait for retries. ecfg.TimeoutSettings.Timeout = 5 * time.Second + ecfg.RetryConfig.InitialInterval = 1 * time.Second + ecfg.RetryConfig.MaxInterval = 2 * time.Second + ecfg.RetryConfig.MaxElapsedTime = 30 * time.Second + ecfg.Arrow.MaxStreamLifetime = 5 * time.Second }, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding) }