-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix ingress dest ports issue #58
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,7 +64,7 @@ func (r *defaultEndpointsResolver) computeIngressEndpoints(ctx context.Context, | |
ingressEndpoints = append(ingressEndpoints, r.getAllowAllNetworkPeers(rule.Ports)...) | ||
continue | ||
} | ||
resolvedPeers, err := r.resolveNetworkPeers(ctx, policy, rule.From, rule.Ports) | ||
resolvedPeers, err := r.resolveNetworkPeers(ctx, policy, rule.From, rule.Ports, networking.PolicyTypeIngress) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "unable to resolve ingress network peers") | ||
} | ||
|
@@ -82,7 +82,7 @@ func (r *defaultEndpointsResolver) computeEgressEndpoints(ctx context.Context, p | |
egressEndpoints = append(egressEndpoints, r.getAllowAllNetworkPeers(rule.Ports)...) | ||
continue | ||
} | ||
resolvedPeers, err := r.resolveNetworkPeers(ctx, policy, rule.To, rule.Ports) | ||
resolvedPeers, err := r.resolveNetworkPeers(ctx, policy, rule.To, rule.Ports, networking.PolicyTypeEgress) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "unable to resolve egress network peers") | ||
} | ||
|
@@ -149,7 +149,7 @@ func (r *defaultEndpointsResolver) getAllowAllNetworkPeers(ports []networking.Ne | |
} | ||
|
||
func (r *defaultEndpointsResolver) resolveNetworkPeers(ctx context.Context, policy *networking.NetworkPolicy, | ||
peers []networking.NetworkPolicyPeer, ports []networking.NetworkPolicyPort) ([]policyinfo.EndpointInfo, error) { | ||
peers []networking.NetworkPolicyPeer, ports []networking.NetworkPolicyPort, policyType networking.PolicyType) ([]policyinfo.EndpointInfo, error) { | ||
var networkPeers []policyinfo.EndpointInfo | ||
for _, peer := range peers { | ||
if peer.IPBlock != nil { | ||
|
@@ -185,14 +185,65 @@ func (r *defaultEndpointsResolver) resolveNetworkPeers(ctx context.Context, poli | |
} else { | ||
namespaces = []string{policy.Namespace} | ||
} | ||
r.logger.V(1).Info("Namespaces for network peers resolution", "list", namespaces, "policy", k8s.NamespacedName(policy)) | ||
|
||
var portsToApply []policyinfo.Port | ||
// populate the policy applied targets' ports | ||
// only populate ports for Ingress and from network policy namespaces as destination ports | ||
if policyType == networking.PolicyTypeIngress { | ||
portsToApply = r.getIngressRulesPorts(ctx, policy.Namespace, &policy.Spec.PodSelector, ports) | ||
} | ||
|
||
for _, ns := range namespaces { | ||
networkPeers = append(networkPeers, r.getMatchingPodAddresses(ctx, peer.PodSelector, ns, ports)...) | ||
networkPeers = append(networkPeers, r.getMatchingPodAddresses(ctx, peer.PodSelector, ns, portsToApply, ports, policyType)...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
} | ||
|
||
} | ||
return networkPeers, nil | ||
} | ||
|
||
func (r *defaultEndpointsResolver) getIngressRulesPorts(ctx context.Context, policyNamespace string, policyPodSelector *metav1.LabelSelector, ports []networking.NetworkPolicyPort) []policyinfo.Port { | ||
podList := &corev1.PodList{} | ||
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ | ||
LabelSelector: r.createPodLabelSelector(policyPodSelector), | ||
Namespace: policyNamespace, | ||
}); err != nil { | ||
r.logger.Info("Unable to List Pods", "err", err) | ||
return nil | ||
} | ||
|
||
r.logger.V(2).Info("list pods for ingress", "podList", *podList, "namespace", policyNamespace, "selector", *policyPodSelector) | ||
var portList []policyinfo.Port | ||
for _, pod := range podList.Items { | ||
portList = append(portList, r.getPortList(pod, ports)...) | ||
r.logger.Info("got ingress port", "port", portList, "pod", pod) | ||
} | ||
|
||
return portList | ||
} | ||
|
||
func (r *defaultEndpointsResolver) getPortList(pod corev1.Pod, ports []networking.NetworkPolicyPort) []policyinfo.Port { | ||
var portList []policyinfo.Port | ||
for _, port := range ports { | ||
var portPtr *int32 | ||
if port.Port != nil { | ||
portVal, _, err := k8s.LookupContainerPortAndName(&pod, *port.Port, *port.Protocol) | ||
if err != nil { | ||
// Isolate the pod for the port if we are unable to resolve the named port | ||
r.logger.Info("Unable to lookup container port", "pod", k8s.NamespacedName(&pod), | ||
"port", *port.Port, "err", err) | ||
continue | ||
} | ||
portPtr = &portVal | ||
} | ||
portList = append(portList, policyinfo.Port{ | ||
Protocol: port.Protocol, | ||
Port: portPtr, | ||
EndPort: port.EndPort, | ||
}) | ||
} | ||
return portList | ||
} | ||
|
||
func (r *defaultEndpointsResolver) resolveServiceClusterIPs(ctx context.Context, peers []networking.NetworkPolicyPeer, policyNamespace string, | ||
ports []networking.NetworkPolicyPort) ([]policyinfo.EndpointInfo, error) { | ||
var networkPeers []policyinfo.EndpointInfo | ||
|
@@ -257,64 +308,56 @@ func (r *defaultEndpointsResolver) resolveNamespaces(ctx context.Context, ls *me | |
} | ||
|
||
func (r *defaultEndpointsResolver) getMatchingPodAddresses(ctx context.Context, ls *metav1.LabelSelector, namespace string, | ||
ports []networking.NetworkPolicyPort) []policyinfo.EndpointInfo { | ||
policyPorts []policyinfo.Port, ports []networking.NetworkPolicyPort, rule networking.PolicyType) []policyinfo.EndpointInfo { | ||
var addresses []policyinfo.EndpointInfo | ||
var podSelector labels.Selector | ||
if ls == nil { | ||
podSelector = labels.Everything() | ||
} else { | ||
var err error | ||
if podSelector, err = metav1.LabelSelectorAsSelector(ls); err != nil { | ||
r.logger.Info("Unable to get pod selector", "err", err) | ||
return nil | ||
} | ||
} | ||
|
||
podList := &corev1.PodList{} | ||
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ | ||
LabelSelector: podSelector, | ||
LabelSelector: r.createPodLabelSelector(ls), | ||
Namespace: namespace, | ||
}); err != nil { | ||
r.logger.Info("Unable to List Pods", "err", err) | ||
return nil | ||
} | ||
r.logger.V(1).Info("Got pods for label selector", "count", len(podList.Items), "selector", podSelector.String()) | ||
r.logger.V(1).Info("Got pods for label selector", "count", len(podList.Items), "selector", ls.String()) | ||
for _, pod := range podList.Items { | ||
podIP := k8s.GetPodIP(&pod) | ||
if len(podIP) == 0 { | ||
r.logger.V(1).Info("pod IP not assigned yet", "pod", k8s.NamespacedName(&pod)) | ||
continue | ||
} | ||
var portList []policyinfo.Port | ||
for _, port := range ports { | ||
var portPtr *int32 | ||
if port.Port != nil { | ||
portVal, _, err := k8s.LookupContainerPortAndName(&pod, *port.Port, *port.Protocol) | ||
if err != nil { | ||
// Isolate the pod for the port if we are unable to resolve the named port | ||
r.logger.Info("Unable to lookup container port", "pod", k8s.NamespacedName(&pod), | ||
"port", *port.Port, "err", err) | ||
continue | ||
} | ||
portPtr = &portVal | ||
} | ||
portList = append(portList, policyinfo.Port{ | ||
Protocol: port.Protocol, | ||
Port: portPtr, | ||
EndPort: port.EndPort, | ||
}) | ||
} | ||
portList := r.getPortList(pod, ports) | ||
if len(ports) != len(portList) && len(portList) == 0 { | ||
continue | ||
} | ||
addresses = append(addresses, policyinfo.EndpointInfo{ | ||
CIDR: policyinfo.NetworkAddress(podIP), | ||
Ports: portList, | ||
CIDR: policyinfo.NetworkAddress(podIP), | ||
Ports: func(policyType networking.PolicyType) []policyinfo.Port { | ||
if policyType == networking.PolicyTypeIngress { | ||
return policyPorts | ||
} | ||
return portList | ||
}(rule), | ||
}) | ||
} | ||
|
||
return addresses | ||
} | ||
|
||
func (r *defaultEndpointsResolver) createPodLabelSelector(ls *metav1.LabelSelector) labels.Selector { | ||
var podSelector labels.Selector | ||
if ls == nil { | ||
podSelector = labels.Everything() | ||
} else { | ||
var err error | ||
if podSelector, err = metav1.LabelSelectorAsSelector(ls); err != nil { | ||
r.logger.Info("Unable to get pod selector", "err", err) | ||
return nil | ||
} | ||
} | ||
return podSelector | ||
} | ||
|
||
// getMatchingServiceClusterIPs returns the clusterIPs of the services with service.spec.Selector matching the pod selector | ||
// in the egress rules. This serves as a workaround for the network agent in case of the policy enforcement for egress traffic | ||
// from the pod to the cluster IPs. The network agent limitation arises since it attaches the ebpf probes to the TC hook of the | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Please add few comments as to why this is done only for ingress...