Add logs scraper in factory #11822

Status: Open. Wants to merge 42 commits into base branch `main`.

Commits (42):
90a3adf  Add scraper for logs (sincejune, Dec 4, 2024)
fe03c9c  Update (sincejune, Dec 4, 2024)
6e889ab  Update (sincejune, Dec 4, 2024)
3d60d14  Fix (sincejune, Dec 4, 2024)
092e62a  Merge branch 'main' into add-logs-scraper (sincejune, Dec 4, 2024)
b52732a  Merge branch 'main' into add-logs-scraper (sincejune, Dec 8, 2024)
ea35198  Update (sincejune, Dec 8, 2024)
9bc8cb6  Add logs scraper in the factory (sincejune, Dec 8, 2024)
b1d1de0  Update (sincejune, Dec 8, 2024)
92156f1  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 10, 2024)
0124c61  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 10, 2024)
b33cec6  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 10, 2024)
ed603a1  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 10, 2024)
ce9b96b  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 10, 2024)
d64b454  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 11, 2024)
fc59d50  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 11, 2024)
03263de  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 11, 2024)
33eca67  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 11, 2024)
33c942f  Update (sincejune, Dec 11, 2024)
a4345c1  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 12, 2024)
365f9cf  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 12, 2024)
5f15faa  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 13, 2024)
d0e4fe0  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 13, 2024)
ea10b91  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 14, 2024)
f0dfd93  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 14, 2024)
f9325fe  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 15, 2024)
f4ec5a9  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 15, 2024)
db46e30  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 16, 2024)
b518e4d  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 16, 2024)
367765b  Update (sincejune, Dec 17, 2024)
5f88901  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 17, 2024)
06c4513  Update (sincejune, Dec 17, 2024)
14195ff  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 17, 2024)
0def70f  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 18, 2024)
46cd15d  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 18, 2024)
6c14808  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 19, 2024)
0392be8  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 19, 2024)
94f6762  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 21, 2024)
3d31da9  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 22, 2024)
3dbb083  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 27, 2024)
f13709c  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Dec 28, 2024)
42fb906  Merge branch 'main' into add-logs-scraper-in-factory (sincejune, Jan 4, 2025)
25 changes: 25 additions & 0 deletions .chloggen/add-scraper-for-logs.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: receiver/scraperhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add scraper for logs

# One or more tracking issues or pull requests related to the change
issues: [11799,11822]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 2 additions & 2 deletions scraper/README.md
@@ -5,8 +5,8 @@ A scraper defines how to connect and scrape telemetry data from an external source
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics |
| Stability | [development]: metrics, logs |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Apkg%2F%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Apkg%2F) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Apkg%2F%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Apkg%2F) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->
<!-- end autogenerated section -->
34 changes: 34 additions & 0 deletions scraper/factory.go
@@ -35,9 +35,18 @@ type Factory interface {
// Implementers can assume `next` is never nil.
CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error)

// CreateLogs creates a Logs scraper based on this config.
// If the scraper type does not support logs,
// this function returns the error [pipeline.ErrSignalNotSupported].
// Implementers can assume `next` is never nil.
Review comments on this line:

Contributor: sorry, what is next here?

Contributor (suggested change, remove this line):
// Implementers can assume `next` is never nil.

Contributor Author: @atoulme Good catch! I think this line is copied from somewhere. Removed it.

CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error)

// MetricsStability gets the stability level of the Metrics scraper.
MetricsStability() component.StabilityLevel

// LogsStability gets the stability level of the Logs scraper.
LogsStability() component.StabilityLevel

unexportedFactoryFunc()
}

@@ -60,7 +69,9 @@ type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
CreateMetricsFunc
CreateLogsFunc
metricsStabilityLevel component.StabilityLevel
logsStabilityLevel component.StabilityLevel
}

func (f *factory) Type() component.Type {
@@ -73,9 +84,16 @@ func (f *factory) MetricsStability() component.StabilityLevel {
return f.metricsStabilityLevel
}

func (f *factory) LogsStability() component.StabilityLevel {
return f.logsStabilityLevel
}

// CreateMetricsFunc is the equivalent of Factory.CreateMetrics().
type CreateMetricsFunc func(context.Context, Settings, component.Config) (Metrics, error)

// CreateLogsFunc is the equivalent of Factory.CreateLogs().
type CreateLogsFunc func(context.Context, Settings, component.Config) (Logs, error)

// CreateMetrics implements Factory.CreateMetrics.
func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error) {
if f == nil {
@@ -84,6 +102,14 @@ func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg
return f(ctx, set, cfg)
}

// CreateLogs implements Factory.CreateLogs.
func (f CreateLogsFunc) CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error) {
if f == nil {
return nil, pipeline.ErrSignalNotSupported
}
return f(ctx, set, cfg)
}

// WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level.
func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
@@ -92,6 +118,14 @@ func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) F
})
}

// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level.
func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.logsStabilityLevel = sl
o.CreateLogsFunc = createLogs
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
12 changes: 11 additions & 1 deletion scraper/factory_test.go
@@ -32,6 +32,8 @@ func TestNewFactory(t *testing.T) {
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())
_, err := f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
_, err = f.CreateLogs(context.Background(), nopSettings(), &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
}

