Skip to content

Commit

Permalink
support database metrics (#272)
Browse files Browse the repository at this point in the history
* support database metrics

* add more ut
  • Loading branch information
123liuziming authored Dec 31, 2024
1 parent ec673cc commit 8e26c43
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 23 deletions.
127 changes: 127 additions & 0 deletions pkg/inst-api-semconv/instrumenter/db/db_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) 2024 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// Db://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package db

import (
"context"
"errors"
"fmt"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api-semconv/instrumenter/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"log"
"sync"
"time"
)

const db_client_request_duration = "db.client.request.duration"

type DbClientMetric struct {
key attribute.Key
clientRequestDuration metric.Float64Histogram
}

var mu sync.Mutex

var dbMetricsConv = map[attribute.Key]bool{
semconv.DBSystemKey: true,
semconv.DBOperationNameKey: true,
semconv.ServerAddressKey: true,
}

var globalMeter metric.Meter

// InitDbMetrics so we need to make sure the otel_setup is executed before all the init() function
// related to issue Dbs://github.com/alibaba/opentelemetry-go-auto-instrumentation/issues/48
func InitDbMetrics(m metric.Meter) {
mu.Lock()
defer mu.Unlock()
globalMeter = m
}

func DbClientMetrics(key string) *DbClientMetric {
mu.Lock()
defer mu.Unlock()
return &DbClientMetric{key: attribute.Key(key)}
}

// for test only
func newDbClientMetric(key string, meter metric.Meter) (*DbClientMetric, error) {
m := &DbClientMetric{
key: attribute.Key(key),
}
d, err := newDbClientRequestDurationMeasures(meter)
if err != nil {
return nil, err
}
m.clientRequestDuration = d
return m, nil
}

func newDbClientRequestDurationMeasures(meter metric.Meter) (metric.Float64Histogram, error) {
mu.Lock()
defer mu.Unlock()
if meter == nil {
return nil, errors.New("nil meter")
}
d, err := meter.Float64Histogram(db_client_request_duration,
metric.WithUnit("ms"),
metric.WithDescription("Duration of Db client requests."))
if err == nil {
return d, nil
} else {
return d, errors.New(fmt.Sprintf("failed to create Db.client.request.duratio histogram, %v", err))
}
}

type dbMetricContext struct {
startTime time.Time
startAttributes []attribute.KeyValue
}

func (h DbClientMetric) OnBeforeStart(parentContext context.Context, startTime time.Time) context.Context {
return parentContext
}

func (h DbClientMetric) OnBeforeEnd(ctx context.Context, startAttributes []attribute.KeyValue, startTime time.Time) context.Context {
return context.WithValue(ctx, h.key, dbMetricContext{
startTime: startTime,
startAttributes: startAttributes,
})
}

func (h DbClientMetric) OnAfterStart(context context.Context, endTime time.Time) {
return
}

func (h DbClientMetric) OnAfterEnd(context context.Context, endAttributes []attribute.KeyValue, endTime time.Time) {
mc := context.Value(h.key).(dbMetricContext)
startTime, startAttributes := mc.startTime, mc.startAttributes
// end attributes should be shadowed by AttrsShadower
if h.clientRequestDuration == nil {
var err error
// second change to init the metric
h.clientRequestDuration, err = newDbClientRequestDurationMeasures(globalMeter)
if err != nil {
log.Printf("failed to create clientRequestDuration, err is %v\n", err)
}
}
endAttributes = append(endAttributes, startAttributes...)
n, metricsAttrs := utils.Shadow(endAttributes, dbMetricsConv)
if h.clientRequestDuration != nil {
h.clientRequestDuration.Record(context, float64(endTime.Sub(startTime)), metric.WithAttributeSet(attribute.NewSet(metricsAttrs[0:n]...)))
}
}
139 changes: 139 additions & 0 deletions pkg/inst-api-semconv/instrumenter/db/db_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) 2024 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package db

import (
"context"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api-semconv/instrumenter/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"testing"
"time"
)

func TestDbClientMetrics(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
mp := metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
meter := mp.Meter("test-meter")
client, err := newDbClientMetric("test", meter)
if err != nil {
panic(err)
}
ctx := context.Background()
start := time.Now()
ctx = client.OnBeforeStart(ctx, start)
ctx = client.OnBeforeEnd(ctx, []attribute.KeyValue{}, start)
client.OnAfterStart(ctx, start)
client.OnAfterEnd(ctx, []attribute.KeyValue{}, time.Now())
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
if rm.ScopeMetrics[0].Metrics[0].Name != "db.client.request.duration" {
panic("wrong metrics name, " + rm.ScopeMetrics[0].Metrics[0].Name)
}
}

