diff --git a/controller/api/destination/destination_fuzzer.go b/controller/api/destination/destination_fuzzer.go index 1369581ef22d7..1b6582708adf6 100644 --- a/controller/api/destination/destination_fuzzer.go +++ b/controller/api/destination/destination_fuzzer.go @@ -91,9 +91,10 @@ func FuzzProfileTranslatorUpdate(data []byte) int { return 0 } t := &testing.T{} - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil) + id := watcher.ServiceID{Namespace: "bar", Name: "foo"} + server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} + translator := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil) translator.Start() defer translator.Stop() translator.Update(profile) diff --git a/controller/api/destination/profile_translator.go b/controller/api/destination/profile_translator.go index 05471b51d5afb..f0b98f681e329 100644 --- a/controller/api/destination/profile_translator.go +++ b/controller/api/destination/profile_translator.go @@ -8,6 +8,8 @@ import ( "github.com/golang/protobuf/ptypes/duration" pb "github.com/linkerd/linkerd2-proxy-api/go/destination" + meta "github.com/linkerd/linkerd2-proxy-api/go/meta" + "github.com/linkerd/linkerd2/controller/api/destination/watcher" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" "github.com/linkerd/linkerd2/pkg/profiles" "github.com/linkerd/linkerd2/pkg/util" @@ -22,6 +24,7 @@ const millisPerDecimilli = 10 type profileTranslator struct { fullyQualifiedName string port uint32 + parentRef *meta.Metadata stream pb.Destination_GetProfileServer endStream chan struct{} @@ -43,10 +46,23 @@ var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec( }, ) -func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator { +func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator { + parentRef := &meta.Metadata{ + Kind: &meta.Metadata_Resource{ + Resource: &meta.Resource{ + Group: "core", + Kind: "Service", + Name: serviceID.Name, + Namespace: serviceID.Namespace, + Port: port, + }, + }, + } + return &profileTranslator{ fullyQualifiedName: fqn, port: port, + parentRef: parentRef, stream: stream, endStream: endStream, @@ -154,6 +170,19 @@ func toDuration(d time.Duration) *duration.Duration { // createDestinationProfile returns a Proxy API DestinationProfile, given a // ServiceProfile. func (pt *profileTranslator) createDestinationProfile(profile *sp.ServiceProfile) (*pb.DestinationProfile, error) { + var profileRef *meta.Metadata + if profile != nil { + profileRef = &meta.Metadata{ + Kind: &meta.Metadata_Resource{ + Resource: &meta.Resource{ + Group: sp.SchemeGroupVersion.Group, + Kind: profile.Kind, + Name: profile.Name, + Namespace: profile.Namespace, + }, + }, + } + } routes := make([]*pb.Route, 0) for _, route := range profile.Spec.Routes { pbRoute, err := toRoute(profile, route) @@ -177,6 +206,8 @@ func (pt *profileTranslator) createDestinationProfile(profile *sp.ServiceProfile _, opaqueProtocol = profile.Spec.OpaquePorts[pt.port] } return &pb.DestinationProfile{ + ParentRef: pt.parentRef, + ProfileRef: profileRef, Routes: routes, RetryBudget: budget, DstOverrides: toDstOverrides(profile.Spec.DstOverrides, pt.port), diff --git a/controller/api/destination/profile_translator_test.go b/controller/api/destination/profile_translator_test.go index 17612dd5a6b91..f153aae69ea7d 100644 --- a/controller/api/destination/profile_translator_test.go +++ b/controller/api/destination/profile_translator_test.go @@ -6,9 +6,12 @@ import ( "github.com/golang/protobuf/ptypes/duration" pb "github.com/linkerd/linkerd2-proxy-api/go/destination" httpPb "github.com/linkerd/linkerd2-proxy-api/go/http_types" + "github.com/linkerd/linkerd2-proxy-api/go/meta" + "github.com/linkerd/linkerd2/controller/api/destination/watcher" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" logging "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var ( @@ -162,7 +165,17 @@ var ( Timeout: nil, } + spTypeMeta = metav1.TypeMeta{ + Kind: "ServiceProfile", + } + spObjectMeta = metav1.ObjectMeta{ + Name: "foo.bar.svc.cluster.local", + Namespace: "bar", + } + profile = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ route1, @@ -172,6 +185,28 @@ var ( } pbProfile = &pb.DestinationProfile{ + FullyQualifiedName: "foo.bar.svc.cluster.local", + ParentRef: &meta.Metadata{ + Kind: &meta.Metadata_Resource{ + Resource: &meta.Resource{ + Group: "core", + Kind: "Service", + Name: "foo", + Namespace: "bar", + Port: 80, + }, + }, + }, + ProfileRef: &meta.Metadata{ + Kind: &meta.Metadata_Resource{ + Resource: &meta.Resource{ + Group: "linkerd.io", + Kind: "ServiceProfile", + Name: "foo.bar.svc.cluster.local", + Namespace: "bar", + }, + }, + }, Routes: []*pb.Route{ pbRoute1, pbRoute2, @@ -185,6 +220,8 @@ var ( } multipleRequestMatches = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ { @@ -199,6 +236,9 @@ var ( } pbRequestMatchAll = &pb.DestinationProfile{ + FullyQualifiedName: pbProfile.FullyQualifiedName, + ParentRef: pbProfile.ParentRef, + ProfileRef: pbProfile.ProfileRef, Routes: []*pb.Route{ { Condition: &pb.RequestMatch{ @@ -236,6 +276,8 @@ var ( } notEnoughRequestMatches = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ { @@ -246,6 +288,8 @@ var ( } multipleResponseMatches = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ { @@ -274,6 +318,9 @@ var ( } pbResponseMatchAll = &pb.DestinationProfile{ + FullyQualifiedName: pbProfile.FullyQualifiedName, + ParentRef: pbProfile.ParentRef, + ProfileRef: pbProfile.ProfileRef, Routes: []*pb.Route{ { Condition: &pb.RequestMatch{ @@ -326,6 +373,8 @@ var ( } oneSidedStatusRange = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ { @@ -347,6 +396,8 @@ var ( } invalidStatusRange = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ { @@ -369,6 +420,8 @@ var ( } notEnoughResponseMatches = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ { @@ -393,6 +446,8 @@ var ( } profileWithTimeout = &sp.ServiceProfile{ + TypeMeta: spTypeMeta, + ObjectMeta: spObjectMeta, Spec: sp.ServiceProfileSpec{ Routes: []*sp.RouteSpec{ routeWithTimeout, @@ -412,6 +467,9 @@ var ( } pbProfileWithTimeout = &pb.DestinationProfile{ + FullyQualifiedName: pbProfile.FullyQualifiedName, + ParentRef: pbProfile.ParentRef, + ProfileRef: pbProfile.ProfileRef, Routes: []*pb.Route{ pbRouteWithTimeout, }, @@ -419,161 +477,153 @@ var ( } ) +func newMockTranslator(t *testing.T) (*profileTranslator, chan *pb.DestinationProfile) { + t.Helper() + id := watcher.ServiceID{Namespace: "bar", Name: "foo"} + server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} + translator := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil) + return translator, server.profilesReceived +} + func TestProfileTranslator(t *testing.T) { t.Run("Sends update", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(profile) - actualPbProfile := <-mockGetProfileServer.profilesReceived + actualPbProfile := <-profilesReceived if !proto.Equal(actualPbProfile, pbProfile) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile) } - numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + numProfiles := len(profilesReceived) + 1 if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Request match with more than one field becomes ALL", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(multipleRequestMatches) - actualPbProfile := <-mockGetProfileServer.profilesReceived + actualPbProfile := <-profilesReceived if !proto.Equal(actualPbProfile, pbRequestMatchAll) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile) } - numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + numProfiles := len(profilesReceived) + 1 if numProfiles != 1 { - t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Ignores request match without any fields", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(notEnoughRequestMatches) - numProfiles := len(mockGetProfileServer.profilesReceived) + numProfiles := len(profilesReceived) if numProfiles != 0 { - t.Fatalf("Expecting [0] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [0] profiles, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Response match with more than one field becomes ALL", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(multipleResponseMatches) - actualPbProfile := <-mockGetProfileServer.profilesReceived + actualPbProfile := <-profilesReceived if !proto.Equal(actualPbProfile, pbResponseMatchAll) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile) } - numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + numProfiles := len(profilesReceived) + 1 if numProfiles != 1 { - t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Ignores response match without any fields", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(notEnoughResponseMatches) - numProfiles := len(mockGetProfileServer.profilesReceived) + numProfiles := len(profilesReceived) if numProfiles != 0 { - t.Fatalf("Expecting [0] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [0] profiles, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Ignores response match with invalid status range", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(invalidStatusRange) - numProfiles := len(mockGetProfileServer.profilesReceived) + numProfiles := len(profilesReceived) if numProfiles != 0 { - t.Fatalf("Expecting [0] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [0] profiles, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Sends update for one sided status range", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(oneSidedStatusRange) - <-mockGetProfileServer.profilesReceived + <-profilesReceived - numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + numProfiles := len(profilesReceived) + 1 if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) t.Run("Sends empty update", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} + server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} + translator := newProfileTranslator(watcher.ID{}, server, logging.WithField("test", t.Name()), "", 80, nil) - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) translator.Start() defer translator.Stop() translator.Update(nil) - actualPbProfile := <-mockGetProfileServer.profilesReceived + actualPbProfile := <-server.profilesReceived if !proto.Equal(actualPbProfile, defaultPbProfile) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile) } - numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + numProfiles := len(server.profilesReceived) + 1 if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, server.profilesReceived) } }) t.Run("Sends update with custom timeout", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - - translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator, profilesReceived := newMockTranslator(t) translator.Start() defer translator.Stop() translator.Update(profileWithTimeout) - actualPbProfile := <-mockGetProfileServer.profilesReceived + actualPbProfile := <-profilesReceived if !proto.Equal(actualPbProfile, pbProfileWithTimeout) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile) } - numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + numProfiles := len(profilesReceived) + 1 if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, profilesReceived) } }) } diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 3401341bc4e8c..637aeafcbc557 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -394,7 +394,7 @@ func (s *server) subscribeToServiceProfile( // We build up the pipeline of profile updaters backwards, starting from // the translator which takes profile updates, translates them to protobuf // and pushes them onto the gRPC stream. - translator := newProfileTranslator(stream, log, fqn, port, streamEnd) + translator := newProfileTranslator(service, stream, log, fqn, port, streamEnd) translator.Start() defer translator.Stop()