Skip to content

Commit

Permalink
destination: Index pods and service with multiple IPs (#11515)
Browse files Browse the repository at this point in the history
The destination controller indexes Pods by their primary PodIP and their
primary HostIP (when applicable); and it indexes Services by their
primary ClusterIP.

In preparation for dual-stack (IPv6) support, this change updates the
destination controller indexers to consume all IPs for these resources.
Tests are updated to demonstrate a dual-stack setup (where these
resources have a primary IPv4 address and a secondary IPv6 address).

While exercising these tests, I had to replace the brittle host:port
parsing logic we use in the server, in favor of Go's standard
`net.SplitHostPort` utility.
  • Loading branch information
olix0r authored Oct 23, 2023
1 parent f6db5ab commit 8cd165a
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 53 deletions.
26 changes: 14 additions & 12 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,18 +563,20 @@ func profileID(authority string, ctxToken contextToken, clusterDomain string) (w
}

func getHostAndPort(authority string) (string, watcher.Port, error) {
hostPort := strings.Split(authority, ":")
if len(hostPort) > 2 {
return "", 0, fmt.Errorf("invalid destination %s", authority)
}
host := hostPort[0]
port := 80
if len(hostPort) == 2 {
var err error
port, err = strconv.Atoi(hostPort[1])
if err != nil || port <= 0 || port > 65535 {
return "", 0, fmt.Errorf("invalid port %s", hostPort[1])
}
if !strings.Contains(authority, ":") {
return authority, watcher.Port(80), nil
}

host, sport, err := net.SplitHostPort(authority)
if err != nil {
return "", 0, fmt.Errorf("invalid destination: %w", err)
}
port, err := strconv.Atoi(sport)
if err != nil {
return "", 0, fmt.Errorf("invalid port %s: %w", sport, err)
}
if port <= 0 || port > 65535 {
return "", 0, fmt.Errorf("invalid port %d", port)
}
return host, watcher.Port(port), nil
}
Expand Down
104 changes: 93 additions & 11 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package destination
import (
"context"
"fmt"
gonet "net"
"testing"
"time"

Expand All @@ -27,14 +28,17 @@ const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPv6 = "2001:db8::88"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
const podIP1v6 = "2001:db8::68"
const podIP2 = "172.17.0.13"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
const podIPPolicy = "172.17.0.16"
const podIPStatefulSet = "172.17.13.15"
const externalIP = "192.168.1.20"
const externalIPv6 = "2001:db8::78"
const port uint32 = 8989
const opaquePort uint32 = 4242
const skippedPort uint32 = 24224
Expand Down Expand Up @@ -259,6 +263,25 @@ func TestGetProfiles(t *testing.T) {
}
})

t.Run("Return profile when using secondary cluster IP", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()

stream := profileStream(t, server, clusterIPv6, port, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
}
if profile.OpaqueProtocol {
t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
}
routes := profile.GetRoutes()
if len(routes) != 1 {
t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
}
})

t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
Expand Down Expand Up @@ -341,6 +364,47 @@ func TestGetProfiles(t *testing.T) {
}
})

t.Run("Return profile with endpoint when using pod secondary IP", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()

stream := profileStream(t, server, podIP1v6, port, "ns:ns")
defer stream.Cancel()

epAddr, err := toAddress(podIP1v6, port)
if err != nil {
t.Fatalf("Got error: %s", err)
}

// An explanation for why we expect 1 to 3 updates is in test cases
// above
updates := stream.Updates()
if len(updates) == 0 || len(updates) > 3 {
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
}

first := updates[0]
if first.Endpoint == nil {
t.Fatalf("Expected response to have endpoint field")
}
if first.OpaqueProtocol {
t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
}
_, exists := first.Endpoint.MetricLabels["namespace"]
if !exists {
t.Fatalf("Expected 'namespace' metric label to exist but it did not")
}
if first.GetEndpoint().GetProtocolHint() == nil {
t.Fatalf("Expected protocol hint but found none")
}
if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
}
if first.Endpoint.Addr.String() != epAddr.String() {
t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
}
})

