diff --git a/api/v1alpha1/policyendpoint_types.go b/api/v1alpha1/policyendpoint_types.go index 4c82857..25075a6 100644 --- a/api/v1alpha1/policyendpoint_types.go +++ b/api/v1alpha1/policyendpoint_types.go @@ -17,9 +17,13 @@ limitations under the License. package v1alpha1 import ( + "github.com/awslabs/operatorpkg/status" corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" ) // PolicyReference is the reference to the network policy resource @@ -105,7 +109,167 @@ type PolicyEndpointStatus struct { // Important: Run "make" to regenerate code after modifying this file // +optional - Conditions []PolicyEndpointCondition `json:"conditions,omitempty"` + Conditions []status.Condition `json:"conditions,omitempty"` +} + +// DeepCopyObject implements status.Object. +func (*PolicyEndpointStatus) DeepCopyObject() runtime.Object { + panic("unimplemented") +} + +// GetAnnotations implements status.Object. +func (*PolicyEndpointStatus) GetAnnotations() map[string]string { + panic("unimplemented") +} + +// GetCreationTimestamp implements status.Object. +func (*PolicyEndpointStatus) GetCreationTimestamp() metav1.Time { + panic("unimplemented") +} + +// GetDeletionGracePeriodSeconds implements status.Object. +func (*PolicyEndpointStatus) GetDeletionGracePeriodSeconds() *int64 { + panic("unimplemented") +} + +// GetDeletionTimestamp implements status.Object. +func (*PolicyEndpointStatus) GetDeletionTimestamp() *metav1.Time { + panic("unimplemented") +} + +// GetFinalizers implements status.Object. +func (*PolicyEndpointStatus) GetFinalizers() []string { + panic("unimplemented") +} + +// GetGenerateName implements status.Object. +func (*PolicyEndpointStatus) GetGenerateName() string { + panic("unimplemented") +} + +// GetGeneration implements status.Object. 
+func (*PolicyEndpointStatus) GetGeneration() int64 { + panic("unimplemented") +} + +// GetLabels implements status.Object. +func (*PolicyEndpointStatus) GetLabels() map[string]string { + panic("unimplemented") +} + +// GetManagedFields implements status.Object. +func (*PolicyEndpointStatus) GetManagedFields() []metav1.ManagedFieldsEntry { + panic("unimplemented") +} + +// GetName implements status.Object. +func (*PolicyEndpointStatus) GetName() string { + panic("unimplemented") +} + +// GetNamespace implements status.Object. +func (*PolicyEndpointStatus) GetNamespace() string { + panic("unimplemented") +} + +// GetObjectKind implements status.Object. +func (*PolicyEndpointStatus) GetObjectKind() schema.ObjectKind { + panic("unimplemented") +} + +// GetOwnerReferences implements status.Object. +func (*PolicyEndpointStatus) GetOwnerReferences() []metav1.OwnerReference { + panic("unimplemented") +} + +// GetResourceVersion implements status.Object. +func (*PolicyEndpointStatus) GetResourceVersion() string { + panic("unimplemented") +} + +// GetSelfLink implements status.Object. +func (*PolicyEndpointStatus) GetSelfLink() string { + panic("unimplemented") +} + +// GetUID implements status.Object. +func (*PolicyEndpointStatus) GetUID() types.UID { + panic("unimplemented") +} + +// SetAnnotations implements status.Object. +func (*PolicyEndpointStatus) SetAnnotations(annotations map[string]string) { + panic("unimplemented") +} + +// SetCreationTimestamp implements status.Object. +func (*PolicyEndpointStatus) SetCreationTimestamp(timestamp metav1.Time) { + panic("unimplemented") +} + +// SetDeletionGracePeriodSeconds implements status.Object. +func (*PolicyEndpointStatus) SetDeletionGracePeriodSeconds(*int64) { + panic("unimplemented") +} + +// SetDeletionTimestamp implements status.Object. +func (*PolicyEndpointStatus) SetDeletionTimestamp(timestamp *metav1.Time) { + panic("unimplemented") +} + +// SetFinalizers implements status.Object. 
+func (*PolicyEndpointStatus) SetFinalizers(finalizers []string) { + panic("unimplemented") +} + +// SetGenerateName implements status.Object. +func (*PolicyEndpointStatus) SetGenerateName(name string) { + panic("unimplemented") +} + +// SetGeneration implements status.Object. +func (*PolicyEndpointStatus) SetGeneration(generation int64) { + panic("unimplemented") +} + +// SetLabels implements status.Object. +func (*PolicyEndpointStatus) SetLabels(labels map[string]string) { + panic("unimplemented") +} + +// SetManagedFields implements status.Object. +func (*PolicyEndpointStatus) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) { + panic("unimplemented") +} + +// SetName implements status.Object. +func (*PolicyEndpointStatus) SetName(name string) { + panic("unimplemented") +} + +// SetNamespace implements status.Object. +func (*PolicyEndpointStatus) SetNamespace(namespace string) { + panic("unimplemented") +} + +// SetOwnerReferences implements status.Object. +func (*PolicyEndpointStatus) SetOwnerReferences([]metav1.OwnerReference) { + panic("unimplemented") +} + +// SetResourceVersion implements status.Object. +func (*PolicyEndpointStatus) SetResourceVersion(version string) { + panic("unimplemented") +} + +// SetSelfLink implements status.Object. +func (*PolicyEndpointStatus) SetSelfLink(selfLink string) { + panic("unimplemented") +} + +// SetUID implements status.Object. +func (*PolicyEndpointStatus) SetUID(uid types.UID) { + panic("unimplemented") } type PolicyEndpointConditionType string @@ -115,26 +279,6 @@ const ( Updated PolicyEndpointConditionType = "PatchedPolicyEndpoint" ) -// PolicyEndpointCondition describes the state of a PolicyEndpoint at a certain point. -// For example, binpacking PE slices should be updated as a condition change -type PolicyEndpointCondition struct { - // Type of PolicyEndpoint condition. - // +optional - Type PolicyEndpointConditionType `json:"type"` - // Status of the condition, one of True, False, Unknown. 
- // +optional - Status corev1.ConditionStatus `json:"status"` - // Last time the condition transitioned from one status to another. - // +optional - LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` - // The reason for the condition's last transition. - // +optional - Reason string `json:"reason,omitempty"` - // A human readable message indicating details about the transition. - // +optional - Message string `json:"message,omitempty"` -} - //+kubebuilder:object:root=true //+kubebuilder:subresource:status @@ -159,3 +303,15 @@ type PolicyEndpointList struct { func init() { SchemeBuilder.Register(&PolicyEndpoint{}, &PolicyEndpointList{}) } + +func (s *PolicyEndpoint) GetConditions() []status.Condition { + return []status.Condition(s.Status.Conditions) +} + +func (s *PolicyEndpoint) SetConditions(conds []status.Condition) { + s.Status.Conditions = conds +} + +func (s *PolicyEndpoint) StatusConditions() status.ConditionSet { + return status.NewReadyConditions().For(s) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index d7d3c3a..f3f9b8a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -22,10 +22,11 @@ limitations under the License. package v1alpha1 import ( - v1 "k8s.io/api/core/v1" + "github.com/awslabs/operatorpkg/status" + "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
@@ -76,7 +77,7 @@ func (in *PolicyEndpoint) DeepCopyInto(out *PolicyEndpoint) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PolicyEndpoint. @@ -177,6 +178,13 @@ func (in *PolicyEndpointSpec) DeepCopy() *PolicyEndpointSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PolicyEndpointStatus) DeepCopyInto(out *PolicyEndpointStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]status.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PolicyEndpointStatus. diff --git a/cmd/main.go b/cmd/main.go index 95577b3..6b61ece 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -116,7 +116,7 @@ func main() { os.Exit(1) } - policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(), + policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(ctx, mgr.GetClient(), controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager")) finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager")) policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager, diff --git a/config/crd/bases/networking.k8s.aws_policyendpoints.yaml b/config/crd/bases/networking.k8s.aws_policyendpoints.yaml index af4025a..86257c5 100644 --- a/config/crd/bases/networking.k8s.aws_policyendpoints.yaml +++ b/config/crd/bases/networking.k8s.aws_policyendpoints.yaml @@ -232,28 +232,63 @@ spec: properties: conditions: items: - description: PolicyEndpointCondition describes the state of a PolicyEndpoint - at a certain point. 
For example, binpacking PE slices should be - updated as a condition change + description: Condition aliases the upstream type and adds additional + helper methods properties: lastTransitionTime: - description: Last time the condition transitioned from one status - to another. + description: lastTransitionTime is the last time the condition + transitioned from one status to another. This should be when + the underlying condition changed. If that is not known, then + using the time when the API field changed is acceptable. format: date-time type: string message: - description: A human readable message indicating details about - the transition. + description: message is a human readable message indicating + details about the transition. This may be an empty string. + maxLength: 32768 type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. For instance, if .metadata.generation + is currently 12, but the .status.conditions[x].observedGeneration + is 9, the condition is out of date with respect to the current + state of the instance. + format: int64 + minimum: 0 + type: integer reason: - description: The reason for the condition's last transition. + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. Producers + of specific condition types may define expected values and + meanings for this field, and whether the values are considered + a guaranteed API. The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ type: string status: - description: Status of the condition, one of True, False, Unknown. + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown type: string type: - description: Type of PolicyEndpoint condition. 
+ description: type of condition in CamelCase or in foo.example.com/CamelCase. + --- Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string + required: + - lastTransitionTime + - message + - reason + - status + - type type: object type: array type: object diff --git a/go.mod b/go.mod index 92e6059..402973d 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( ) require ( + github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592 github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index c2db611..059c2e4 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592 h1:LSaLHzJ4IMZZLgVIx/2YIcvUCIAaE5OqLhjWzdwF060= +github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592/go.mod h1:kqgbtyanB/ObfvsSUdGZOk1f3K807kvoibKoKX0wMK4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/pkg/policyendpoints/manager.go b/pkg/policyendpoints/manager.go index eee5b65..fc316fb 100644 --- a/pkg/policyendpoints/manager.go +++ b/pkg/policyendpoints/manager.go @@ -12,7 +12,6 @@ import ( "github.com/go-logr/logr" "github.com/pkg/errors" "github.com/samber/lo" - corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,9 +32,10 @@ 
type PolicyEndpointsManager interface { } // NewPolicyEndpointsManager constructs a new policyEndpointsManager -func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager { +func NewPolicyEndpointsManager(ctx context.Context, k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager { endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver")) return &policyEndpointsManager{ + ctx: ctx, k8sClient: k8sClient, endpointsResolver: endpointsResolver, endpointChunkSize: endpointChunkSize, @@ -59,6 +59,7 @@ const ( var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil) type policyEndpointsManager struct { + ctx context.Context k8sClient client.Client endpointsResolver resolvers.EndpointsResolver endpointChunkSize int @@ -88,6 +89,12 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki if err := m.k8sClient.Create(ctx, &policyEndpoint); err != nil { return err } + // initialize the PE's conditions + conditions.CreatePEInitCondition(m.ctx, + m.k8sClient, + types.NamespacedName{Name: policyEndpoint.Name, Namespace: policyEndpoint.Namespace}, + m.logger, + ) m.logger.Info("Created policy endpoint", "id", k8s.NamespacedName(&policyEndpoint)) } @@ -107,11 +114,13 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki peId, m.logger, policyinfo.Updated, - corev1.ConditionFalse, + metav1.ConditionFalse, reasonPatching, fmt.Sprintf("patching policy endpoint failed: %s", err.Error()), + // keep condition history for error states + true, ); cErr != nil { - m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId) + m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId, "RV", policyEndpoint.ResourceVersion) } return err } @@ -122,9 +131,11 @@ func (m *policyEndpointsManager) 
Reconcile(ctx context.Context, policy *networki peId, m.logger, policyinfo.Packed, - corev1.ConditionTrue, + metav1.ConditionTrue, reasonBinPacking, - fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1), + fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t with RV %s", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1, policyEndpoint.ResourceVersion), + // don't keep packing states history. if required, this can be changed to true later. + false, ); err != nil { m.logger.Error(err, "Adding bingpacking condition updates to PE failed", "PENamespacedName", peId) } diff --git a/pkg/utils/conditions/conditions.go b/pkg/utils/conditions/conditions.go index f71399f..7add47b 100644 --- a/pkg/utils/conditions/conditions.go +++ b/pkg/utils/conditions/conditions.go @@ -2,34 +2,89 @@ package conditions import ( "context" + "time" policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1" + "github.com/awslabs/operatorpkg/status" "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + jitterWaitTime = time.Millisecond * 100 +) + +func CreatePEInitCondition(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger) { + // using a goroutine to add the condition with jitter wait. 
+ go func() { + // the init condition is added immediately after the PE is created, + // so wait for a short time before making the call + time.Sleep(wait.Jitter(jitterWaitTime, 0.25)) + err := retry.OnError( + wait.Backoff{ + Duration: time.Millisecond * 100, + Factor: 3.0, + Jitter: 0.1, + Steps: 5, + Cap: time.Second * 10, + }, + func(err error) bool { return errors.IsNotFound(err) }, + func() error { + pe := &policyinfo.PolicyEndpoint{} + var err error + if err = k8sClient.Get(ctx, key, pe); err != nil { + log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } else { + copy := pe.DeepCopy() + copy.StatusConditions() + if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil { + log.Error(err, "creating PE init status failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } + } + return err + }, + ) + if err != nil { + log.Error(err, "adding PE init condition failed after retries", "PENamespacedName", key) + } else { + log.Info("added PE init condition", "PENamespacedName", key) + } + }() +} + func UpdatePEConditions(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger, cType policyinfo.PolicyEndpointConditionType, - cStatus corev1.ConditionStatus, + cStatus metav1.ConditionStatus, cReason string, - cMsg string) error { + cMsg string, + keepConditions bool) error { pe := &policyinfo.PolicyEndpoint{} var err error if err = k8sClient.Get(ctx, key, pe); err != nil { log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace) } else { copy := pe.DeepCopy() - cond := policyinfo.PolicyEndpointCondition{ - Type: cType, + cond := status.Condition{ + Type: string(cType), Status: cStatus, LastTransitionTime: metav1.Now(), Reason: cReason, Message: cMsg, } - copy.Status.Conditions = append(copy.Status.Conditions, cond) + if keepConditions { + // do not overwrite old conditions that have the same type + conds := 
copy.GetConditions() + conds = append(conds, cond) + copy.SetConditions(conds) + } else { + // overwrite old conditions that have the same type + copy.StatusConditions().Set(cond) + } log.Info("the controller added condition to PE", "PEName", copy.Name, "PENamespace", copy.Namespace, "Conditions", copy.Status.Conditions) if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil { log.Error(err, "updating PE status failed", "PEName", pe.Name, "PENamespace", pe.Namespace)