func TestDbMetricAttributesShadower(t *testing.T) {
attrs := make([]attribute.KeyValue, 0)
attrs = append(attrs, attribute.KeyValue{
Key: semconv.DBSystemKey,
Value: attribute.StringValue("mysql"),
}, attribute.KeyValue{
Key: "unknown",
Value: attribute.Value{},
}, attribute.KeyValue{
Key: semconv.DBOperationNameKey,
Value: attribute.StringValue("Db"),
}, attribute.KeyValue{
Key: semconv.ServerAddressKey,
Value: attribute.StringValue("abc"),
})
n, attrs := utils.Shadow(attrs, dbMetricsConv)
if n != 3 {
panic("wrong shadow array")
}
if attrs[3].Key != "unknown" {
panic("unknown should be the last attribute")
}
}

func TestLazyDbClientMetrics(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
mp := metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
m := mp.Meter("test-meter")
InitDbMetrics(m)
client := DbClientMetrics("db.client")
ctx := context.Background()
start := time.Now()
ctx = client.OnBeforeStart(ctx, start)
ctx = client.OnBeforeEnd(ctx, []attribute.KeyValue{}, start)
client.OnAfterStart(ctx, start)
client.OnAfterEnd(ctx, []attribute.KeyValue{}, time.Now())
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
if rm.ScopeMetrics[0].Metrics[0].Name != "db.client.request.duration" {
panic("wrong metrics name, " + rm.ScopeMetrics[0].Metrics[0].Name)
}
}

func TestGlobalDbClientMetrics(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
mp := metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
m := mp.Meter("test-meter")
InitDbMetrics(m)
client := DbClientMetrics("db.client")
ctx := context.Background()
start := time.Now()
ctx = client.OnBeforeStart(ctx, start)
ctx = client.OnBeforeEnd(ctx, []attribute.KeyValue{}, start)
client.OnAfterStart(ctx, start)
client.OnAfterEnd(ctx, []attribute.KeyValue{}, time.Now())
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
if rm.ScopeMetrics[0].Metrics[0].Name != "db.client.request.duration" {
panic("wrong metrics name, " + rm.ScopeMetrics[0].Metrics[0].Name)
}
}

func TestNilMeter(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
_ = metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
_, err := newDbClientMetric("test", nil)
if err == nil {
panic(err)
}
}
28 changes: 28 additions & 0 deletions pkg/inst-api-semconv/instrumenter/http/http_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,31 @@ func TestGlobalHttpClientMetrics(t *testing.T) {
panic("wrong metrics name, " + rm.ScopeMetrics[0].Metrics[0].Name)
}
}

func TestClientNilMeter(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
_ = metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
_, err := newHttpClientMetric("test", nil)
if err == nil {
panic(err)
}
}

