diff --git a/client/admin.go b/client/admin.go index 3246d65d..f285e679 100644 --- a/client/admin.go +++ b/client/admin.go @@ -14,12 +14,17 @@ package client import ( "context" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-sdk-go/v2/entity" ) // GetVersion returns milvus server version information. func (c *GrpcClient) GetVersion(ctx context.Context) (string, error) { + method := "GetVersion" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() if c.Service == nil { return "", ErrClientNotReady } @@ -32,6 +37,9 @@ func (c *GrpcClient) GetVersion(ctx context.Context) (string, error) { // CheckHealth returns milvus state func (c *GrpcClient) CheckHealth(ctx context.Context) (*entity.MilvusState, error) { + method := "CheckHealth" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() if c.Service == nil { return nil, ErrClientNotReady } diff --git a/client/alias.go b/client/alias.go index d374aea8..4a86ac61 100644 --- a/client/alias.go +++ b/client/alias.go @@ -18,12 +18,19 @@ package client import ( "context" + "log" + + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" ) // CreateAlias creates an alias for collection func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias string) error { + method := "CreateAlias" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -36,10 +43,12 @@ func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias str resp, err := c.Service.CreateAlias(ctx, req) if err != nil { + log.Printf("create alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err) return err } err = handleRespStatus(resp) if err != nil { + log.Printf("create alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err) return err } return nil @@ -47,6 +56,10 @@ func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias str // DropAlias drops the specified Alias func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error { + method := "DropAlias" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -58,10 +71,12 @@ func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error { resp, err := c.Service.DropAlias(ctx, req) if err != nil { + log.Printf("drop alias failed, alias:%s, traceID:%s err: %v", alias, traceID, err) return err } err = handleRespStatus(resp) if err != nil { + log.Printf("drop alias failed, alias:%s, traceID:%s err: %v", alias, traceID, err) return err } return nil @@ -69,6 +84,10 @@ func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error { // AlterAlias changes collection alias to provided alias func (c *GrpcClient) AlterAlias(ctx context.Context, collName string, alias string) error { + method := "AlterAlias" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -81,10 +100,12 @@ func (c *GrpcClient) AlterAlias(ctx context.Context, collName string, alias stri resp, err := c.Service.AlterAlias(ctx, req) if err != nil { + log.Printf("alter alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err) return err } err = handleRespStatus(resp) if err != nil { + log.Printf("alter alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err) return err } return nil diff --git a/client/authentication.go b/client/authentication.go index 8dec4ecf..a369e4c6 100644 --- a/client/authentication.go +++ b/client/authentication.go @@ -2,6 +2,9 @@ package client import ( "context" + "log" + + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-sdk-go/v2/internal/utils/crypto" @@ -9,6 +12,10 @@ import ( // CreateCredential create new user and password func (c *GrpcClient) CreateCredential(ctx context.Context, username string, password string) error { + method := "CreateCredential" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -18,10 +25,12 @@ func (c *GrpcClient) CreateCredential(ctx context.Context, username string, pass } resp, err := c.Service.CreateCredential(ctx, req) if err != nil { + log.Printf("create credential failed, traceID:%s err: %v", traceID, err) return err } err = handleRespStatus(resp) if err != nil { + log.Printf("create credential failed, traceID:%s err: %v", traceID, err) return err } return nil @@ -29,6 +38,10 @@ func (c *GrpcClient) CreateCredential(ctx context.Context, username string, pass // UpdateCredential update password for a user func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldPassword string, newPassword string) error { + method := "UpdateCredential" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -39,10 +52,12 @@ func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldP } resp, err := c.Service.UpdateCredential(ctx, req) if err != nil { + log.Printf("update credential failed, traceID:%s err: %v", traceID, err) return err } err = handleRespStatus(resp) if err != nil { + log.Printf("update credential failed, traceID:%s err: %v", traceID, err) return err } return nil @@ -50,6 +65,10 @@ func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldP // DeleteCredential delete a user func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) error { + method := "DeleteCredential" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -58,10 +77,12 @@ func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) erro } resp, err := c.Service.DeleteCredential(ctx, req) if err != nil { + log.Printf("delete credential failed, traceID:%s err: %v", traceID, err) return err } err = handleRespStatus(resp) if err != nil { + log.Printf("delete credential failed, traceID:%s err: %v", traceID, err) return err } return nil @@ -69,16 +90,22 @@ func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) erro // ListCredUsers list all usernames func (c *GrpcClient) ListCredUsers(ctx context.Context) ([]string, error) { + method := "ListCredUsers" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } req := &milvuspb.ListCredUsersRequest{} resp, err := c.Service.ListCredUsers(ctx, req) if err != nil { + log.Printf("list credential users failed, traceID:%s err: %v", traceID, err) return nil, err } err = handleRespStatus(resp.Status) if err != nil { + log.Printf("list credential users failed, traceID:%s err: %v", traceID, err) return nil, err } return resp.Usernames, nil diff --git a/client/client.go b/client/client.go index 0bff2158..8425fdcc 100644 --- a/client/client.go +++ b/client/client.go @@ -14,12 +14,19 @@ package client import ( "context" + "sync" "time" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus-sdk-go/v2/entity" ) @@ -261,6 +268,46 @@ type Client interface { HybridSearch(ctx context.Context, collName string, partitions []string, limit int, outputFields []string, reranker Reranker, subRequests []*ANNSearchRequest, opts ...SearchQueryOptionFunc) ([]SearchResult, error) } +var ( + initClientOnce sync.Once +) + +func initTracerOnce() { + initClientOnce.Do(func() { + // init trace noop provider + tp := sdk.NewTracerProvider( + sdk.WithBatcher(nil), + sdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("Client"), + )), + sdk.WithSampler(sdk.ParentBased( + sdk.TraceIDRatioBased(1), + )), + ) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + }) +} + +func getUnaryClientInterceptor() grpc.UnaryClientInterceptor { + initTracerOnce() + return otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider())) + +} + +func StartTrace(ctx context.Context, name string, spanName string) (context.Context, trace.Span, string) { + initTracerOnce() + ctx, span := otel.Tracer(name).Start(ctx, spanName) + return ctx, span, span.SpanContext().TraceID().String() +} + +func StartNewTrace(name string, spanName string) (context.Context, trace.Span, string) { + initTracerOnce() + ctx, span := otel.Tracer(name).Start(context.Background(), spanName) + return ctx, span, span.SpanContext().TraceID().String() +} + // NewClient create a client connected to remote milvus cluster. // More connect option can be modified by Config. func NewClient(ctx context.Context, config Config) (Client, error) { @@ -277,6 +324,7 @@ func NewClient(ctx context.Context, config Config) (Client, error) { // Parse grpc options options := c.config.getDialOption() + options = append(options, grpc.WithChainUnaryInterceptor(getUnaryClientInterceptor())) // Connect the grpc server. if err := c.connect(ctx, addr, options...); err != nil { diff --git a/client/collection.go b/client/collection.go index 65efe504..2d620e77 100644 --- a/client/collection.go +++ b/client/collection.go @@ -13,16 +13,16 @@ package client import ( "context" + "log" "time" "github.com/cockroachdb/errors" - "github.com/golang/protobuf/proto" - - "github.com/milvus-io/milvus-sdk-go/v2/entity" + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-sdk-go/v2/entity" ) // handles response status @@ -46,6 +46,10 @@ func handleRespStatus(status *commonpb.Status) error { // ListCollections list collections from connection // Note that schema info are not provided in collection list func (c *GrpcClient) ListCollections(ctx context.Context, opts ...ListCollectionOption) ([]*entity.Collection, error) { + method := "ListCollections" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return []*entity.Collection{}, ErrClientNotReady } @@ -66,10 +70,12 @@ func (c *GrpcClient) ListCollections(ctx context.Context, opts ...ListCollection resp, err := c.Service.ShowCollections(ctx, req) if err != nil { + log.Printf("list collections failed, traceID:%s err: %v", traceID, err) return []*entity.Collection{}, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("list collections failed, traceID:%s err: %v", traceID, err) return []*entity.Collection{}, err } collections := make([]*entity.Collection, 0, len(resp.GetCollectionIds())) @@ -137,10 +143,15 @@ func (c *GrpcClient) NewCollection(ctx context.Context, collName string, dimensi // CreateCollection create collection with specified schema func (c *GrpcClient) CreateCollection(ctx context.Context, collSchema *entity.Schema, shardNum int32, opts ...CreateCollectionOption) error { + method := "CreateCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } if err := c.validateSchema(collSchema); err != nil { + log.Printf("create collection failed, traceID:%s err: %v", traceID, err) return err } @@ -157,6 +168,10 @@ func (c *GrpcClient) CreateCollection(ctx context.Context, collSchema *entity.Sc } func (c *GrpcClient) requestCreateCollection(ctx context.Context, sch *entity.Schema, opt *createCollOpt, shardNum int32) error { + method := "requestCreateCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if opt.EnableDynamicSchema { sch.EnableDynamicField = true } @@ -178,6 +193,7 @@ func (c *GrpcClient) requestCreateCollection(ctx context.Context, sch *entity.Sc resp, err := c.Service.CreateCollection(ctx, req) if err != nil { + log.Printf("request create collection failed, traceID:%s err: %v", traceID, err) return err } err = handleRespStatus(resp) @@ -261,6 +277,10 @@ func (c *GrpcClient) checkCollectionExists(ctx context.Context, collName string) // DescribeCollection describe the collection by name func (c *GrpcClient) DescribeCollection(ctx context.Context, collName string) (*entity.Collection, error) { + method := "DescribeCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -269,10 +289,12 @@ func (c *GrpcClient) DescribeCollection(ctx context.Context, collName string) (* } resp, err := c.Service.DescribeCollection(ctx, req) if err != nil { + log.Printf("describe collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("describe collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } collection := &entity.Collection{ @@ -298,6 +320,10 @@ func (c *GrpcClient) DescribeCollection(ctx context.Context, collName string) (* // DropCollection drop collection by name func (c *GrpcClient) DropCollection(ctx context.Context, collName string, opts ...DropCollectionOption) error { + method := "DropCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -310,6 +336,7 @@ func (c *GrpcClient) DropCollection(ctx context.Context, collName string, opts . } resp, err := c.Service.DropCollection(ctx, req) if err != nil { + log.Printf("drop collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } err = handleRespStatus(resp) @@ -321,6 +348,10 @@ func (c *GrpcClient) DropCollection(ctx context.Context, collName string, opts . // HasCollection check whether collection name exists func (c *GrpcClient) HasCollection(ctx context.Context, collName string) (bool, error) { + method := "HasCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return false, ErrClientNotReady } @@ -333,6 +364,7 @@ func (c *GrpcClient) HasCollection(ctx context.Context, collName string) (bool, resp, err := c.Service.HasCollection(ctx, req) if err != nil { + log.Printf("has collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return false, err } if err := handleRespStatus(resp.GetStatus()); err != nil { @@ -343,6 +375,10 @@ func (c *GrpcClient) HasCollection(ctx context.Context, collName string) (bool, // GetCollectionStatistcis show collection statistics func (c *GrpcClient) GetCollectionStatistics(ctx context.Context, collName string) (map[string]string, error) { + method := "GetCollectionStatistics" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -352,9 +388,11 @@ func (c *GrpcClient) GetCollectionStatistics(ctx context.Context, collName strin } resp, err := c.Service.GetCollectionStatistics(ctx, req) if err != nil { + log.Printf("get collection statistics failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("get collection statistics failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } return entity.KvPairsMap(resp.GetStats()), nil @@ -362,6 +400,10 @@ func (c *GrpcClient) GetCollectionStatistics(ctx context.Context, collName strin // ShowCollection show collection status, used to check whether it is loaded or not func (c *GrpcClient) ShowCollection(ctx context.Context, collName string) (*entity.Collection, error) { + method := "ShowCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -373,9 +415,11 @@ func (c *GrpcClient) ShowCollection(ctx context.Context, collName string) (*enti resp, err := c.Service.ShowCollections(ctx, req) if err != nil { + log.Printf("show collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("show collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } @@ -390,6 +434,10 @@ func (c *GrpcClient) ShowCollection(ctx context.Context, collName string) (*enti // RenameCollection performs renaming for provided collection. func (c *GrpcClient) RenameCollection(ctx context.Context, collName, newName string) error { + method := "RenameCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -400,6 +448,7 @@ func (c *GrpcClient) RenameCollection(ctx context.Context, collName, newName str } resp, err := c.Service.RenameCollection(ctx, req) if err != nil { + log.Printf("rename collection failed, collName:%s, traceID:%s, err: %v", collName, traceID, err) return err } return handleRespStatus(resp) @@ -407,6 +456,10 @@ func (c *GrpcClient) RenameCollection(ctx context.Context, collName, newName str // LoadCollection load collection into memory func (c *GrpcClient) LoadCollection(ctx context.Context, collName string, async bool, opts ...LoadCollectionOption) error { + method := "LoadCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -421,9 +474,11 @@ func (c *GrpcClient) LoadCollection(ctx context.Context, collName string, async resp, err := c.Service.LoadCollection(ctx, req) if err != nil { + log.Printf("load collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } if err := handleRespStatus(resp); err != nil { + log.Printf("load collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } @@ -450,6 +505,10 @@ func (c *GrpcClient) LoadCollection(ctx context.Context, collName string, async // ReleaseCollection release loaded collection func (c *GrpcClient) ReleaseCollection(ctx context.Context, collName string, opts ...ReleaseCollectionOption) error { + method := "ReleaseCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -463,6 +522,7 @@ func (c *GrpcClient) ReleaseCollection(ctx context.Context, collName string, opt } resp, err := c.Service.ReleaseCollection(ctx, req) if err != nil { + log.Printf("release collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } return handleRespStatus(resp) @@ -470,6 +530,10 @@ func (c *GrpcClient) ReleaseCollection(ctx context.Context, collName string, opt // GetReplicas gets the replica groups as well as their querynodes and shards information func (c *GrpcClient) GetReplicas(ctx context.Context, collName string) ([]*entity.ReplicaGroup, error) { + method := "GetReplicas" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -485,9 +549,11 @@ func (c *GrpcClient) GetReplicas(ctx context.Context, collName string) ([]*entit resp, err := c.Service.GetReplicas(ctx, req) if err != nil { + log.Printf("get replicas failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("get replicas failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } @@ -513,6 +579,10 @@ func (c *GrpcClient) GetReplicas(ctx context.Context, collName string) ([]*entit // GetLoadingProgress get the collection or partitions loading progress func (c *GrpcClient) GetLoadingProgress(ctx context.Context, collName string, partitionNames []string) (int64, error) { + method := "GetLoadingProgress" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return 0, ErrClientNotReady } @@ -523,6 +593,7 @@ func (c *GrpcClient) GetLoadingProgress(ctx context.Context, collName string, pa } resp, err := c.Service.GetLoadingProgress(ctx, req) if err != nil { + log.Printf("get loading progress failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return 0, err } @@ -531,6 +602,10 @@ func (c *GrpcClient) GetLoadingProgress(ctx context.Context, collName string, pa // GetLoadState get the collection or partitions load state func (c *GrpcClient) GetLoadState(ctx context.Context, collName string, partitionNames []string) (entity.LoadState, error) { + method := "GetLoadState" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return 0, ErrClientNotReady } @@ -541,6 +616,7 @@ func (c *GrpcClient) GetLoadState(ctx context.Context, collName string, partitio } resp, err := c.Service.GetLoadState(ctx, req) if err != nil { + log.Printf("get load state failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return 0, err } @@ -549,6 +625,10 @@ func (c *GrpcClient) GetLoadState(ctx context.Context, collName string, partitio // AlterCollection changes the collection attribute. func (c *GrpcClient) AlterCollection(ctx context.Context, collName string, attrs ...entity.CollectionAttribute) error { + method := "AlterCollection" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -579,6 +659,7 @@ func (c *GrpcClient) AlterCollection(ctx context.Context, collName string, attrs resp, err := c.Service.AlterCollection(ctx, req) if err != nil { + log.Printf("alter collection failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } return handleRespStatus(resp) diff --git a/client/data.go b/client/data.go index 17b82038..22a1cc98 100644 --- a/client/data.go +++ b/client/data.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/cockroachdb/errors" + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -40,12 +41,17 @@ const ( ) func (c *GrpcClient) HybridSearch(ctx context.Context, collName string, partitions []string, limit int, outputFields []string, reranker Reranker, subRequests []*ANNSearchRequest, opts ...SearchQueryOptionFunc) ([]SearchResult, error) { + method := "HybridSearch" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } collInfo, err := c.getCollectionInfo(ctx, collName) if err != nil { + log.Printf("hybrid search failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } schema := collInfo.Schema @@ -85,6 +91,7 @@ func (c *GrpcClient) HybridSearch(ctx context.Context, collName string, partitio err = merr.CheckRPCCall(result, err) if err != nil { + log.Printf("hybrid search failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } @@ -95,6 +102,10 @@ func (c *GrpcClient) HybridSearch(ctx context.Context, collName string, partitio func (c *GrpcClient) Search(ctx context.Context, collName string, partitions []string, expr string, outputFields []string, vectors []entity.Vector, vectorField string, metricType entity.MetricType, topK int, sp entity.SearchParam, opts ...SearchQueryOptionFunc, ) ([]SearchResult, error) { + method := "Search" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return []SearchResult{}, ErrClientNotReady } @@ -116,6 +127,7 @@ func (c *GrpcClient) Search(ctx context.Context, collName string, partitions []s resp, err := c.Service.Search(ctx, req) if err != nil { + log.Printf("search failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { @@ -292,6 +304,9 @@ func (c *GrpcClient) Get(ctx context.Context, collectionName string, ids entity. // QueryByPks query record by specified primary key(s) func (c *GrpcClient) QueryByPks(ctx context.Context, collectionName string, partitionNames []string, ids entity.Column, outputFields []string, opts ...SearchQueryOptionFunc) (ResultSet, error) { + method := "QueryByPks" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() if c.Service == nil { return nil, ErrClientNotReady } @@ -310,6 +325,10 @@ func (c *GrpcClient) QueryByPks(ctx context.Context, collectionName string, part // Query performs query by expression. func (c *GrpcClient) Query(ctx context.Context, collectionName string, partitionNames []string, expr string, outputFields []string, opts ...SearchQueryOptionFunc) (ResultSet, error) { + method := "Query" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -352,10 +371,12 @@ func (c *GrpcClient) Query(ctx context.Context, collectionName string, partition resp, err := c.Service.Query(ctx, req) if err != nil { + log.Printf("query failed, collName:%s, traceID:%s err: %v", collectionName, traceID, err) return nil, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("query failed, collName:%s, traceID:%s err: %v", collectionName, traceID, err) return nil, err } @@ -431,6 +452,10 @@ func prepareSearchRequest(collName string, partitions []string, // GetPersistentSegmentInfo get persistent segment info func (c *GrpcClient) GetPersistentSegmentInfo(ctx context.Context, collName string) ([]*entity.Segment, error) { + method := "GetPersistentSegmentInfo" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return []*entity.Segment{}, ErrClientNotReady } @@ -440,9 +465,11 @@ func (c *GrpcClient) GetPersistentSegmentInfo(ctx context.Context, collName stri } resp, err := c.Service.GetPersistentSegmentInfo(ctx, req) if err != nil { + log.Printf("get persistent segment info failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return []*entity.Segment{}, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("get persistent segment info failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return []*entity.Segment{}, err } segments := make([]*entity.Segment, 0, len(resp.GetInfos())) @@ -461,6 +488,10 @@ func (c *GrpcClient) GetPersistentSegmentInfo(ctx context.Context, collName stri // GetQuerySegmentInfo get query query cluster segment loaded info func (c *GrpcClient) GetQuerySegmentInfo(ctx context.Context, collName string) ([]*entity.Segment, error) { + method := "GetQuerySegmentInfo" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return []*entity.Segment{}, ErrClientNotReady } @@ -470,6 +501,7 @@ func (c *GrpcClient) GetQuerySegmentInfo(ctx context.Context, collName string) ( } resp, err := c.Service.GetQuerySegmentInfo(ctx, req) if err != nil { + log.Printf("get query segment info failed, collName:%s traceID:%s, err: %v", collName, traceID, err) return []*entity.Segment{}, err } if err := handleRespStatus(resp.GetStatus()); err != nil { @@ -493,6 +525,10 @@ func (c *GrpcClient) GetQuerySegmentInfo(ctx context.Context, collName string) ( func (c *GrpcClient) CalcDistance(ctx context.Context, collName string, partitions []string, metricType entity.MetricType, opLeft, opRight entity.Column, ) (entity.Column, error) { + method := "CalcDistance" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -513,9 +549,11 @@ func (c *GrpcClient) CalcDistance(ctx context.Context, collName string, partitio resp, err := c.Service.CalcDistance(ctx, req) if err != nil { + log.Printf("calc distance failed, collName:%s traceID:%s err: %v", collName, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("calc distance failed, collName:%s traceID:%s err: %v", collName, traceID, err) return nil, err } diff --git a/client/database.go b/client/database.go index cfa6929e..f3459696 100644 --- a/client/database.go +++ b/client/database.go @@ -18,8 +18,11 @@ package client import ( "context" + "log" "github.com/cockroachdb/errors" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -31,9 +34,14 @@ import ( // 2. goroutine B call UsingDatabase(ctx, "DB2"). // 3. goroutine A access DB2 after 2. func (c *GrpcClient) UsingDatabase(ctx context.Context, dbName string) error { + method := "UsingDatabase" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() c.config.useDatabase(dbName) err := c.connectInternal(ctx) if err != nil { + log.Printf("using database failed, traceID:%s err: %v", traceID, err) return err } c.cache.reset() @@ -44,6 +52,10 @@ func (c *GrpcClient) UsingDatabase(ctx context.Context, dbName string) error { // CreateDatabase creates a new database for remote Milvus cluster. // TODO:New options can be added as expanding parameters. func (c *GrpcClient) CreateDatabase(ctx context.Context, dbName string, opts ...CreateDatabaseOption) error { + method := "CreateDatabase" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -63,6 +75,7 @@ func (c *GrpcClient) CreateDatabase(ctx context.Context, dbName string, opts ... resp, err := c.Service.CreateDatabase(ctx, req) if err != nil { + log.Printf("create database failed, traceID:%s err: %v", traceID, err) return err } return handleRespStatus(resp) @@ -70,6 +83,10 @@ func (c *GrpcClient) CreateDatabase(ctx context.Context, dbName string, opts ... // ListDatabases list all database in milvus cluster. func (c *GrpcClient) ListDatabases(ctx context.Context) ([]entity.Database, error) { + method := "ListDatabases" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -80,9 +97,11 @@ func (c *GrpcClient) ListDatabases(ctx context.Context) ([]entity.Database, erro req := &milvuspb.ListDatabasesRequest{} resp, err := c.Service.ListDatabases(ctx, req) if err != nil { + log.Printf("list databases failed, traceID:%s err: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list databases failed, traceID:%s err: %v", traceID, err) return nil, err } databases := make([]entity.Database, len(resp.GetDbNames())) @@ -96,6 +115,10 @@ func (c *GrpcClient) ListDatabases(ctx context.Context) ([]entity.Database, erro // DropDatabase drop all database in milvus cluster. func (c *GrpcClient) DropDatabase(ctx context.Context, dbName string, opts ...DropDatabaseOption) error { + method := "DropDatabase" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -111,6 +134,7 @@ func (c *GrpcClient) DropDatabase(ctx context.Context, dbName string, opts ...Dr } resp, err := c.Service.DropDatabase(ctx, req) if err != nil { + log.Printf("DropDatabase failed, traceID:%s err: %v", traceID, err) return err } return handleRespStatus(resp) @@ -118,6 +142,10 @@ func (c *GrpcClient) DropDatabase(ctx context.Context, dbName string, opts ...Dr // AlterDatabase changes the database attribute. func (c *GrpcClient) AlterDatabase(ctx context.Context, dbName string, attrs ...entity.DatabaseAttribute) error { + method := "AlterDatabase" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -151,6 +179,7 @@ func (c *GrpcClient) AlterDatabase(ctx context.Context, dbName string, attrs ... resp, err := c.Service.AlterDatabase(ctx, req) if err != nil { + log.Printf("AlterDatabase failed, traceID:%s err: %v", traceID, err) return err } return handleRespStatus(resp) @@ -158,6 +187,10 @@ func (c *GrpcClient) AlterDatabase(ctx context.Context, dbName string, attrs ... // DropDatabase drop all database in milvus cluster. func (c *GrpcClient) DescribeDatabase(ctx context.Context, dbName string) (*entity.Database, error) { + method := "DescribeDatabase" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -171,10 +204,12 @@ func (c *GrpcClient) DescribeDatabase(ctx context.Context, dbName string) (*enti resp, err := c.Service.DescribeDatabase(ctx, req) if err != nil { + log.Printf("DescribeDatabase failed, traceID:%s err: %v", traceID, err) return nil, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("DescribeDatabase failed, traceID:%s err: %v", traceID, err) return nil, err } database := &entity.Database{ diff --git a/client/index.go b/client/index.go index 97d3ef77..2d6d7add 100644 --- a/client/index.go +++ b/client/index.go @@ -14,9 +14,12 @@ package client import ( "context" "fmt" + "log" "strconv" "time" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -105,6 +108,10 @@ func getIndexDef(opts ...IndexOption) indexDef { // Deprecated please use CreateIndexV2 instead. func (c *GrpcClient) CreateIndex(ctx context.Context, collName string, fieldName string, idx entity.Index, async bool, opts ...IndexOption) error { + method := "CreateIndex" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -127,9 +134,11 @@ func (c *GrpcClient) CreateIndex(ctx context.Context, collName string, fieldName resp, err := c.Service.CreateIndex(ctx, req) if err != nil { + log.Printf("create index failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return err } if err = handleRespStatus(resp); err != nil { + log.Printf("create index failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return err } if !async { // sync mode, wait index building result @@ -157,6 +166,10 @@ func (c *GrpcClient) CreateIndex(ctx context.Context, collName string, fieldName // AlterIndex modifies the index params func (c *GrpcClient) AlterIndex(ctx context.Context, collName string, indexName string, opts ...IndexOption) error { + method := "AlterIndex" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -173,6 +186,7 @@ func (c *GrpcClient) AlterIndex(ctx context.Context, collName string, indexName resp, err := c.Service.AlterIndex(ctx, req) if err != nil { + log.Printf("alter index failed, collName:%s, indexName:%s, traceID:%s err: %v", collName, indexName, traceID, err) return err } return handleRespStatus(resp) @@ -180,6 +194,10 @@ func (c *GrpcClient) AlterIndex(ctx context.Context, collName string, indexName // DescribeIndex describe index func (c *GrpcClient) DescribeIndex(ctx context.Context, collName string, fieldName string, opts ...IndexOption) ([]entity.Index, error) { + method := "DescribeIndex" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return []entity.Index{}, ErrClientNotReady } @@ -189,6 +207,7 @@ func (c *GrpcClient) DescribeIndex(ctx context.Context, collName string, fieldNa idxDesc, err := c.describeIndex(ctx, collName, fieldName, opts...) if err != nil { + log.Printf("describe index failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return nil, err } @@ -212,6 +231,10 @@ func (c *GrpcClient) DescribeIndex(ctx context.Context, collName string, fieldNa // DropIndex drop index from collection // Deprecate please use DropIndexV2 instead. func (c *GrpcClient) DropIndex(ctx context.Context, collName string, fieldName string, opts ...IndexOption) error { + method := "DropIndex" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -230,6 +253,7 @@ func (c *GrpcClient) DropIndex(ctx context.Context, collName string, fieldName s resp, err := c.Service.DropIndex(ctx, req) if err != nil { + log.Printf("drop index failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return err } return handleRespStatus(resp) @@ -237,6 +261,10 @@ func (c *GrpcClient) DropIndex(ctx context.Context, collName string, fieldName s // GetIndexState get index state func (c *GrpcClient) GetIndexState(ctx context.Context, collName string, fieldName string, opts ...IndexOption) (entity.IndexState, error) { + method := "GetIndexState" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return entity.IndexState(commonpb.IndexState_Failed), ErrClientNotReady } @@ -253,9 +281,11 @@ func (c *GrpcClient) GetIndexState(ctx context.Context, collName string, fieldNa } resp, err := c.Service.GetIndexState(ctx, req) if err != nil { + log.Printf("get index state failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return entity.IndexState(commonpb.IndexState_IndexStateNone), err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("get index state failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return entity.IndexState(commonpb.IndexState_IndexStateNone), err } @@ -264,6 +294,10 @@ func (c *GrpcClient) GetIndexState(ctx context.Context, collName string, fieldNa // GetIndexBuildProgress get index building progress func (c *GrpcClient) GetIndexBuildProgress(ctx context.Context, collName string, fieldName string, opts ...IndexOption) (total, indexed int64, err error) { + method := "GetIndexBuildProgress" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return 0, 0, ErrClientNotReady } @@ -280,9 +314,11 @@ func (c *GrpcClient) GetIndexBuildProgress(ctx context.Context, collName string, } resp, err := c.Service.GetIndexBuildProgress(ctx, req) if err != nil { + log.Printf("get index build progress failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return 0, 0, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("get index build progress failed, collName:%s, fieldName:%s, traceID:%s err: %v", collName, fieldName, traceID, err) return 0, 0, err } return resp.GetTotalRows(), resp.GetIndexedRows(), nil diff --git a/client/insert.go b/client/insert.go index 4739ecc9..5b2589c7 100644 --- a/client/insert.go +++ b/client/insert.go @@ -15,15 +15,17 @@ import ( "context" "encoding/json" "fmt" + "log" "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus-sdk-go/v2/entity" ) @@ -32,11 +34,16 @@ import ( // partitionName is the partition to insert, if not specified(empty), default partition will be used // columns are slice of the column-based data func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error) { + method := "Insert" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } collInfo, err := c.getCollectionInfo(ctx, collName) if err != nil { + log.Printf("insert failed, traceID:%s err: %v", traceID, err) return nil, err } schema := collInfo.Schema @@ -44,6 +51,7 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName // convert columns to field data fieldsData, rowSize, err := c.processInsertColumns(schema, columns...) if err != nil { + log.Printf("insert failed, traceID:%s err: %v", traceID, err) return nil, err } @@ -59,9 +67,11 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName resp, err := c.Service.Insert(ctx, req) if err != nil { + log.Printf("insert failed, traceID:%s err: %v", traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("insert failed, traceID:%s err: %v", traceID, err) return nil, err } c.cache.setSessionTs(collName, resp.Timestamp) @@ -224,6 +234,10 @@ func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opt // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) { + method := "Flush" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, nil, 0, nil, ErrClientNotReady } @@ -236,9 +250,11 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o } resp, err := c.Service.Flush(ctx, req) if err != nil { + log.Printf("flush %s failed, traceID:%s err: %v", collName, traceID, err) return nil, nil, 0, nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("flush %s failed, traceID:%s err: %v", collName, traceID, err) return nil, nil, 0, nil, err } channelCPs := resp.GetChannelCps() @@ -262,6 +278,7 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o // respect context deadline/cancel select { case <-ctx.Done(): + log.Printf("flush %s failed, traceID:%s err: %v", collName, traceID, err) return nil, nil, 0, nil, errors.New("deadline exceeded") default: } @@ -283,6 +300,10 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o // DeleteByPks deletes entries related to provided primary keys func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error { + method := "DeleteByPks" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -297,6 +318,7 @@ func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partition coll, err := c.DescribeCollection(ctx, collName) if err != nil { + log.Printf("delete by Pks failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } @@ -317,10 +339,12 @@ func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partition resp, err := c.Service.Delete(ctx, req) if err != nil { + log.Printf("delete by Pks failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("delete by Pks failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } c.cache.setSessionTs(collName, resp.Timestamp) @@ -329,6 +353,10 @@ func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partition // Delete deletes entries match expression func (c *GrpcClient) Delete(ctx context.Context, collName string, partitionName string, expr string) error { + method := "Delete" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -342,10 +370,12 @@ func (c *GrpcClient) Delete(ctx context.Context, collName string, partitionName resp, err := c.Service.Delete(ctx, req) if err != nil { + log.Printf("delete failed, collName:%s, partitionName:%s, expr:%s, traceID:%s err: %v", collName, partitionName, expr, traceID, err) return err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("delete failed, collName:%s, partitionName:%s, expr:%s, traceID:%s err: %v", collName, partitionName, expr, traceID, err) return err } c.cache.setSessionTs(collName, resp.Timestamp) @@ -357,6 +387,10 @@ func (c *GrpcClient) Delete(ctx context.Context, collName string, partitionName // partitionName is the partition to upsert, if not specified(empty), default partition will be used // columns are slice of the column-based data func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error) { + method := "Upsert" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -368,6 +402,7 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName fieldsData, rowSize, err := c.processInsertColumns(schema, columns...) if err != nil { + log.Printf("upsert failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return nil, err } @@ -383,9 +418,11 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName resp, err := c.Service.Upsert(ctx, req) if err != nil { + log.Printf("upsert failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("upsert failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return nil, err } c.cache.setSessionTs(collName, resp.Timestamp) @@ -395,6 +432,10 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName // BulkInsert data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments func (c *GrpcClient) BulkInsert(ctx context.Context, collName string, partitionName string, files []string, opts ...BulkInsertOption) (int64, error) { + method := "BulkInsert" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return 0, ErrClientNotReady } @@ -410,6 +451,7 @@ func (c *GrpcClient) BulkInsert(ctx context.Context, collName string, partitionN resp, err := c.Service.Import(ctx, req) if err != nil { + log.Printf("bulk insert failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return 0, err } if err := handleRespStatus(resp.GetStatus()); err != nil { @@ -421,6 +463,10 @@ func (c *GrpcClient) BulkInsert(ctx context.Context, collName string, partitionN // GetBulkInsertState checks import task state func (c *GrpcClient) GetBulkInsertState(ctx context.Context, taskID int64) (*entity.BulkInsertTaskState, error) { + method := "GetBulkInsertState" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -429,9 +475,11 @@ func (c *GrpcClient) GetBulkInsertState(ctx context.Context, taskID int64) (*ent } resp, err := c.Service.GetImportState(ctx, req) if err != nil { + log.Printf("get bulk insert state failed, taskID:%d, traceID:%s err: %v", taskID, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("get bulk insert state failed, taskID:%d, traceID:%s err: %v", taskID, traceID, err) return nil, err } @@ -449,6 +497,10 @@ func (c *GrpcClient) GetBulkInsertState(ctx context.Context, taskID int64) (*ent // ListBulkInsertTasks list state of all import tasks func (c *GrpcClient) ListBulkInsertTasks(ctx context.Context, collName string, limit int64) ([]*entity.BulkInsertTaskState, error) { + method := "ListBulkInsertTasks" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -458,9 +510,11 @@ func (c *GrpcClient) ListBulkInsertTasks(ctx context.Context, collName string, l } resp, err := c.Service.ListImportTasks(ctx, req) if err != nil { + log.Printf("list bulk insert tasks failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list bulk insert tasks failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return nil, err } diff --git a/client/iterator.go b/client/iterator.go index 5cb92ff3..d2cf9297 100644 --- a/client/iterator.go +++ b/client/iterator.go @@ -4,16 +4,24 @@ import ( "context" "fmt" "io" + "log" "strings" "github.com/cockroachdb/errors" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-sdk-go/v2/entity" ) func (c *GrpcClient) SearchIterator(ctx context.Context, opt *SearchIteratorOption) (*SearchIterator, error) { + method := "SearchIterator" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() collectionName := opt.collectionName collInfo, err := c.getCollectionInfo(ctx, collectionName) if err != nil { + log.Printf("search iterator failed, traceID:%s err: %v", traceID, err) return nil, err } sch := collInfo.Schema @@ -195,9 +203,14 @@ func (itr *SearchIterator) Next(ctx context.Context) (*SearchResult, error) { } func (c *GrpcClient) QueryIterator(ctx context.Context, opt *QueryIteratorOption) (*QueryIterator, error) { + method := "QueryIterator" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() collectionName := opt.collectionName collInfo, err := c.getCollectionInfo(ctx, collectionName) if err != nil { + log.Printf("query iterator failed, traceID:%s err: %v", traceID, err) return nil, err } sch := collInfo.Schema diff --git a/client/maintainance.go b/client/maintainance.go index 1f9ea0ef..331f3020 100644 --- a/client/maintainance.go +++ b/client/maintainance.go @@ -18,20 +18,28 @@ package client import ( "context" + "log" "time" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-sdk-go/v2/entity" ) // ManualCompaction triggers a compaction on provided collection func (c *GrpcClient) ManualCompaction(ctx context.Context, collName string, _ time.Duration) (int64, error) { + method := "ManualCompaction" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return 0, ErrClientNotReady } coll, err := c.DescribeCollection(ctx, collName) if err != nil { + log.Printf("manual compaction failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return 0, err } @@ -41,11 +49,13 @@ func (c *GrpcClient) ManualCompaction(ctx context.Context, collName string, _ ti resp, err := c.Service.ManualCompaction(ctx, req) if err != nil { + log.Printf("manual compaction failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return 0, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("manual compaction failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return 0, err } @@ -54,6 +64,10 @@ func (c *GrpcClient) ManualCompaction(ctx context.Context, collName string, _ ti // GetCompactionState get compaction state of provided compaction id func (c *GrpcClient) GetCompactionState(ctx context.Context, id int64) (entity.CompactionState, error) { + method := "GetCompactionState" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return entity.CompcationStateUndefined, ErrClientNotReady } @@ -61,11 +75,13 @@ func (c *GrpcClient) GetCompactionState(ctx context.Context, id int64) (entity.C req := &milvuspb.GetCompactionStateRequest{CompactionID: id} resp, err := c.Service.GetCompactionState(ctx, req) if err != nil { + log.Printf("get compaction state failed, id:%d, traceID:%s err: %v", id, traceID, err) return entity.CompcationStateUndefined, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("get compaction state failed, id:%d, traceID:%s err: %v", id, traceID, err) return entity.CompcationStateUndefined, err } @@ -75,6 +91,10 @@ func (c *GrpcClient) GetCompactionState(ctx context.Context, id int64) (entity.C // GetCompactionStateWithPlans get compaction state with plans of provided compaction id func (c *GrpcClient) GetCompactionStateWithPlans(ctx context.Context, id int64) (entity.CompactionState, []entity.CompactionPlan, error) { + method := "GetCompactionStateWithPlans" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return entity.CompcationStateUndefined, nil, ErrClientNotReady } @@ -84,11 +104,13 @@ func (c *GrpcClient) GetCompactionStateWithPlans(ctx context.Context, id int64) } resp, err := c.Service.GetCompactionStateWithPlans(ctx, req) if err != nil { + log.Printf("get compaction state with plans failed, id:%d, traceID:%s err: %v", id, traceID, err) return entity.CompcationStateUndefined, nil, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("get compaction state with plans failed, id:%d, traceID:%s err: %v", id, traceID, err) return entity.CompcationStateUndefined, nil, err } diff --git a/client/mq_message.go b/client/mq_message.go index da645a19..b6b3a00d 100644 --- a/client/mq_message.go +++ b/client/mq_message.go @@ -2,6 +2,9 @@ package client import ( "context" + "log" + + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -12,6 +15,10 @@ func (c *GrpcClient) ReplicateMessage(ctx context.Context, channelName string, beginTs, endTs uint64, msgsBytes [][]byte, startPositions, endPositions []*msgpb.MsgPosition, opts ...ReplicateMessageOption) (*entity.MessageInfo, error) { + method := "ReplicateMessage" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady @@ -29,10 +36,12 @@ func (c *GrpcClient) ReplicateMessage(ctx context.Context, } resp, err := c.Service.ReplicateMessage(ctx, req) if err != nil { + log.Printf("replicate message failed, channelName:%s, traceID:%s err: %v", channelName, traceID, err) return nil, err } err = handleRespStatus(resp.GetStatus()) if err != nil { + log.Printf("replicate message failed, channelName:%s, traceID:%s err: %v", channelName, traceID, err) return nil, err } return &entity.MessageInfo{ diff --git a/client/partition.go b/client/partition.go index 30126ecd..d694cb94 100644 --- a/client/partition.go +++ b/client/partition.go @@ -13,9 +13,11 @@ package client import ( "context" + "log" "time" "github.com/cockroachdb/errors" + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -24,6 +26,11 @@ import ( // CreatePartition create partition for collection func (c *GrpcClient) CreatePartition(ctx context.Context, collName string, partitionName string, opts ...CreatePartitionOption) error { + method := "CreatePartition" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() + if c.Service == nil { return ErrClientNotReady } @@ -38,6 +45,7 @@ func (c *GrpcClient) CreatePartition(ctx context.Context, collName string, parti } resp, err := c.Service.CreatePartition(ctx, req) if err != nil { + log.Printf("create partition failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return err } return handleRespStatus(resp) @@ -45,6 +53,10 @@ func (c *GrpcClient) CreatePartition(ctx context.Context, collName string, parti // DropPartition drop partition from collection func (c *GrpcClient) DropPartition(ctx context.Context, collName string, partitionName string, opts ...DropPartitionOption) error { + method := "DropPartition" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -59,6 +71,7 @@ func (c *GrpcClient) DropPartition(ctx context.Context, collName string, partiti } resp, err := c.Service.DropPartition(ctx, req) if err != nil { + log.Printf("drop partition failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return err } return handleRespStatus(resp) @@ -66,6 +79,10 @@ func (c *GrpcClient) DropPartition(ctx context.Context, collName string, partiti // HasPartition check whether specified partition exists func (c *GrpcClient) HasPartition(ctx context.Context, collName string, partitionName string) (bool, error) { + method := "HasPartition" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return false, ErrClientNotReady } @@ -76,6 +93,7 @@ func (c *GrpcClient) HasPartition(ctx context.Context, collName string, partitio } resp, err := c.Service.HasPartition(ctx, req) if err != nil { + log.Printf("has partition failed, collName:%s, partitionName:%s, traceID:%s err: %v", collName, partitionName, traceID, err) return false, err } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { @@ -86,6 +104,10 @@ func (c *GrpcClient) HasPartition(ctx context.Context, collName string, partitio // ShowPartitions list all partitions from collection func (c *GrpcClient) ShowPartitions(ctx context.Context, collName string) ([]*entity.Partition, error) { + method := "ShowPartitions" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return []*entity.Partition{}, ErrClientNotReady } @@ -95,9 +117,11 @@ func (c *GrpcClient) ShowPartitions(ctx context.Context, collName string) ([]*en } resp, err := c.Service.ShowPartitions(ctx, req) if err != nil { + log.Printf("show partitions failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return []*entity.Partition{}, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("show partitions failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return []*entity.Partition{}, err } partitions := make([]*entity.Partition, 0, len(resp.GetPartitionIDs())) @@ -117,6 +141,10 @@ func (c *GrpcClient) ShowPartitions(ctx context.Context, collName string) ([]*en // LoadPartitions load collection paritions into memory func (c *GrpcClient) LoadPartitions(ctx context.Context, collName string, partitionNames []string, async bool, opts ...LoadPartitionsOption) error { + method := "LoadPartitions" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -131,9 +159,11 @@ func (c *GrpcClient) LoadPartitions(ctx context.Context, collName string, partit } resp, err := c.Service.LoadPartitions(ctx, req) if err != nil { + log.Printf("load partitions failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } if err := handleRespStatus(resp); err != nil { + log.Printf("load partitions failed, collName:%s, traceID:%s err: %v", collName, traceID, err) return err } @@ -148,6 +178,7 @@ func (c *GrpcClient) LoadPartitions(ctx context.Context, collName string, partit case <-ticker.C: progress, err := c.getLoadingProgress(ctx, collName, partitionNames...) if err != nil { + log.Printf("get loading progress failed, traceID:%s err: %v", traceID, err) return err } if progress == 100 { @@ -162,6 +193,10 @@ func (c *GrpcClient) LoadPartitions(ctx context.Context, collName string, partit // ReleasePartitions release partitions func (c *GrpcClient) ReleasePartitions(ctx context.Context, collName string, partitionNames []string, opts ...ReleasePartitionsOption) error { + method := "ReleasePartitions" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -175,6 +210,7 @@ func (c *GrpcClient) ReleasePartitions(ctx context.Context, collName string, par } resp, err := c.Service.ReleasePartitions(ctx, req) if err != nil { + log.Printf("release partitions failed, collName:%s, partitionNames:%v, traceID:%s err: %v", collName, partitionNames, traceID, err) return err } diff --git a/client/rbac.go b/client/rbac.go index 700254fc..a72cfe07 100644 --- a/client/rbac.go +++ b/client/rbac.go @@ -19,6 +19,9 @@ package client import ( "context" "errors" + "log" + + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -27,6 +30,10 @@ import ( // CreateRole creates a role entity in Milvus. func (c *GrpcClient) CreateRole(ctx context.Context, name string) error { + method := "CreateRole" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -38,6 +45,7 @@ func (c *GrpcClient) CreateRole(ctx context.Context, name string) error { } resp, err := c.Service.CreateRole(ctx, req) if err != nil { + log.Printf("create role failed, traceID:%s err: %v", traceID, err) return err } @@ -46,6 +54,10 @@ func (c *GrpcClient) CreateRole(ctx context.Context, name string) error { // DropRole drops a role entity in Milvus. func (c *GrpcClient) DropRole(ctx context.Context, name string) error { + method := "DropRole" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -56,6 +68,7 @@ func (c *GrpcClient) DropRole(ctx context.Context, name string) error { resp, err := c.Service.DropRole(ctx, req) if err != nil { + log.Printf("drop role failed, traceID:%s err: %v", traceID, err) return err } @@ -64,6 +77,10 @@ func (c *GrpcClient) DropRole(ctx context.Context, name string) error { // AddUserRole adds one role for user. func (c *GrpcClient) AddUserRole(ctx context.Context, username string, role string) error { + method := "AddUserRole" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -76,6 +93,7 @@ func (c *GrpcClient) AddUserRole(ctx context.Context, username string, role stri resp, err := c.Service.OperateUserRole(ctx, req) if err != nil { + log.Printf("add user role failed, traceID:%s err: %v", traceID, err) return err } @@ -84,6 +102,10 @@ func (c *GrpcClient) AddUserRole(ctx context.Context, username string, role stri // RemoveUserRole removes one role from user. func (c *GrpcClient) RemoveUserRole(ctx context.Context, username string, role string) error { + method := "RemoveUserRole" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -96,6 +118,7 @@ func (c *GrpcClient) RemoveUserRole(ctx context.Context, username string, role s resp, err := c.Service.OperateUserRole(ctx, req) if err != nil { + log.Printf("remove user role failed, traceID:%s err: %v", traceID, err) return err } @@ -104,6 +127,10 @@ func (c *GrpcClient) RemoveUserRole(ctx context.Context, username string, role s // ListRoles lists the role objects in system. func (c *GrpcClient) ListRoles(ctx context.Context) ([]entity.Role, error) { + method := "ListRoles" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -112,9 +139,11 @@ func (c *GrpcClient) ListRoles(ctx context.Context) ([]entity.Role, error) { resp, err := c.Service.SelectRole(ctx, req) if err != nil { + log.Printf("list roles failed, traceID:%s err: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list roles failed, traceID:%s err: %v", traceID, err) return nil, err } @@ -128,6 +157,10 @@ func (c *GrpcClient) ListRoles(ctx context.Context) ([]entity.Role, error) { // ListUsers lists the user objects in system. func (c *GrpcClient) ListUsers(ctx context.Context) ([]entity.User, error) { + method := "ListUsers" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -136,9 +169,11 @@ func (c *GrpcClient) ListUsers(ctx context.Context) ([]entity.User, error) { resp, err := c.Service.SelectUser(ctx, req) if err != nil { + log.Printf("list users failed, traceID:%s err: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list users failed, traceID:%s err: %v", traceID, err) return nil, err } @@ -152,6 +187,10 @@ func (c *GrpcClient) ListUsers(ctx context.Context) ([]entity.User, error) { // DescribeUser lists the user descriptions in the system (name, roles) func (c *GrpcClient) DescribeUser(ctx context.Context, username string) (entity.UserDescription, error) { + method := "DescribeUser" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return entity.UserDescription{}, ErrClientNotReady } @@ -165,9 +204,11 @@ func (c *GrpcClient) DescribeUser(ctx context.Context, username string) (entity. resp, err := c.Service.SelectUser(ctx, req) if err != nil { + log.Printf("describe user failed, traceID:%s err: %v", traceID, err) return entity.UserDescription{}, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("describe user failed, traceID:%s err: %v", traceID, err) return entity.UserDescription{}, err } results := resp.GetResults() @@ -189,6 +230,10 @@ func (c *GrpcClient) DescribeUser(ctx context.Context, username string) (entity. // DescribeUsers lists all users with descriptions (names, roles) func (c *GrpcClient) DescribeUsers(ctx context.Context) ([]entity.UserDescription, error) { + method := "DescribeUsers" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -199,9 +244,11 @@ func (c *GrpcClient) DescribeUsers(ctx context.Context) ([]entity.UserDescriptio resp, err := c.Service.SelectUser(ctx, req) if err != nil { + log.Printf("describe users failed, traceID:%s err: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("describe users failed, traceID:%s err: %v", traceID, err) return nil, err } results := resp.GetResults() @@ -224,6 +271,10 @@ func (c *GrpcClient) DescribeUsers(ctx context.Context) ([]entity.UserDescriptio // ListGrants lists the role grants in the system func (c *GrpcClient) ListGrants(ctx context.Context, role string, dbName string) ([]entity.RoleGrants, error) { + method := "ListGrants" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() RoleGrantsList := make([]entity.RoleGrants, 0) if c.Service == nil { return RoleGrantsList, ErrClientNotReady @@ -240,9 +291,11 @@ func (c *GrpcClient) ListGrants(ctx context.Context, role string, dbName string) resp, err := c.Service.SelectGrant(ctx, req) if err != nil { + log.Printf("list grants failed, traceID:%s err: %v", traceID, err) return RoleGrantsList, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list grants failed, traceID:%s err: %v", traceID, err) return RoleGrantsList, err } @@ -269,6 +322,10 @@ func (c *GrpcClient) ListGrants(ctx context.Context, role string, dbName string) // ListGrant lists a grant info for the role and the specific object func (c *GrpcClient) ListGrant(ctx context.Context, role string, object string, objectName string, dbName string) ([]entity.RoleGrants, error) { + method := "ListGrant" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() RoleGrantsList := make([]entity.RoleGrants, 0) if c.Service == nil { return RoleGrantsList, ErrClientNotReady @@ -289,9 +346,11 @@ func (c *GrpcClient) ListGrant(ctx context.Context, role string, object string, resp, err := c.Service.SelectGrant(ctx, req) if err != nil { + log.Printf("list grant failed, traceID:%s err: %v", traceID, err) return RoleGrantsList, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list grant failed, traceID:%s err: %v", traceID, err) return RoleGrantsList, err } @@ -318,6 +377,10 @@ func (c *GrpcClient) ListGrant(ctx context.Context, role string, object string, // Grant adds object privileged for role. func (c *GrpcClient) Grant(ctx context.Context, role string, objectType entity.PriviledgeObjectType, object string, privilege string, options ...entity.OperatePrivilegeOption) error { + method := "Grant" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -349,6 +412,7 @@ func (c *GrpcClient) Grant(ctx context.Context, role string, objectType entity.P resp, err := c.Service.OperatePrivilege(ctx, req) if err != nil { + log.Printf("grant failed, traceID:%s err: %v", traceID, err) return err } @@ -357,6 +421,10 @@ func (c *GrpcClient) Grant(ctx context.Context, role string, objectType entity.P // Revoke removes privilege from role. func (c *GrpcClient) Revoke(ctx context.Context, role string, objectType entity.PriviledgeObjectType, object string, privilege string, options ...entity.OperatePrivilegeOption) error { + method := "Revoke" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -387,6 +455,7 @@ func (c *GrpcClient) Revoke(ctx context.Context, role string, objectType entity. resp, err := c.Service.OperatePrivilege(ctx, req) if err != nil { + log.Printf("revoke failed, role:%s, traceID:%s err: %v", role, traceID, err) return err } @@ -395,6 +464,10 @@ func (c *GrpcClient) Revoke(ctx context.Context, role string, objectType entity. // GrantV2 adds object privilege for role without object type func (c *GrpcClient) GrantV2(ctx context.Context, role string, colName string, privilege string, options ...entity.OperatePrivilegeOption) error { + method := "GrantV2" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -419,6 +492,7 @@ func (c *GrpcClient) GrantV2(ctx context.Context, role string, colName string, p resp, err := c.Service.OperatePrivilegeV2(ctx, req) if err != nil { + log.Printf("grant failed, traceID:%s err: %v", traceID, err) return err } @@ -427,6 +501,10 @@ func (c *GrpcClient) GrantV2(ctx context.Context, role string, colName string, p // Revoke removes privilege from role without object type func (c *GrpcClient) RevokeV2(ctx context.Context, role string, colName string, privilege string, options ...entity.OperatePrivilegeOption) error { + method := "RevokeV2" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -451,6 +529,7 @@ func (c *GrpcClient) RevokeV2(ctx context.Context, role string, colName string, resp, err := c.Service.OperatePrivilegeV2(ctx, req) if err != nil { + log.Printf("revoke failed, traceID:%s err: %v", traceID, err) return err } @@ -458,6 +537,10 @@ func (c *GrpcClient) RevokeV2(ctx context.Context, role string, colName string, } func (c *GrpcClient) BackupRBAC(ctx context.Context) (*entity.RBACMeta, error) { + method := "BackupRBAC" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -466,9 +549,11 @@ func (c *GrpcClient) BackupRBAC(ctx context.Context) (*entity.RBACMeta, error) { resp, err := c.Service.BackupRBAC(ctx, req) if err != nil { + log.Printf("backup rbac failed, traceID:%s err: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("backup rbac failed, traceID:%s err: %v", traceID, err) return nil, err } @@ -518,6 +603,10 @@ func (c *GrpcClient) BackupRBAC(ctx context.Context) (*entity.RBACMeta, error) { } func (c *GrpcClient) RestoreRBAC(ctx context.Context, meta *entity.RBACMeta) error { + method := "RestoreRBAC" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -580,6 +669,7 @@ func (c *GrpcClient) RestoreRBAC(ctx context.Context, meta *entity.RBACMeta) err resp, err := c.Service.RestoreRBAC(ctx, req) if err != nil { + log.Printf("restore rbac failed, traceID:%s err: %v", traceID, err) return err } @@ -587,6 +677,10 @@ func (c *GrpcClient) RestoreRBAC(ctx context.Context, meta *entity.RBACMeta) err } func (c *GrpcClient) CreatePrivilegeGroup(ctx context.Context, groupName string) error { + method := "CreatePrivilegeGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -597,6 +691,7 @@ func (c *GrpcClient) CreatePrivilegeGroup(ctx context.Context, groupName string) resp, err := c.Service.CreatePrivilegeGroup(ctx, req) if err != nil { + log.Printf("create privilege group failed, traceID:%s err: %v", traceID, err) return err } @@ -604,6 +699,10 @@ func (c *GrpcClient) CreatePrivilegeGroup(ctx context.Context, groupName string) } func (c *GrpcClient) DropPrivilegeGroup(ctx context.Context, groupName string) error { + method := "DropPrivilegeGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -614,6 +713,7 @@ func (c *GrpcClient) DropPrivilegeGroup(ctx context.Context, groupName string) e resp, err := c.Service.DropPrivilegeGroup(ctx, req) if err != nil { + log.Printf("drop privilege group failed, traceID:%s err: %v", traceID, err) return err } @@ -621,6 +721,10 @@ func (c *GrpcClient) DropPrivilegeGroup(ctx context.Context, groupName string) e } func (c *GrpcClient) ListPrivilegeGroups(ctx context.Context) ([]*entity.PrivilegeGroup, error) { + method := "ListPrivilegeGroups" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() PrivilegeGroupList := make([]*entity.PrivilegeGroup, 0) if c.Service == nil { return PrivilegeGroupList, ErrClientNotReady @@ -630,10 +734,12 @@ func (c *GrpcClient) ListPrivilegeGroups(ctx context.Context) ([]*entity.Privile resp, err := c.Service.ListPrivilegeGroups(ctx, req) if err != nil { + log.Printf("list privilege groups failed, traceID:%s err: %v", traceID, err) return PrivilegeGroupList, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list privilege groups failed, traceID:%s err: %v", traceID, err) return PrivilegeGroupList, err } @@ -659,6 +765,10 @@ func (c *GrpcClient) ListPrivilegeGroups(ctx context.Context) ([]*entity.Privile } func (c *GrpcClient) AddPrivilegesToGroup(ctx context.Context, groupName string, privileges []string) error { + method := "AddPrivilegesToGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -678,6 +788,7 @@ func (c *GrpcClient) AddPrivilegesToGroup(ctx context.Context, groupName string, resp, err := c.Service.OperatePrivilegeGroup(ctx, req) if err != nil { + log.Printf("add privilege to group failed, traceID:%s err: %v", traceID, err) return err } @@ -685,6 +796,10 @@ func (c *GrpcClient) AddPrivilegesToGroup(ctx context.Context, groupName string, } func (c *GrpcClient) RemovePrivilegesFromGroup(ctx context.Context, groupName string, privileges []string) error { + method := "RemovePrivilegesFromGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -704,6 +819,7 @@ func (c *GrpcClient) RemovePrivilegesFromGroup(ctx context.Context, groupName st resp, err := c.Service.OperatePrivilegeGroup(ctx, req) if err != nil { + log.Printf("remove privilege from group failed, traceID:%s err: %v", traceID, err) return err } diff --git a/client/resource_group.go b/client/resource_group.go index 6cfd6552..0967d257 100644 --- a/client/resource_group.go +++ b/client/resource_group.go @@ -13,6 +13,9 @@ package client import ( "context" + "log" + + "go.opentelemetry.io/otel" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -20,6 +23,10 @@ import ( // ListResourceGroups returns list of resource group names in current Milvus instance. func (c *GrpcClient) ListResourceGroups(ctx context.Context) ([]string, error) { + method := "ListResourceGroups" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -28,9 +35,11 @@ func (c *GrpcClient) ListResourceGroups(ctx context.Context) ([]string, error) { resp, err := c.Service.ListResourceGroups(ctx, req) if err != nil { + log.Printf("list resource groups failed, traceID:%s, error: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("list resource groups failed, traceID:%s, error: %v", traceID, err) return nil, err } @@ -39,6 +48,10 @@ func (c *GrpcClient) ListResourceGroups(ctx context.Context) ([]string, error) { // CreateResourceGroup creates a resource group with provided name. func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error { + method := "CreateResourceGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -52,6 +65,7 @@ func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string, opt resp, err := c.Service.CreateResourceGroup(ctx, req) if err != nil { + log.Printf("create resource group failed, traceID:%s, error: %v", traceID, err) return err } return handleRespStatus(resp) @@ -59,6 +73,10 @@ func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string, opt // UpdateResourceGroups updates resource groups with provided options. func (c *GrpcClient) UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error { + method := "UpdateResourceGroups" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -70,6 +88,7 @@ func (c *GrpcClient) UpdateResourceGroups(ctx context.Context, opts ...UpdateRes resp, err := c.Service.UpdateResourceGroups(ctx, req) if err != nil { + log.Printf("update resource groups failed, traceID:%s, error: %v", traceID, err) return err } return handleRespStatus(resp) @@ -77,6 +96,10 @@ func (c *GrpcClient) UpdateResourceGroups(ctx context.Context, opts ...UpdateRes // DescribeResourceGroup returns resource groups information. func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error) { + method := "DescribeResourceGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -87,9 +110,11 @@ func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) ( resp, err := c.Service.DescribeResourceGroup(ctx, req) if err != nil { + log.Printf("describe resource group failed, traceID:%s, error: %v", traceID, err) return nil, err } if err = handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("describe resource group failed, traceID:%s, error: %v", traceID, err) return nil, err } @@ -110,6 +135,10 @@ func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) ( // DropResourceGroup drops the resource group with provided name. func (c *GrpcClient) DropResourceGroup(ctx context.Context, rgName string) error { + method := "DropResourceGroup" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -120,6 +149,7 @@ func (c *GrpcClient) DropResourceGroup(ctx context.Context, rgName string) error resp, err := c.Service.DropResourceGroup(ctx, req) if err != nil { + log.Printf("drop resource group failed, traceID:%s, error: %v", traceID, err) return err } return handleRespStatus(resp) @@ -127,6 +157,10 @@ func (c *GrpcClient) DropResourceGroup(ctx context.Context, rgName string) error // TransferNode transfers querynodes between resource groups. func (c *GrpcClient) TransferNode(ctx context.Context, sourceRg, targetRg string, nodesNum int32) error { + method := "TransferNode" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -139,6 +173,7 @@ func (c *GrpcClient) TransferNode(ctx context.Context, sourceRg, targetRg string resp, err := c.Service.TransferNode(ctx, req) if err != nil { + log.Printf("transfer node failed, traceID:%s, error: %v", traceID, err) return err } return handleRespStatus(resp) @@ -146,6 +181,10 @@ func (c *GrpcClient) TransferNode(ctx context.Context, sourceRg, targetRg string // TransferReplica transfer collection replicas between source,target resource group. func (c *GrpcClient) TransferReplica(ctx context.Context, sourceRg, targetRg string, collectionName string, replicaNum int64) error { + method := "TransferReplica" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -159,6 +198,7 @@ func (c *GrpcClient) TransferReplica(ctx context.Context, sourceRg, targetRg str resp, err := c.Service.TransferReplica(ctx, req) if err != nil { + log.Printf("transfer replica failed, traceID:%s, error: %v", traceID, err) return err } return handleRespStatus(resp) diff --git a/client/row.go b/client/row.go index 12014f4b..46fc67c9 100644 --- a/client/row.go +++ b/client/row.go @@ -3,11 +3,13 @@ package client import ( "context" "fmt" + "log" "reflect" "github.com/cockroachdb/errors" - "github.com/golang/protobuf/proto" + "go.opentelemetry.io/otel" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -15,6 +17,10 @@ import ( // CreateCollectionByRow create collection by row func (c *GrpcClient) CreateCollectionByRow(ctx context.Context, row entity.Row, shardNum int32) error { + method := "CreateCollectionByRow" + ctx, span := otel.Tracer("client").Start(ctx, method) + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return ErrClientNotReady } @@ -31,7 +37,7 @@ func (c *GrpcClient) CreateCollectionByRow(ctx context.Context, row entity.Row, } // already exists collection with same name, return error if has { - return fmt.Errorf("collection %s already exist", sch.CollectionName) + return fmt.Errorf("collection %s already exist, traceID:%s", sch.CollectionName, traceID) } // marshal schema to bytes for message transfer p := sch.ProtoMessage() @@ -49,6 +55,7 @@ func (c *GrpcClient) CreateCollectionByRow(ctx context.Context, row entity.Row, resp, err := c.Service.CreateCollection(ctx, req) // handles response if err != nil { + log.Printf("create collection failed, collName:%s, traceID:%s, err: %v", sch.CollectionName, traceID, err) return err } err = handleRespStatus(resp) @@ -62,6 +69,8 @@ func (c *GrpcClient) CreateCollectionByRow(ctx context.Context, row entity.Row, func (c *GrpcClient) InsertByRows(ctx context.Context, collName string, partitionName string, rows []entity.Row, ) (entity.Column, error) { + ctx, span := otel.Tracer("client").Start(ctx, "InsertByRows") + defer span.End() anys := make([]interface{}, 0, len(rows)) for _, row := range rows { anys = append(anys, row) @@ -75,6 +84,9 @@ func (c *GrpcClient) InsertByRows(ctx context.Context, collName string, partitio func (c *GrpcClient) InsertRows(ctx context.Context, collName string, partitionName string, rows []interface{}, ) (entity.Column, error) { + ctx, span := otel.Tracer("client").Start(ctx, "InsertRows") + defer span.End() + traceID := span.SpanContext().TraceID().String() if c.Service == nil { return nil, ErrClientNotReady } @@ -84,11 +96,13 @@ func (c *GrpcClient) InsertRows(ctx context.Context, collName string, partitionN coll, err := c.DescribeCollection(ctx, collName) if err != nil { + log.Printf("collection %s not exist, traceID:%s, err: %v", collName, traceID, err) return nil, err } // 1. convert rows to columns columns, err := entity.AnyToColumns(rows, coll.Schema) if err != nil { + log.Printf("convert row to columns failed, traceID:%s, err: %v", traceID, err) return nil, err } // fieldData @@ -105,9 +119,11 @@ func (c *GrpcClient) InsertRows(ctx context.Context, collName string, partitionN } resp, err := c.Service.Insert(ctx, req) if err != nil { + log.Printf("insert failed, traceID:%s, err: %v", traceID, err) return nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { + log.Printf("handle response status failed, traceID:%s, err: %v", traceID, err) return nil, err } c.cache.setSessionTs(collName, resp.Timestamp) diff --git a/go.mod b/go.mod index 1fd838ab..797b79b1 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,12 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tidwall/gjson v1.14.4 github.com/x448/float16 v0.8.4 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 + go.opentelemetry.io/otel v1.10.0 + go.opentelemetry.io/otel/sdk v1.10.0 + go.opentelemetry.io/otel/trace v1.10.0 golang.org/x/sync v0.8.0 - google.golang.org/grpc v1.48.0 + google.golang.org/grpc v1.50.1 google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f ) @@ -21,6 +25,8 @@ require ( github.com/cockroachdb/redact v1.1.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/getsentry/sentry-go v0.12.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/go.sum b/go.sum index 64969703..9b6b3366 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,5 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -63,6 +64,11 @@ github.com/go-faker/faker/v4 v4.1.0 h1:ffuWmpDrducIUOO0QSKSF5Q2dxAht+dhsT9FvVHhP github.com/go-faker/faker/v4 v4.1.0/go.mod h1:uuNc0PSRxF8nMgjGrrrU4Nw5cF30Jc6Kd0/FUTTYbhg= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= @@ -99,8 +105,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -246,6 +253,14 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 h1:xFSRQBbXF6VvYRf2lqMJXxoB72XI1K/azav8TekHHSw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0/go.mod h1:h8TWwRAhQpOd0aM5nYsRD8+flnkj+526GEIVlarH7eY= +go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4= +go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= +go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY= +go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE= +go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E= +go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -295,6 +310,7 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -326,6 +342,7 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -372,9 +389,9 @@ golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -396,8 +413,9 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= -google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= +google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY= +google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f h1:rqzndB2lIQGivcXdTuY3Y9NBvr70X+y77woofSRluec= google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f/go.mod h1:gxndsbNG1n4TZcHGgsYEfVGnTxqfEdfiDv6/DADXX9o= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=