func TestNewFactoryWithOptions(t *testing.T) {
@@ -40,13 +42,17 @@ func TestNewFactoryWithOptions(t *testing.T) {
f := NewFactory(
testType,
func() component.Config { return &defaultCfg },
WithMetrics(createMetrics, component.StabilityLevelAlpha))
WithMetrics(createMetrics, component.StabilityLevelAlpha),
WithLogs(createLogs, component.StabilityLevelAlpha))
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())

assert.Equal(t, component.StabilityLevelAlpha, f.MetricsStability())
assert.Equal(t, component.StabilityLevelAlpha, f.LogsStability())
_, err := f.CreateMetrics(context.Background(), Settings{}, &defaultCfg)
require.NoError(t, err)
_, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg)
require.NoError(t, err)
}

func TestMakeFactoryMap(t *testing.T) {
@@ -89,3 +95,7 @@ func TestMakeFactoryMap(t *testing.T) {
func createMetrics(context.Context, Settings, component.Config) (Metrics, error) {
return NewMetrics(newTestScrapeMetricsFunc(nil))
}

func createLogs(context.Context, Settings, component.Config) (Logs, error) {
return NewLogs(newTestScrapeLogsFunc(nil))
}
44 changes: 44 additions & 0 deletions scraper/logs.go
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraper // import "go.opentelemetry.io/collector/scraper"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
)

// Logs is the base interface for logs scrapers.
type Logs interface {
component.Component

// ScrapeLogs is the base method that defines how logs should be scraped.
ScrapeLogs(context.Context) (plog.Logs, error)
}

// ScrapeLogsFunc is a helper function that is similar to Logs.ScrapeLogs.
type ScrapeLogsFunc ScrapeFunc[plog.Logs]

func (sf ScrapeLogsFunc) ScrapeLogs(ctx context.Context) (plog.Logs, error) {
return sf(ctx)
}

type logs struct {
baseScraper
ScrapeLogsFunc
}

// NewLogs creates a new Logs scraper.
func NewLogs(scrape ScrapeLogsFunc, options ...Option) (Logs, error) {
if scrape == nil {
return nil, errNilFunc
}
bs := &logs{
baseScraper: newBaseScraper(options),
ScrapeLogsFunc: scrape,
}

return bs, nil
}
79 changes: 79 additions & 0 deletions scraper/logs_test.go
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraper

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestNewLogs(t *testing.T) {
mp, err := NewLogs(newTestScrapeLogsFunc(nil))
require.NoError(t, err)

require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
md, err := mp.ScrapeLogs(context.Background())
require.NoError(t, err)
assert.Equal(t, plog.NewLogs(), md)
require.NoError(t, mp.Shutdown(context.Background()))
}

func TestNewLogs_WithOptions(t *testing.T) {
want := errors.New("my_error")
mp, err := NewLogs(newTestScrapeLogsFunc(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }))
require.NoError(t, err)

assert.Equal(t, want, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, mp.Shutdown(context.Background()))
}

func TestNewLogs_NilRequiredFields(t *testing.T) {
_, err := NewLogs(nil)
require.Error(t, err)
}

func TestNewLogs_ProcessLogsError(t *testing.T) {
want := errors.New("my_error")
mp, err := NewLogs(newTestScrapeLogsFunc(want))
require.NoError(t, err)
_, err = mp.ScrapeLogs(context.Background())
require.ErrorIs(t, err, want)
}

func TestLogsConcurrency(t *testing.T) {
mp, err := NewLogs(newTestScrapeLogsFunc(nil))
require.NoError(t, err)
require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
_, errScrape := mp.ScrapeLogs(context.Background())
assert.NoError(t, errScrape)
}
}()
}
wg.Wait()
require.NoError(t, mp.Shutdown(context.Background()))
}

func newTestScrapeLogsFunc(retError error) ScrapeLogsFunc {
return func(_ context.Context) (plog.Logs, error) {
return plog.NewLogs(), retError
}
}
2 changes: 1 addition & 1 deletion scraper/metadata.yaml
@@ -4,4 +4,4 @@ github_project: open-telemetry/opentelemetry-collector
status:
class: pkg
stability:
development: [metrics]
development: [metrics, logs]
2 changes: 1 addition & 1 deletion scraper/scraper.go
@@ -12,7 +12,7 @@ import (

var errNilFunc = errors.New("nil scrape func")

// ScrapeFunc scrapes metrics.
// ScrapeFunc scrapes data.
type ScrapeFunc[T any] func(context.Context) (T, error)

// Option apply changes to internal options.
4 changes: 2 additions & 2 deletions scraper/scrapererror/partialscrapeerror.go
@@ -6,13 +6,13 @@ package scrapererror // import "go.opentelemetry.io/collector/scraper/scrapererr
import "errors"

// PartialScrapeError is an error to represent
// that a subset of metrics were failed to be scraped.
// that a subset of data failed to be scraped.
type PartialScrapeError struct {
error
Failed int
}

// NewPartialScrapeError creates PartialScrapeError for failed metrics.
// NewPartialScrapeError creates PartialScrapeError for failed data.
// Use this error type only when a subset of data failed to be scraped.
func NewPartialScrapeError(err error, failed int) PartialScrapeError {
return PartialScrapeError{