func TestServerNilMeter(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
_ = metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
_, err := newHttpServerMetric("test", nil)
if err == nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion pkg/inst-api-semconv/instrumenter/rpc/rpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var rpcMetricsConv = map[attribute.Key]bool{

var globalMeter metric.Meter

// so we need to make sure the otel_setup is executed before all the init() function
// InitRpcMetrics so we need to make sure the otel_setup is executed before all the init() function
// related to issue rpcs://github.com/alibaba/opentelemetry-go-auto-instrumentation/issues/48
func InitRpcMetrics(m metric.Meter) {
mu.Lock()
Expand Down
28 changes: 28 additions & 0 deletions pkg/inst-api-semconv/instrumenter/rpc/rpc_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,31 @@ func TestGlobalRpcClientMetrics(t *testing.T) {
panic("wrong metrics name, " + rm.ScopeMetrics[0].Metrics[0].Name)
}
}

func TestNilClientMeter(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
_ = metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
_, err := newRpcClientMetric("test", nil)
if err == nil {
panic(err)
}
}

func TestNilServerMeter(t *testing.T) {
reader := metric.NewManualReader()
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v0.1.0"),
)
_ = metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(reader))
_, err := newRpcServerMetric("test", nil)
if err == nil {
panic(err)
}
}
3 changes: 3 additions & 0 deletions pkg/otel_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api-semconv/instrumenter/db"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api-semconv/instrumenter/experimental"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api-semconv/instrumenter/rpc"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -161,6 +162,8 @@ func initMetrics() error {
http.InitHttpMetrics(m)
// init rpc metrics
rpc.InitRpcMetrics(m)
// init db metrics
db.InitDbMetrics(m)
// nacos experimental metrics
experimental.InitNacosExperimentalMetrics(m)
// DefaultMinimumReadMemStatsInterval is 15 second
Expand Down
3 changes: 2 additions & 1 deletion pkg/rules/databasesql/databasesql_otel_instrumenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ func BuildDatabaseSqlOtelInstrumenter() instrumenter.Instrumenter[databaseSqlReq
SetInstrumentationScope(instrumentation.Scope{
Name: utils.DATABASE_SQL_SCOPE_NAME,
Version: version.Tag,
}).BuildInstrumenter()
}).AddOperationListeners(db.DbClientMetrics("database.sql")).
BuildInstrumenter()
}
2 changes: 2 additions & 0 deletions pkg/rules/fasthttp/fasthttp_otel_instrumenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func BuildFastHttpClientOtelInstrumenter() *instrumenter.PropagatingToDownstream
networkExtractor := net.NetworkAttrsExtractor[fastHttpRequest, fastHttpResponse, fastHttpClientAttrsGetter]{Getter: clientGetter}
return builder.Init().SetSpanStatusExtractor(http.HttpClientSpanStatusExtractor[fastHttpRequest, fastHttpResponse]{Getter: clientGetter}).SetSpanNameExtractor(&http.HttpClientSpanNameExtractor[fastHttpRequest, fastHttpResponse]{Getter: clientGetter}).
SetSpanKindExtractor(&instrumenter.AlwaysClientExtractor[fastHttpRequest]{}).
AddOperationListeners(http.HttpClientMetrics("fasthttp.client")).
SetInstrumentationScope(instrumentation.Scope{
Name: utils.FAST_HTTP_CLIENT_SCOPE_NAME,
Version: version.Tag,
Expand All @@ -212,6 +213,7 @@ func BuildFastHttpServerOtelInstrumenter() *instrumenter.PropagatingFromUpstream
urlExtractor := net.UrlAttrsExtractor[fastHttpRequest, fastHttpResponse, fastHttpServerAttrsGetter]{Getter: serverGetter}
return builder.Init().SetSpanStatusExtractor(http.HttpServerSpanStatusExtractor[fastHttpRequest, fastHttpResponse]{Getter: serverGetter}).SetSpanNameExtractor(&http.HttpServerSpanNameExtractor[fastHttpRequest, fastHttpResponse]{Getter: serverGetter}).
SetSpanKindExtractor(&instrumenter.AlwaysServerExtractor[fastHttpRequest]{}).
AddOperationListeners(http.HttpClientMetrics("fasthttp.server")).
SetInstrumentationScope(instrumentation.Scope{
Name: utils.FAST_HTTP_SERVER_SCOPE_NAME,
Version: version.Tag,
Expand Down
1 change: 1 addition & 0 deletions pkg/rules/goredis/goredis_otel_instrumenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func BuildGoRedisOtelInstrumenter() instrumenter.Instrumenter[goRedisRequest, an
getter := goRedisAttrsGetter{}
return builder.Init().SetSpanNameExtractor(&db.DBSpanNameExtractor[goRedisRequest]{Getter: getter}).SetSpanKindExtractor(&instrumenter.AlwaysClientExtractor[goRedisRequest]{}).
AddAttributesExtractor(&db.DbClientAttrsExtractor[goRedisRequest, any, db.DbClientAttrsGetter[goRedisRequest]]{Base: db.DbClientCommonAttrsExtractor[goRedisRequest, any, db.DbClientAttrsGetter[goRedisRequest]]{Getter: getter}}).
AddOperationListeners(db.DbClientMetrics("nosql.goredisv9")).
SetInstrumentationScope(instrumentation.Scope{
Name: utils.GO_REDIS_V9_SCOPE_NAME,
Version: version.Tag,
Expand Down
1 change: 1 addition & 0 deletions pkg/rules/goredisv8/goredis_v8_otel_instrumenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func BuildRedisv8Instrumenter() instrumenter.Instrumenter[redisv8Data, any] {
getter := goRedisV8AttrsGetter{}
return builder.Init().SetSpanNameExtractor(&db.DBSpanNameExtractor[redisv8Data]{Getter: getter}).SetSpanKindExtractor(&instrumenter.AlwaysClientExtractor[redisv8Data]{}).
AddAttributesExtractor(&db.DbClientAttrsExtractor[redisv8Data, any, db.DbClientAttrsGetter[redisv8Data]]{Base: db.DbClientCommonAttrsExtractor[redisv8Data, any, db.DbClientAttrsGetter[redisv8Data]]{Getter: getter}}).
AddOperationListeners(db.DbClientMetrics("nosql.goredisv8")).
SetInstrumentationScope(instrumentation.Scope{
Name: utils.GO_REDIS_V8_SCOPE_NAME,
Version: version.Tag,
Expand Down
Loading

0 comments on commit 8e26c43

Please sign in to comment.