From 40125890f35e731f1e78fb6dc979ea31fc4510f9 Mon Sep 17 00:00:00 2001 From: Abhradeep Chakraborty Date: Tue, 3 Dec 2024 13:35:55 +0530 Subject: [PATCH] fix(reconciler): recreate missing resources (#222) * fix(reconciler): recreate missing resources Signed-off-by: Abhradeep Chakraborty * fix Signed-off-by: Abhradeep Chakraborty * trigger controller Signed-off-by: Abhradeep Chakraborty * fix Signed-off-by: Abhradeep Chakraborty * debug Signed-off-by: Abhradeep Chakraborty * fix Signed-off-by: Abhradeep Chakraborty * test Signed-off-by: Abhradeep Chakraborty * remove pvc change Signed-off-by: Abhradeep Chakraborty --------- Signed-off-by: Abhradeep Chakraborty --- e2e/dragonfly_controller_test.go | 30 +++- go.mod | 1 + go.sum | 2 + internal/controller/dragonfly_controller.go | 144 ++++++++++++++------ 4 files changed, 134 insertions(+), 43 deletions(-) diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go index 239775b..2fc03ed 100644 --- a/e2e/dragonfly_controller_test.go +++ b/e2e/dragonfly_controller_test.go @@ -418,11 +418,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() err = k8sClient.Update(ctx, &df) Expect(err).To(BeNil()) + GinkgoLogr.Info("start timestamp", "timestamp", time.Now().UTC()) // Wait until Dragonfly object is marked ready err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute) Expect(err).To(BeNil()) err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute) Expect(err).To(BeNil()) + GinkgoLogr.Info("end timestamp", "timestamp", time.Now().UTC()) // Check for service and statefulset var ss appsv1.StatefulSet @@ -478,10 +480,19 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() // check for affinity Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)) } + // Update df to the latest + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: name, + Namespace: namespace, + }, &df) + Expect(err).To(BeNil()) + GinkgoLogr.Info("df arg propagate phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate) + }) It("Check for data", func() { stopChan := make(chan struct{}, 1) + defer close(stopChan) rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password, 6395) Expect(err).To(BeNil()) @@ -489,7 +500,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() data, err := rc.Get(ctx, "foo").Result() Expect(err).To(BeNil()) Expect(data).To(Equal("bar")) - defer close(stopChan) }) It("Change Service specification to LoadBalancer", func() { @@ -512,11 +522,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() Labels: newLabels, } + GinkgoLogr.Info("df phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate) err = k8sClient.Update(ctx, &df) Expect(err).To(BeNil()) // Wait until Dragonfly object is marked ready - err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute) + err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute) Expect(err).To(BeNil()) err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute) Expect(err).To(BeNil()) @@ -533,6 +544,19 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() Expect(svc.Labels).To(Equal(newLabels)) }) + It("Should recreate missing statefulset", func() { + var ss appsv1.StatefulSet + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: name, + Namespace: namespace, + }, &ss) + Expect(err).To(BeNil()) + + Expect(k8sClient.Delete(ctx, &ss)).To(BeNil()) + err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) + Expect(err).To(BeNil()) + }) + It("Cleanup", func() { var df resourcesv1.Dragonfly err := k8sClient.Get(ctx, types.NamespacedName{ @@ -919,6 +943,8 @@ func isDragonflyInphase(ctx context.Context, c client.Client, name, namespace, p return false, nil } + GinkgoLogr.Info("dragonfly phase", "phase", df.Status.Phase, "update", df.Status.IsRollingUpdate) + // Ready means we also want rolling update to be false if phase == controller.PhaseReady { // check for replicas diff --git a/go.mod b/go.mod index 71793d7..e2b04ab 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/onsi/gomega v1.33.1 github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.5.3 + github.com/samber/lo v1.47.0 k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 k8s.io/client-go v0.30.2 diff --git a/go.sum b/go.sum index 77c6f00..e602756 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRci github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/internal/controller/dragonfly_controller.go b/internal/controller/dragonfly_controller.go index 752ea76..9ec7ad4 100644 --- a/internal/controller/dragonfly_controller.go +++ b/internal/controller/dragonfly_controller.go @@ -25,6 +25,7 @@ import ( "github.com/dragonflydb/dragonfly-operator/internal/resources" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -91,7 +92,49 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Created resources") return ctrl.Result{}, nil - } else if df.Status.IsRollingUpdate { + } + + // Ensure all resources exist before moving forward. + missingResources, err := r.getMissingResources(ctx, &df) + if err != nil { + log.Error(err, "could not get resources") + return ctrl.Result{}, err + } + for _, resource := range missingResources { + // recreate missing resources + if err := r.Create(ctx, resource); err != nil { + log.Error(err, fmt.Sprintf("could not create resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName())) + return ctrl.Result{}, err + } + } + + var statefulSet appsv1.StatefulSet + if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil { + log.Error(err, "could not get statefulset") + return ctrl.Result{}, err + } + + // Update all resources even if the df is in rollout state to ensure + // that newer updates don't get blocked by failed update attempts. + log.Info("updating existing resources") + newResources, err := resources.GetDragonflyResources(ctx, &df) + if err != nil { + log.Error(err, "could not get resources") + return ctrl.Result{}, err + } + + // update all resources + for _, resource := range newResources { + if err := r.Update(ctx, resource); err != nil { + log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName())) + return ctrl.Result{}, err + } + } + + log.Info("Updated resources for object") + r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources") + + if df.Status.IsRollingUpdate { // This is a Rollout log.Info("Rolling out new version") var updatedStatefulset appsv1.StatefulSet @@ -127,6 +170,14 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } else { log.Info("found pod without label", "pod", pod.Name) + if isFailedToStart(&pod) { + // This is a new pod which is trying to be ready, but couldn't start due to misconfig. + // Delete the pod and create a new one. + if err := r.Delete(ctx, &pod); err != nil { + log.Error(err, "could not delete pod") + return ctrl.Result{RequeueAfter: 5 * time.Second}, err + } + } // retry after they are ready return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } @@ -234,54 +285,65 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } return ctrl.Result{}, nil - } else { + } else if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { // perform a rollout only if the pod spec has changed - var statefulSet appsv1.StatefulSet - if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil { - log.Error(err, "could not get statefulset") - return ctrl.Result{}, err - } - // Check if the pod spec has changed log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas) - if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas { - log.Info("Pod spec has changed, performing a rollout") - r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout") - - // Start rollout and update status - // update status so that we can track progress - df.Status.IsRollingUpdate = true - if err := r.Status().Update(ctx, &df); err != nil { - log.Error(err, "could not update the Dragonfly object") - return ctrl.Result{Requeue: true}, err - } - - r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout") + log.Info("Pod spec has changed, performing a rollout") + r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout") - // requeue so that the rollout is processed - return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + // Start rollout and update status + // update status so that we can track progress + df.Status.IsRollingUpdate = true + if err := r.Status().Update(ctx, &df); err != nil { + log.Error(err, "could not update the Dragonfly object") + return ctrl.Result{Requeue: true}, err } - // Is this a Dragonfly object update? - log.Info("updating existing resources") - newResources, err := resources.GetDragonflyResources(ctx, &df) - if err != nil { - log.Error(err, "could not get resources") - return ctrl.Result{}, err - } + r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout") + } + return ctrl.Result{Requeue: true}, nil +} - // update all resources - for _, resource := range newResources { - if err := r.Update(ctx, resource); err != nil { - log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName())) - return ctrl.Result{}, err - } +func isFailedToStart(pod *corev1.Pod) bool { + for _, containerStatus := range pod.Status.ContainerStatuses { + if (containerStatus.State.Waiting != nil && isFailureReason(containerStatus.State.Waiting.Reason)) || + (containerStatus.State.Terminated != nil && isFailureReason(containerStatus.State.Terminated.Reason)) { + return true } + } + return false +} + +// isFailureReason checks if the given reason indicates a failure. +func isFailureReason(reason string) bool { + return reason == "ErrImagePull" || + reason == "ImagePullBackOff" || + reason == "CrashLoopBackOff" || + reason == "RunContainerError" +} - log.Info("Updated resources for object") - r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources") - return ctrl.Result{Requeue: true}, nil +func (r *DragonflyReconciler) getMissingResources(ctx context.Context, df *dfv1alpha1.Dragonfly) ([]client.Object, error) { + resources, err := resources.GetDragonflyResources(ctx, df) + if err != nil { + return nil, err + } + missingResources := make([]client.Object, 0) + for _, resource := range resources { + obj := resource.DeepCopyObject().(client.Object) + + err := r.Get(ctx, client.ObjectKey{ + Namespace: df.Namespace, + Name: resource.GetName(), + }, obj) + + if errors.IsNotFound(err) { + missingResources = append(missingResources, resource) + } else if err != nil { + return nil, fmt.Errorf("failed to get resource %s/%s: %w", df.Namespace, resource.GetName(), err) + } } + return missingResources, nil } // SetupWithManager sets up the controller with the Manager. @@ -289,8 +351,8 @@ func (r *DragonflyReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). // Listen only to spec changes For(&dfv1alpha1.Dragonfly{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). - Owns(&appsv1.StatefulSet{}). - Owns(&corev1.Service{}). + Owns(&appsv1.StatefulSet{}, builder.MatchEveryOwner). + Owns(&corev1.Service{}, builder.MatchEveryOwner). Named("Dragonfly"). Complete(r) }