t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
Expand Down Expand Up @@ -591,9 +655,10 @@ func TestGetProfiles(t *testing.T) {
Status: corev1.ConditionTrue,
},
},
HostIP: externalIP,
PodIP: "172.17.0.55",
PodIPs: []corev1.PodIP{{IP: "172.17.0.55"}},
HostIP: externalIP,
HostIPs: []corev1.HostIP{{IP: externalIP}, {IP: externalIPv6}},
PodIP: "172.17.0.55",
PodIPs: []corev1.PodIP{{IP: "172.17.0.55"}},
},
}, metav1.CreateOptions{})
if err != nil {
Expand Down Expand Up @@ -725,19 +790,22 @@ func toAddress(path string, port uint32) (*net.TcpAddress, error) {
func TestIpWatcherGetSvcID(t *testing.T) {
name := "service"
namespace := "test"
clusterIP := "10.256.0.1"
var port uint32 = 1234
k8sConfigs := fmt.Sprintf(`
clusterIP := "10.245.0.1"
clusterIPv6 := "2001:db8::68"
k8sConfigs := `
apiVersion: v1
kind: Service
metadata:
name: %s
namespace: %s
name: service
namespace: test
spec:
type: ClusterIP
clusterIP: %s
clusterIP: 10.245.0.1
clusterIPs:
- 10.245.0.1
- 2001:db8::68
ports:
- port: %d`, name, namespace, clusterIP, port)
- port: 1234`

t.Run("get services IDs by IP address", func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(k8sConfigs)
Expand Down Expand Up @@ -766,6 +834,20 @@ spec:
t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace)
}

svc6, err := getSvcID(k8sAPI, clusterIPv6, logging.WithFields(nil))
if err != nil {
t.Fatalf("Error getting service: %s", err)
}
if svc6 == nil {
t.Fatalf("Expected to find service mapped to [%s]", clusterIPv6)
}
if svc.Name != name {
t.Fatalf("Expected service name to be [%s], but got [%s]", name, svc.Name)
}
if svc.Namespace != namespace {
t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace)
}

badClusterIP := "10.256.0.2"
svc, err = getSvcID(k8sAPI, badClusterIP, logging.WithFields(nil))
if err != nil {
Expand Down Expand Up @@ -800,7 +882,7 @@ func profileStream(t *testing.T, server *server, host string, port uint32, token
go func() {
err := server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: fmt.Sprintf("%s:%d", host, port),
Path: gonet.JoinHostPort(host, fmt.Sprintf("%d", port)),
ContextToken: token,
}, stream)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ metadata:
spec:
type: LoadBalancer
clusterIP: 172.17.12.0
clusterIPs:
- 172.17.12.0
- 2001:db8::88
ports:
- port: 8989`,
`
Expand Down Expand Up @@ -62,6 +65,7 @@ status:
podIP: 172.17.0.12
podIPs:
- ip: 172.17.0.12
- ip: 2001:db8::68
spec:
containers:
- env:
Expand Down
78 changes: 55 additions & 23 deletions controller/api/destination/watcher/k8s.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watcher

import (
"errors"
"fmt"
"net"

Expand Down Expand Up @@ -68,10 +69,20 @@ func (i ID) String() string {
// InitializeIndexers is used to initialize indexers on k8s informers, to be used across watchers
func InitializeIndexers(k8sAPI *k8s.API) error {
err := k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
if svc, ok := obj.(*corev1.Service); ok {
svc, ok := obj.(*corev1.Service)
if !ok {
return nil, errors.New("object is not a service")
}

if len(svc.Spec.ClusterIPs) != 0 {
return svc.Spec.ClusterIPs, nil
}

if svc.Spec.ClusterIP != "" {
return []string{svc.Spec.ClusterIP}, nil
}
return nil, fmt.Errorf("object is not a service")

return nil, nil
}})

if err != nil {
Expand All @@ -87,7 +98,16 @@ func InitializeIndexers(k8sAPI *k8s.API) error {
if pod.Spec.HostNetwork {
return nil, nil
}
return []string{pod.Status.PodIP}, nil
ips := []string{}
for _, pip := range pod.Status.PodIPs {
if pip.IP != "" {
ips = append(ips, pip.IP)
}
}
if len(ips) == 0 && pod.Status.PodIP != "" {
ips = append(ips, pod.Status.PodIP)
}
return ips, nil
}
return nil, fmt.Errorf("object is not a pod")
}})
Expand All @@ -97,24 +117,37 @@ func InitializeIndexers(k8sAPI *k8s.API) error {
}

err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{HostIPIndex: func(obj interface{}) ([]string, error) {
if pod, ok := obj.(*corev1.Pod); ok {
var hostIPPods []string
if pod.Status.HostIP != "" {
// If the pod is reachable from the host network, then for
// each of its containers' ports that exposes a host port, add
// that hostIP:hostPort endpoint to the indexer.
for _, c := range pod.Spec.Containers {
for _, p := range c.Ports {
if p.HostPort != 0 {
addr := net.JoinHostPort(pod.Status.HostIP, fmt.Sprintf("%d", p.HostPort))
hostIPPods = append(hostIPPods, addr)
}
}
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil, errors.New("object is not a pod")
}

ips := []string{}
for _, hip := range pod.Status.HostIPs {
ips = append(ips, hip.IP)
}
if len(ips) == 0 && pod.Status.HostIP != "" {
ips = append(ips, pod.Status.HostIP)
}
if len(ips) == 0 {
return []string{}, nil
}

// If the pod is reachable from the host network, then for
// each of its containers' ports that exposes a host port, add
// that hostIP:hostPort endpoint to the indexer.
addrs := []string{}
for _, c := range pod.Spec.Containers {
for _, p := range c.Ports {
if p.HostPort == 0 {
continue
}
for _, ip := range ips {
addrs = append(addrs, net.JoinHostPort(ip, fmt.Sprintf("%d", p.HostPort)))
}
}
return hostIPPods, nil
}
return nil, fmt.Errorf("object is not a pod")
return addrs, nil
}})

if err != nil {
Expand All @@ -124,26 +157,25 @@ func InitializeIndexers(k8sAPI *k8s.API) error {
return nil
}

func getIndexedPods(k8sAPI *k8s.API, indexName string, podIP string) ([]*corev1.Pod, error) {
objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, podIP)
func getIndexedPods(k8sAPI *k8s.API, indexName string, key string) ([]*corev1.Pod, error) {
objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, key)
if err != nil {
return nil, fmt.Errorf("failed getting %s indexed pods: %w", indexName, err)
}
pods := make([]*corev1.Pod, 0)
for _, obj := range objs {
pod := obj.(*corev1.Pod)
if !podReceivingTraffic(pod) {
if !podNotTerminating(pod) {
continue
}
pods = append(pods, pod)
}
return pods, nil
}

func podReceivingTraffic(pod *corev1.Pod) bool {
func podNotTerminating(pod *corev1.Pod) bool {
phase := pod.Status.Phase
podTerminated := phase == corev1.PodSucceeded || phase == corev1.PodFailed
podTerminating := pod.DeletionTimestamp != nil

return !podTerminating && !podTerminated
}
Loading

0 comments on commit 8cd165a

Please sign in to comment.