diff --git a/.chloggen/componenttest-extra-attributes.yaml b/.chloggen/componenttest-extra-attributes.yaml new file mode 100644 index 00000000000..3683ca634b2 --- /dev/null +++ b/.chloggen/componenttest-extra-attributes.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component/componenttest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add optional ...attribute.KeyValue argument to TestTelemetry.CheckExporterMetricGauge. + +# One or more tracking issues or pull requests related to the change +issues: [10593] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml b/.chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml new file mode 100644 index 00000000000..e4065d6e4bc --- /dev/null +++ b/.chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add data_type attribute to `otelcol_exporter_queue_size` metric to report the type of data being processed. + +# One or more tracking issues or pull requests related to the change +issues: [9943] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/service-remove-ballast-deps-2.yaml b/.chloggen/service-remove-ballast-deps-2.yaml new file mode 100644 index 00000000000..e6377b387de --- /dev/null +++ b/.chloggen/service-remove-ballast-deps-2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor/memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The memory limiter processor will no longer account for ballast size. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. 
'[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/service-remove-ballast-deps-3.yaml b/.chloggen/service-remove-ballast-deps-3.yaml new file mode 100644 index 00000000000..c07f90439f6 --- /dev/null +++ b/.chloggen/service-remove-ballast-deps-3.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: extension/memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The memory limiter extension will no longer account for ballast size. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/service-remove-ballast-deps.yaml b/.chloggen/service-remove-ballast-deps.yaml new file mode 100644 index 00000000000..31ffa90c3ac --- /dev/null +++ b/.chloggen/service-remove-ballast-deps.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The service will no longer be able to get a ballast size from the deprecated ballast extension. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index ba076c65905..652db62529f 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel/attribute" otelprom "go.opentelemetry.io/otel/exporters/prometheus" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" @@ -72,8 +73,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords) } -func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error { - return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val) +func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, extraAttrs ...attribute.KeyValue) error { + attrs := attributesForExporterMetrics(tts.id) + attrs = append(attrs, extraAttrs...) + return tts.prometheusChecker.checkGauge(metric, val, attrs) } // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. diff --git a/component/componenttest/otelprometheuschecker.go b/component/componenttest/otelprometheuschecker.go index f7a0e548dfe..6a63617c206 100644 --- a/component/componenttest/otelprometheuschecker.go +++ b/component/componenttest/otelprometheuschecker.go @@ -100,10 +100,8 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs) } -func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error { - exporterAttrs := attributesForExporterMetrics(exporter) - - ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs) +func (pc *prometheusChecker) checkGauge(metric string, val int64, attrs []attribute.KeyValue) error { + ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attrs) if err != nil { return err } diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 4016c14e7c9..8d68c06bed7 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option { NumConsumers: config.NumConsumers, QueueSize: config.QueueSize, }) - o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder) + o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep) return nil } } @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto DataType: o.signal, ExporterSettings: o.set, } - o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder) + o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep) return nil } } @@ -250,7 +250,7 @@ type baseExporter struct { } func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { - obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, 
ExporterCreateSettings: set}) + obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set, DataType: signal}) if err != nil { return nil, err } diff --git a/exporter/exporterhelper/obsexporter.go b/exporter/exporterhelper/obsexporter.go index d5d366ccabe..70cc0642c47 100644 --- a/exporter/exporterhelper/obsexporter.go +++ b/exporter/exporterhelper/obsexporter.go @@ -26,6 +26,7 @@ type ObsReport struct { level configtelemetry.Level spanNamePrefix string tracer trace.Tracer + dataType component.DataType otelAttrs []attribute.KeyValue telemetryBuilder *metadata.TelemetryBuilder @@ -38,6 +39,7 @@ type ObsReport struct { type ObsReportSettings struct { ExporterID component.ID ExporterCreateSettings exporter.Settings + DataType component.DataType } // NewObsReport creates a new Exporter. @@ -58,7 +60,7 @@ func newExporter(cfg ObsReportSettings) (*ObsReport, error) { level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel, spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(), tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()), - + dataType: cfg.DataType, otelAttrs: []attribute.KeyValue{ attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()), }, diff --git a/exporter/exporterhelper/obsreport_test.go b/exporter/exporterhelper/obsreport_test.go index daafd422e61..d7c4795615b 100644 --- a/exporter/exporterhelper/obsreport_test.go +++ b/exporter/exporterhelper/obsreport_test.go @@ -15,7 +15,6 @@ import ( ) func TestExportEnqueueFailure(t *testing.T) { - exporterID := component.MustNewID("fakeExporter") tt, err := componenttest.SetupTelemetry(exporterID) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 88d17a7c03f..d7166335557 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" @@ -74,18 +73,18 @@ type queueSender struct { traceAttribute attribute.KeyValue consumers *queue.Consumers[Request] - telemetryBuilder *metadata.TelemetryBuilder - exporterID component.ID + obsrep *ObsReport + exporterID component.ID } func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int, - exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender { + exportFailureMessage string, obsrep *ObsReport) *queueSender { qs := &queueSender{ - queue: q, - numConsumers: numConsumers, - traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), - telemetryBuilder: telemetryBuilder, - exporterID: set.ID, + queue: q, + numConsumers: numConsumers, + traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), + obsrep: obsrep, + exporterID: set.ID, } consumeFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) @@ -105,10 +104,12 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error { return err } - opts := metric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.exporterID.String()))) + dataTypeAttr 
:= attribute.String(obsmetrics.DataTypeKey, qs.obsrep.dataType.String()) return multierr.Append( - qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, opts), - qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, opts), + qs.obsrep.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))), + qs.obsrep.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))), ) } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index ff9aa7c5f74..bd783512d3b 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -18,10 +19,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) func TestQueuedRetry_StopWhileWaiting(t *testing.T) { @@ -202,28 +203,33 @@ func TestQueuedRetryHappyPath(t *testing.T) { }) } } -func TestQueuedRetry_QueueMetricsReported(t *testing.T) { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 0 // to make every request go straight to the queue - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, - withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) +func TestQueuedRetry_QueueMetricsReported(t *testing.T) { + dataTypes := []component.DataType{component.DataTypeLogs, component.DataTypeTraces, component.DataTypeMetrics} + for _, dataType := range dataTypes { + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 0 // to make every request go straight to the queue + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := newBaseExporter(set, dataType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", 
int64(defaultQueueSize))) + + for i := 0; i < 7; i++ { + require.NoError(t, be.send(context.Background(), newErrorRequest())) + } + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7), + attribute.String(obsmetrics.DataTypeKey, dataType.String()))) - for i := 0; i < 7; i++ { - require.NoError(t, be.send(context.Background(), newErrorRequest())) + assert.NoError(t, be.Shutdown(context.Background())) } - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7))) - - assert.NoError(t, be.Shutdown(context.Background())) } func TestNoCancellationContext(t *testing.T) { @@ -426,9 +432,12 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { func TestQueueSenderNoStartShutdown(t *testing.T) { queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{}) set := exportertest.NewNopSettings() - builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + obsrep, err := NewObsReport(ObsReportSettings{ + ExporterID: exporterID, + ExporterCreateSettings: exportertest.NewNopSettings(), + }) assert.NoError(t, err) - qs := newQueueSender(queue, set, 1, "", builder) + qs := newQueueSender(queue, set, 1, "", obsrep) assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/internal/memorylimiter/memorylimiter.go b/internal/memorylimiter/memorylimiter.go index a010cba5427..db93795127a 100644 --- a/internal/memorylimiter/memorylimiter.go +++ b/internal/memorylimiter/memorylimiter.go @@ -44,7 +44,6 @@ type MemoryLimiter struct { usageChecker memUsageChecker memCheckWait time.Duration - ballastSize uint64 // mustRefuse is used to indicate when data should be refused. mustRefuse *atomic.Bool @@ -58,8 +57,7 @@ type MemoryLimiter struct { readMemStatsFn func(m *runtime.MemStats) // Fields used for logging. - logger *zap.Logger - configMismatchedLogged bool + logger *zap.Logger refCounterLock sync.Mutex refCounter int @@ -114,14 +112,7 @@ func (ml *MemoryLimiter) startMonitoring() { } } -func (ml *MemoryLimiter) Start(_ context.Context, host component.Host) error { - extensions := host.GetExtensions() - for _, extension := range extensions { - if ext, ok := extension.(interface{ GetBallastSize() uint64 }); ok { - ml.ballastSize = ext.GetBallastSize() - break - } - } +func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error { ml.startMonitoring() return nil } @@ -168,16 +159,6 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro func (ml *MemoryLimiter) readMemStats() *runtime.MemStats { ms := &runtime.MemStats{} ml.readMemStatsFn(ms) - // If proper configured ms.Alloc should be at least ml.ballastSize but since - // a misconfiguration is possible check for that here. - if ms.Alloc >= ml.ballastSize { - ms.Alloc -= ml.ballastSize - } else if !ml.configMismatchedLogged { - // This indicates misconfiguration. Log it once. 
- ml.configMismatchedLogged = true - ml.logger.Warn(`"size_mib" in ballast extension is likely incorrectly configured.`) - } - return ms } diff --git a/internal/memorylimiter/memorylimiter_test.go b/internal/memorylimiter/memorylimiter_test.go index e9e92a33f70..6919ce50968 100644 --- a/internal/memorylimiter/memorylimiter_test.go +++ b/internal/memorylimiter/memorylimiter_test.go @@ -4,17 +4,14 @@ package memorylimiter import ( - "context" "runtime" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/internal/iruntime" ) @@ -43,22 +40,6 @@ func TestMemoryPressureResponse(t *testing.T) { ml.CheckMemLimits() assert.True(t, ml.MustRefuse()) - // Check ballast effect - ml.ballastSize = 1000 - - // Below memAllocLimit accounting for ballast. - currentMemAlloc = 800 + ml.ballastSize - ml.CheckMemLimits() - assert.False(t, ml.MustRefuse()) - - // Above memAllocLimit even accounting for ballast. - currentMemAlloc = 1800 + ml.ballastSize - ml.CheckMemLimits() - assert.True(t, ml.MustRefuse()) - - // Restore ballast to default. - ml.ballastSize = 0 - // Check spike limit ml.usageChecker.memSpikeLimit = 512 @@ -151,38 +132,3 @@ func TestRefuseDecision(t *testing.T) { }) } } - -func TestBallastSize(t *testing.T) { - cfg := &Config{ - CheckInterval: 10 * time.Second, - MemoryLimitMiB: 1024, - } - got, err := NewMemoryLimiter(cfg, zap.NewNop()) - require.NoError(t, err) - - got.startMonitoring() - require.NoError(t, got.Start(context.Background(), &host{ballastSize: 113})) - assert.Equal(t, uint64(113), got.ballastSize) - require.NoError(t, got.Shutdown(context.Background())) -} - -type host struct { - ballastSize uint64 - component.Host -} - -func (h *host) GetExtensions() map[component.ID]component.Component { - ret := make(map[component.ID]component.Component) - ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} - return ret -} - -type ballastExtension struct { - ballastSize uint64 - component.StartFunc - component.ShutdownFunc -} - -func (be *ballastExtension) GetBallastSize() uint64 { - return be.ballastSize -} diff --git a/internal/obsreportconfig/obsmetrics/obs_exporter.go b/internal/obsreportconfig/obsmetrics/obs_exporter.go index b87270f5e5a..ffaa33a54a4 100644 --- a/internal/obsreportconfig/obsmetrics/obs_exporter.go +++ b/internal/obsreportconfig/obsmetrics/obs_exporter.go @@ -7,6 +7,9 @@ const ( // ExporterKey used to identify exporters in metrics and traces. ExporterKey = "exporter" + // DataTypeKey used to identify the data type in the queue size metric. + DataTypeKey = "data_type" + // SentSpansKey used to track spans sent by exporters. SentSpansKey = "sent_spans" // FailedToSendSpansKey used to track spans that failed to be sent by exporters. 
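Taken together, the changes above thread the exporter's data type through ObsReport into the queue sender, so the otelcol_exporter_queue_size gauge is now recorded with both the exporter attribute and the new data_type attribute, while otelcol_exporter_queue_capacity keeps only the exporter attribute. The following is an illustrative sketch only (not part of this patch) of how a test inside this module could assert the new attribute through the variadic CheckExporterMetricGauge added in componenttest; the exporter ID, the expected gauge values, and the elided exporter/queue setup are assumptions that mirror the test changes elsewhere in this diff.

// Illustrative sketch only — not part of this patch.
package exporterhelper

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/attribute"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

func TestQueueSizeReportsDataType(t *testing.T) {
	tt, err := componenttest.SetupTelemetry(component.MustNewID("fakeExporter"))
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

	// ... elided: build and start an exporter with WithQueue for component.DataTypeTraces
	// and enqueue 7 requests, as in the queue_sender_test.go changes below ...

	// The capacity gauge is still labeled only by the exporter ID.
	// (1000 is an assumed capacity configured in the elided setup.)
	require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000)))
	// The size gauge now also carries the data_type attribute introduced by this change.
	require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
		attribute.String(obsmetrics.DataTypeKey, component.DataTypeTraces.String())))
}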
diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 1ce63d794f3..6172fa39889 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -122,7 +122,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -133,7 +132,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -144,29 +142,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -177,7 +152,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -188,7 +162,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -197,7 +170,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -213,7 +186,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, mp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, mp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = mp.ConsumeMetrics(ctx, md) if tt.expectError { @@ -239,7 +212,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -250,7 +222,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -261,29 +232,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -294,7 +242,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, 
memAlloc: 800, expectError: false, }, @@ -305,7 +252,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -314,7 +260,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -330,7 +276,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, tp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = tp.ConsumeTraces(ctx, td) if tt.expectError { @@ -356,7 +302,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -367,7 +312,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -378,29 +322,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -411,7 +332,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -422,7 +342,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -431,7 +350,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -447,7 +366,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, tp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = tp.ConsumeLogs(ctx, ld) if tt.expectError { @@ -465,26 +384,14 @@ func TestLogMemoryPressureResponse(t *testing.T) { } type host struct { - ballastSize uint64 component.Host } func (h *host) GetExtensions() map[component.ID]component.Component { ret := make(map[component.ID]component.Component) - ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} return ret } -type ballastExtension struct { - ballastSize uint64 - component.StartFunc - component.ShutdownFunc -} - -func (be *ballastExtension) GetBallastSize() uint64 { - return be.ballastSize -} - func totalMemory() (uint64, 
error) { return uint64(2048), nil } diff --git a/service/internal/proctelemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go index 0b978758104..e7a0cc1454a 100644 --- a/service/internal/proctelemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -21,7 +21,6 @@ import ( // processMetrics is a struct that contains views related to process metrics (cpu, mem, etc) type processMetrics struct { startTimeUnixNano int64 - ballastSizeBytes uint64 proc *process.Process context context.Context @@ -54,7 +53,7 @@ func WithHostProc(hostProc string) RegisterOption { // RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure // basic information about this process. -func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeBytes uint64, opts ...RegisterOption) error { +func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...RegisterOption) error { set := registerOption{} for _, opt := range opts { opt.apply(&set) @@ -62,7 +61,6 @@ func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeB var err error pm := &processMetrics{ startTimeUnixNano: time.Now().UnixNano(), - ballastSizeBytes: ballastSizeBytes, ms: &runtime.MemStats{}, } @@ -139,10 +137,4 @@ func (pm *processMetrics) readMemStatsIfNeeded() { } pm.lastMsRead = now runtime.ReadMemStats(pm.ms) - if pm.ballastSizeBytes > 0 { - pm.ms.Alloc -= pm.ballastSizeBytes - pm.ms.HeapAlloc -= pm.ballastSizeBytes - pm.ms.HeapSys -= pm.ballastSizeBytes - pm.ms.HeapInuse -= pm.ballastSizeBytes - } } diff --git a/service/internal/proctelemetry/process_telemetry_linux_test.go b/service/internal/proctelemetry/process_telemetry_linux_test.go index 73605c0ae8e..99471b4eca2 100644 --- a/service/internal/proctelemetry/process_telemetry_linux_test.go +++ b/service/internal/proctelemetry/process_telemetry_linux_test.go @@ -21,7 +21,7 @@ func TestProcessTelemetryWithHostProc(t *testing.T) { // Make the sure the environment variable value is not used. t.Setenv("HOST_PROC", "foo/bar") - require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0, WithHostProc("/proc"))) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, WithHostProc("/proc"))) // Check that the metrics are actually filled. time.Sleep(200 * time.Millisecond) diff --git a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index cccc5ad8e30..fb750fcf664 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -78,7 +78,7 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli func TestProcessTelemetry(t *testing.T) { tel := setupTelemetry(t) - require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0)) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings)) mp, err := fetchPrometheusMetrics(tel.promHandler) require.NoError(t, err) diff --git a/service/service.go b/service/service.go index 99747db9ac6..8b57bd12cad 100644 --- a/service/service.go +++ b/service/service.go @@ -159,7 +159,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. 
-		if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings, getBallastSize(srv.host)); err != nil {
+		if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil {
 			return nil, fmt.Errorf("failed to register process metrics: %w", err)
 		}
 	}
 
@@ -316,15 +316,6 @@ func (srv *Service) Logger() *zap.Logger {
 	return srv.telemetrySettings.Logger
 }
 
-func getBallastSize(host component.Host) uint64 {
-	for _, ext := range host.GetExtensions() {
-		if bExt, ok := ext.(interface{ GetBallastSize() uint64 }); ok {
-			return bExt.GetBallastSize()
-		}
-	}
-	return 0
-}
-
 func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource {
 	// pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests.
 	// Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only