diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 2fb637b416567..2799f53c4cd71 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -563,18 +563,20 @@ func profileID(authority string, ctxToken contextToken, clusterDomain string) (w } func getHostAndPort(authority string) (string, watcher.Port, error) { - hostPort := strings.Split(authority, ":") - if len(hostPort) > 2 { - return "", 0, fmt.Errorf("invalid destination %s", authority) - } - host := hostPort[0] - port := 80 - if len(hostPort) == 2 { - var err error - port, err = strconv.Atoi(hostPort[1]) - if err != nil || port <= 0 || port > 65535 { - return "", 0, fmt.Errorf("invalid port %s", hostPort[1]) - } + if !strings.Contains(authority, ":") { + return authority, watcher.Port(80), nil + } + + host, sport, err := net.SplitHostPort(authority) + if err != nil { + return "", 0, fmt.Errorf("invalid destination: %w", err) + } + port, err := strconv.Atoi(sport) + if err != nil { + return "", 0, fmt.Errorf("invalid port %s: %w", sport, err) + } + if port <= 0 || port > 65535 { + return "", 0, fmt.Errorf("invalid port %d", port) } return host, watcher.Port(port), nil } diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index dc4741c241c23..e169b4b102106 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -3,6 +3,7 @@ package destination import ( "context" "fmt" + gonet "net" "testing" "time" @@ -27,14 +28,17 @@ const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local" const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local" const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local" const clusterIP = "172.17.12.0" +const clusterIPv6 = "2001:db8::88" const clusterIPOpaque = "172.17.12.1" const podIP1 = "172.17.0.12" +const podIP1v6 = "2001:db8::68" const podIP2 = "172.17.0.13" const podIPOpaque = "172.17.0.14" const podIPSkipped = "172.17.0.15" const podIPPolicy = "172.17.0.16" const podIPStatefulSet = "172.17.13.15" const externalIP = "192.168.1.20" +const externalIPv6 = "2001:db8::78" const port uint32 = 8989 const opaquePort uint32 = 4242 const skippedPort uint32 = 24224 @@ -259,6 +263,25 @@ func TestGetProfiles(t *testing.T) { } }) + t.Run("Return profile when using secondary cluster IP", func(t *testing.T) { + server := makeServer(t) + defer server.clusterStore.UnregisterGauges() + + stream := profileStream(t, server, clusterIPv6, port, "") + defer stream.Cancel() + profile := assertSingleProfile(t, stream.Updates()) + if profile.FullyQualifiedName != fullyQualifiedName { + t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName) + } + if profile.OpaqueProtocol { + t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port) + } + routes := profile.GetRoutes() + if len(routes) != 1 { + t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes) + } + }) + t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) { server := makeServer(t) defer server.clusterStore.UnregisterGauges() @@ -341,6 +364,47 @@ func TestGetProfiles(t *testing.T) { } }) + t.Run("Return profile with endpoint when using pod secondary IP", func(t *testing.T) { + server := makeServer(t) + defer server.clusterStore.UnregisterGauges() + + stream := profileStream(t, server, podIP1v6, port, "ns:ns") + defer stream.Cancel() + + epAddr, err := toAddress(podIP1v6, port) + if err != nil { + t.Fatalf("Got error: %s", err) + } + + // An explanation for why we expect 1 to 3 updates is in test cases + // above + updates := stream.Updates() + if len(updates) == 0 || len(updates) > 3 { + t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates) + } + + first := updates[0] + if first.Endpoint == nil { + t.Fatalf("Expected response to have endpoint field") + } + if first.OpaqueProtocol { + t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port) + } + _, exists := first.Endpoint.MetricLabels["namespace"] + if !exists { + t.Fatalf("Expected 'namespace' metric label to exist but it did not") + } + if first.GetEndpoint().GetProtocolHint() == nil { + t.Fatalf("Expected protocol hint but found none") + } + if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil { + t.Fatalf("Expected pod to not support opaque traffic on port %d", port) + } + if first.Endpoint.Addr.String() != epAddr.String() { + t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip) + } + }) + t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) { server := makeServer(t) defer server.clusterStore.UnregisterGauges() @@ -591,9 +655,10 @@ func TestGetProfiles(t *testing.T) { Status: corev1.ConditionTrue, }, }, - HostIP: externalIP, - PodIP: "172.17.0.55", - PodIPs: []corev1.PodIP{{IP: "172.17.0.55"}}, + HostIP: externalIP, + HostIPs: []corev1.HostIP{{IP: externalIP}, {IP: externalIPv6}}, + PodIP: "172.17.0.55", + PodIPs: []corev1.PodIP{{IP: "172.17.0.55"}}, }, }, metav1.CreateOptions{}) if err != nil { @@ -725,19 +790,22 @@ func toAddress(path string, port uint32) (*net.TcpAddress, error) { func TestIpWatcherGetSvcID(t *testing.T) { name := "service" namespace := "test" - clusterIP := "10.256.0.1" - var port uint32 = 1234 - k8sConfigs := fmt.Sprintf(` + clusterIP := "10.245.0.1" + clusterIPv6 := "2001:db8::68" + k8sConfigs := ` apiVersion: v1 kind: Service metadata: - name: %s - namespace: %s + name: service + namespace: test spec: type: ClusterIP - clusterIP: %s + clusterIP: 10.245.0.1 + clusterIPs: + - 10.245.0.1 + - 2001:db8::68 ports: - - port: %d`, name, namespace, clusterIP, port) + - port: 1234` t.Run("get services IDs by IP address", func(t *testing.T) { k8sAPI, err := k8s.NewFakeAPI(k8sConfigs) @@ -766,6 +834,20 @@ spec: t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace) } + svc6, err := getSvcID(k8sAPI, clusterIPv6, logging.WithFields(nil)) + if err != nil { + t.Fatalf("Error getting service: %s", err) + } + if svc6 == nil { + t.Fatalf("Expected to find service mapped to [%s]", clusterIPv6) + } + if svc.Name != name { + t.Fatalf("Expected service name to be [%s], but got [%s]", name, svc.Name) + } + if svc.Namespace != namespace { + t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace) + } + badClusterIP := "10.256.0.2" svc, err = getSvcID(k8sAPI, badClusterIP, logging.WithFields(nil)) if err != nil { @@ -800,7 +882,7 @@ func profileStream(t *testing.T, server *server, host string, port uint32, token go func() { err := server.GetProfile(&pb.GetDestination{ Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", host, port), + Path: gonet.JoinHostPort(host, fmt.Sprintf("%d", port)), ContextToken: token, }, stream) if err != nil { diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 1ab71eb1ad5ff..fc10b881669d8 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -32,6 +32,9 @@ metadata: spec: type: LoadBalancer clusterIP: 172.17.12.0 + clusterIPs: + - 172.17.12.0 + - 2001:db8::88 ports: - port: 8989`, ` @@ -62,6 +65,7 @@ status: podIP: 172.17.0.12 podIPs: - ip: 172.17.0.12 + - ip: 2001:db8::68 spec: containers: - env: diff --git a/controller/api/destination/watcher/k8s.go b/controller/api/destination/watcher/k8s.go index 2522f565cfcf6..529b199d857c0 100644 --- a/controller/api/destination/watcher/k8s.go +++ b/controller/api/destination/watcher/k8s.go @@ -1,6 +1,7 @@ package watcher import ( + "errors" "fmt" "net" @@ -68,10 +69,20 @@ func (i ID) String() string { // InitializeIndexers is used to initialize indexers on k8s informers, to be used across watchers func InitializeIndexers(k8sAPI *k8s.API) error { err := k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) { - if svc, ok := obj.(*corev1.Service); ok { + svc, ok := obj.(*corev1.Service) + if !ok { + return nil, errors.New("object is not a service") + } + + if len(svc.Spec.ClusterIPs) != 0 { + return svc.Spec.ClusterIPs, nil + } + + if svc.Spec.ClusterIP != "" { return []string{svc.Spec.ClusterIP}, nil } - return nil, fmt.Errorf("object is not a service") + + return nil, nil }}) if err != nil { @@ -87,7 +98,16 @@ func InitializeIndexers(k8sAPI *k8s.API) error { if pod.Spec.HostNetwork { return nil, nil } - return []string{pod.Status.PodIP}, nil + ips := []string{} + for _, pip := range pod.Status.PodIPs { + if pip.IP != "" { + ips = append(ips, pip.IP) + } + } + if len(ips) == 0 && pod.Status.PodIP != "" { + ips = append(ips, pod.Status.PodIP) + } + return ips, nil } return nil, fmt.Errorf("object is not a pod") }}) @@ -97,24 +117,37 @@ func InitializeIndexers(k8sAPI *k8s.API) error { } err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{HostIPIndex: func(obj interface{}) ([]string, error) { - if pod, ok := obj.(*corev1.Pod); ok { - var hostIPPods []string - if pod.Status.HostIP != "" { - // If the pod is reachable from the host network, then for - // each of its containers' ports that exposes a host port, add - // that hostIP:hostPort endpoint to the indexer. - for _, c := range pod.Spec.Containers { - for _, p := range c.Ports { - if p.HostPort != 0 { - addr := net.JoinHostPort(pod.Status.HostIP, fmt.Sprintf("%d", p.HostPort)) - hostIPPods = append(hostIPPods, addr) - } - } + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, errors.New("object is not a pod") + } + + ips := []string{} + for _, hip := range pod.Status.HostIPs { + ips = append(ips, hip.IP) + } + if len(ips) == 0 && pod.Status.HostIP != "" { + ips = append(ips, pod.Status.HostIP) + } + if len(ips) == 0 { + return []string{}, nil + } + + // If the pod is reachable from the host network, then for + // each of its containers' ports that exposes a host port, add + // that hostIP:hostPort endpoint to the indexer. + addrs := []string{} + for _, c := range pod.Spec.Containers { + for _, p := range c.Ports { + if p.HostPort == 0 { + continue + } + for _, ip := range ips { + addrs = append(addrs, net.JoinHostPort(ip, fmt.Sprintf("%d", p.HostPort))) } } - return hostIPPods, nil } - return nil, fmt.Errorf("object is not a pod") + return addrs, nil }}) if err != nil { @@ -124,15 +157,15 @@ func InitializeIndexers(k8sAPI *k8s.API) error { return nil } -func getIndexedPods(k8sAPI *k8s.API, indexName string, podIP string) ([]*corev1.Pod, error) { - objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, podIP) +func getIndexedPods(k8sAPI *k8s.API, indexName string, key string) ([]*corev1.Pod, error) { + objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, key) if err != nil { return nil, fmt.Errorf("failed getting %s indexed pods: %w", indexName, err) } pods := make([]*corev1.Pod, 0) for _, obj := range objs { pod := obj.(*corev1.Pod) - if !podReceivingTraffic(pod) { + if !podNotTerminating(pod) { continue } pods = append(pods, pod) @@ -140,10 +173,9 @@ func getIndexedPods(k8sAPI *k8s.API, indexName string, podIP string) ([]*corev1. return pods, nil } -func podReceivingTraffic(pod *corev1.Pod) bool { +func podNotTerminating(pod *corev1.Pod) bool { phase := pod.Status.Phase podTerminated := phase == corev1.PodSucceeded || phase == corev1.PodFailed podTerminating := pod.DeletionTimestamp != nil - return !podTerminating && !podTerminated } diff --git a/controller/api/destination/watcher/pod_watcher.go b/controller/api/destination/watcher/pod_watcher.go index 89c4dab682ed0..b4099b2920e31 100644 --- a/controller/api/destination/watcher/pod_watcher.go +++ b/controller/api/destination/watcher/pod_watcher.go @@ -95,11 +95,11 @@ func NewPodWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.E // the corresponding ip is found for the given service/hostname, and returned. func (pw *PodWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener PodUpdateListener) (string, error) { if hostname != "" { - pw.log.Debugf("Establishing watch on %s.%s.%s:%d", hostname, service.Name, service.Namespace, port) + pw.log.Debugf("Establishing watch on pod %s.%s.%s:%d", hostname, service.Name, service.Namespace, port) } else if service != nil { - pw.log.Debugf("Establishing watch on %s.%s:%d", service.Name, service.Namespace, port) + pw.log.Debugf("Establishing watch on pod %s.%s:%d", service.Name, service.Namespace, port) } else { - pw.log.Debugf("Establishing watch on %s:%d", ip, port) + pw.log.Debugf("Establishing watch on pod %s:%d", ip, port) } pp, err := pw.getOrNewPodPublisher(service, hostname, ip, port) if err != nil { @@ -189,16 +189,32 @@ func (pw *PodWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) { if remove { submitPod = nil } + for _, container := range pod.Spec.Containers { for _, containerPort := range container.Ports { if containerPort.ContainerPort != 0 { - if pp, ok := pw.getPodPublisher(pod.Status.PodIP, Port(containerPort.ContainerPort)); ok { - pp.updatePod(submitPod) + for _, pip := range pod.Status.PodIPs { + if pp, ok := pw.getPodPublisher(pip.IP, Port(containerPort.ContainerPort)); ok { + pp.updatePod(submitPod) + } + } + if len(pod.Status.PodIPs) == 0 && pod.Status.PodIP != "" { + if pp, ok := pw.getPodPublisher(pod.Status.PodIP, Port(containerPort.ContainerPort)); ok { + pp.updatePod(submitPod) + } } } + if containerPort.HostPort != 0 { - if pp, ok := pw.getPodPublisher(pod.Status.HostIP, Port(containerPort.HostPort)); ok { - pp.updatePod(submitPod) + for _, hip := range pod.Status.HostIPs { + if pp, ok := pw.getPodPublisher(hip.IP, Port(containerPort.HostPort)); ok { + pp.updatePod(submitPod) + } + } + if len(pod.Status.HostIPs) == 0 && pod.Status.HostIP != "" { + if pp, ok := pw.getPodPublisher(pod.Status.HostIP, Port(containerPort.HostPort)); ok { + pp.updatePod(submitPod) + } } } }