diff --git a/api/leaderworkerset/v1/leaderworkerset_types.go b/api/leaderworkerset/v1/leaderworkerset_types.go index cfbae2c3..180b5c48 100644 --- a/api/leaderworkerset/v1/leaderworkerset_types.go +++ b/api/leaderworkerset/v1/leaderworkerset_types.go @@ -58,10 +58,8 @@ const ( // Worker pods will have an annotation that is the leader pod's name. LeaderPodNameAnnotationKey string = "leaderworkerset.sigs.k8s.io/leader-name" - // SHAed leaderWorkerTemplate value for version tracking. - // This will be applied to all API objects including: - // leaderStatefulset, leaderPods, workerStatefulsets, workerPods. - TemplateRevisionHashKey string = "leaderworkerset.sigs.k8s.io/template-revision-hash" + // Hash to track the controller revision that matches an LWS object + RevisionKey string = "leaderworkerset.sigs.k8s.io/template-revision-hash" // Environment variable added to all containers in the LeaderWorkerSet to // address the leader via the headless service. diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 03b44b66..5167252d 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -66,6 +66,7 @@ rules: - apiGroups: - apps resources: + - controllerrevisions - statefulsets verbs: - create @@ -78,12 +79,14 @@ rules: - apiGroups: - apps resources: + - controllerrevisions/finalizers - statefulsets/finalizers verbs: - update - apiGroups: - apps resources: + - controllerrevisions/status - statefulsets/status verbs: - get diff --git a/pkg/controllers/leaderworkerset_controller.go b/pkg/controllers/leaderworkerset_controller.go index ab7320e2..5b8d410e 100644 --- a/pkg/controllers/leaderworkerset_controller.go +++ b/pkg/controllers/leaderworkerset_controller.go @@ -44,6 +44,7 @@ import ( "sigs.k8s.io/lws/pkg/utils" controllerutils "sigs.k8s.io/lws/pkg/utils/controller" podutils "sigs.k8s.io/lws/pkg/utils/pod" + revisionutils "sigs.k8s.io/lws/pkg/utils/revision" statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset" ) @@ -86,6 +87,9 @@ func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, //+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update //+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch +// +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // Get leaderworkerset object @@ -96,13 +100,42 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws)) ctx = ctrl.LoggerInto(ctx, log) - partition, replicas, err := r.rollingUpdateParameters(ctx, lws) + leaderSts, err := r.getLeaderStatefulSet(ctx, lws) + if err != nil { + log.Error(err, "Fetching leader statefulset") + return ctrl.Result{}, err + } + + // Handles two cases: + // Case 1: Upgrading the LWS controller from a version that doesn't support controller revision + // Case 2: Creating the controller revision for a newly created LWS object + revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws) + if err != nil { + log.Error(err, "Creating controller revision") + return ctrl.Result{}, err + } + + 
updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision) + if err != nil { + log.Error(err, "Validating if LWS has been updated") + return ctrl.Result{}, err + } + lwsUpdated := updatedRevision != nil + if lwsUpdated { + revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision, lws) + if err != nil { + log.Error(err, "Creating revision for updated LWS") + return ctrl.Result{}, err + } + } + + partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated) if err != nil { log.Error(err, "Rolling partition error") return ctrl.Result{}, err } - if err := r.SSAWithStatefulset(ctx, lws, partition, replicas); err != nil { + if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil { return ctrl.Result{}, err } @@ -114,11 +147,16 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - err = r.updateStatus(ctx, lws) + updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision)) if err != nil { return ctrl.Result{}, err } + if updateDone { + if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil { + return ctrl.Result{}, err + } + } log.V(2).Info("Leader Reconcile completed.") return ctrl.Result{}, nil } @@ -190,19 +228,16 @@ func SetupIndexes(indexer client.FieldIndexer) error { // - Otherwise, Replicas is equal to spec.Replicas // - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge, // we should reclaim the extra replicas gradually to accommodate for the new replicas. -func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (int32, int32, error) { +func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (int32, int32, error) { + log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws)) + ctx = ctrl.LoggerInto(ctx, log) lwsReplicas := *lws.Spec.Replicas - sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts) - if err != nil { - // Case 1: - // If sts not created yet, all partitions should be updated, - // replicas should not change. - if apierrors.IsNotFound(err) { - return 0, lwsReplicas, nil - } - return 0, 0, err + // Case 1: + // If sts not created yet, all partitions should be updated, + // replicas should not change. + if sts == nil { + return 0, lwsReplicas, nil } stsReplicas := *sts.Spec.Replicas @@ -229,7 +264,7 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, // Case 2: // Indicates a new rolling update here. - if templateUpdated(sts, lws) { + if leaderWorkerSetUpdated { // Processing scaling up/down first prior to rolling update. 
return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil } @@ -242,7 +277,7 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, return 0, lwsReplicas, nil } - continuousReadyReplicas, lwsUnreadyReplicas, err := r.iterateReplicas(ctx, lws, stsReplicas) + continuousReadyReplicas, lwsUnreadyReplicas, err := r.iterateReplicas(ctx, lws, stsReplicas, revisionKey) if err != nil { return 0, 0, err } @@ -274,11 +309,11 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, return min(partition, utils.NonZeroValue(stsReplicas-int32(rollingStep)-continuousReadyReplicas)), wantReplicas(lwsUnreadyReplicas), nil } -func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32) error { +func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error { log := ctrl.LoggerFrom(ctx) // construct the statefulset apply configuration - leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas) + leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey) if err != nil { log.Error(err, "Constructing StatefulSet apply configuration.") return err @@ -312,7 +347,7 @@ func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws } // updates the condition of the leaderworkerset to either Progressing or Available. -func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (bool, error) { +func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) { log := ctrl.LoggerFrom(ctx) podSelector := client.MatchingLabels(map[string]string{ leaderworkerset.SetNameLabelKey: lws.Name, @@ -321,19 +356,18 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l leaderPodList := &corev1.PodList{} if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil { log.Error(err, "Fetching leaderPods") - return false, err + return false, false, err } updateStatus := false readyCount, updatedCount, updatedNonBurstWorkerCount, currentNonBurstWorkerCount, updatedAndReadyCount := 0, 0, 0, 0, 0 - templateHash := utils.LeaderWorkerTemplateHash(lws) noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1 // Iterate through all leaderPods. 
for _, pod := range leaderPodList.Items { index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey]) if err != nil { - return false, err + return false, false, err } if index < int(*lws.Spec.Replicas) { currentNonBurstWorkerCount++ @@ -343,7 +377,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l if !noWorkerSts { if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil { log.Error(err, "Fetching worker statefulSet") - return false, err + return false, false, err } } @@ -352,7 +386,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l ready = true readyCount++ } - if (noWorkerSts || sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash) && pod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash { + if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey { updated = true updatedCount++ if index < int(*lws.Spec.Replicas) { @@ -380,6 +414,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l } var conditions []metav1.Condition + updateDone := false if updatedNonBurstWorkerCount < currentNonBurstWorkerCount { // upgradeInProgress is true when the upgrade replicas is smaller than the expected // number of total replicas not including the burst replicas @@ -387,6 +422,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpgradeInProgress)) } else if updatedAndReadyCount == int(*lws.Spec.Replicas) { conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable)) + updateDone = true } else { conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing)) } @@ -396,11 +432,11 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l if updateCondition { r.Record.Eventf(lws, corev1.EventTypeNormal, conditions[0].Reason, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas))) } - return updateStatus || updateCondition, nil + return updateStatus || updateCondition, updateDone, nil } // Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred. 
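Condensed, the bookkeeping in updateConditions above rests on two predicates: an object counts as "updated" when its revision label equals the key currently being rolled out, and the LWS flips from UpgradeInProgress to Available once every spec replica is both updated and ready. A hedged sketch with illustrative names (only the label constant and the branch order come from the code above):

package sketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// RevisionKey is the label stamped on leader pods and worker StatefulSets; its value
// names the ControllerRevision they were built from.
const RevisionKey = "leaderworkerset.sigs.k8s.io/template-revision-hash"

// updatedWith reports whether obj was produced by the revision currently being rolled
// out, mirroring the GetRevisionKey comparisons in updateConditions and iterateReplicas.
func updatedWith(obj metav1.Object, rolloutKey string) bool {
	return obj.GetLabels() != nil && obj.GetLabels()[RevisionKey] == rolloutKey
}

// pickCondition condenses the branch at the end of updateConditions: the rollout is in
// progress while fewer non-burst groups carry the new key than exist, Available once
// every spec replica is updated and ready (which also lets stale revisions be
// truncated), and Progressing otherwise. The string values are illustrative only.
func pickCondition(updatedNonBurst, currentNonBurst, updatedAndReady, specReplicas int) (condition string, updateDone bool) {
	switch {
	case updatedNonBurst < currentNonBurst:
		return "UpgradeInProgress", false
	case updatedAndReady == specReplicas:
		return "Available", true
	default:
		return "Progressing", false
	}
}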
-func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error { +func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) { updateStatus := false log := ctrl.LoggerFrom(ctx) @@ -408,7 +444,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade sts := &appsv1.StatefulSet{} if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil { log.Error(err, "Error retrieving leader StatefulSet") - return err + return false, err } // retrieve the current number of replicas -- the number of leaders @@ -428,7 +464,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade selector, err := metav1.LabelSelectorAsSelector(labelSelector) if err != nil { log.Error(err, "Converting label selector to selector") - return err + return false, err } lws.Status.HPAPodSelector = selector.String() @@ -436,17 +472,17 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade } // check if an update is needed - updateConditions, err := r.updateConditions(ctx, lws) + updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey) if err != nil { - return err + return false, err } if updateStatus || updateConditions { if err := r.Status().Update(ctx, lws); err != nil { log.Error(err, "Updating LeaderWorkerSet status and/or condition.") - return err + return false, err } } - return nil + return updateDone, nil } // iterateReplicas will iterate the leader pods together with corresponding worker statefulsets @@ -454,7 +490,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade // - The first value represents the number of continuous ready replicas ranging from the last index to 0, // to help us judge whether we can update the Partition or not. // - The second value represents the unready replicas whose index is smaller than leaderWorkerSet Replicas. -func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32) (int32, int32, error) { +func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) (int32, int32, error) { podSelector := client.MatchingLabels(map[string]string{ leaderworkerset.SetNameLabelKey: lws.Name, leaderworkerset.WorkerIndexLabelKey: "0", @@ -481,7 +517,6 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey]) }, stsList.Items, int(stsReplicas)) - templateHash := utils.LeaderWorkerTemplateHash(lws) // Once size==1, no worker statefulSets will be created. 
noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1 processReplica := func(index int32) (ready bool) { @@ -492,8 +527,8 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le return false } - podTemplateHash := sortedPods[index].Labels[leaderworkerset.TemplateRevisionHashKey] - if !(podTemplateHash == templateHash && podutils.PodRunningAndReady(sortedPods[index])) { + podTemplateHash := revisionutils.GetRevisionKey(&sortedPods[index]) + if !(podTemplateHash == revisionKey && podutils.PodRunningAndReady(sortedPods[index])) { return false } @@ -501,8 +536,8 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le return true } - stsTemplateHash := sortedSts[index].Labels[leaderworkerset.TemplateRevisionHashKey] - return stsTemplateHash == templateHash && statefulsetutils.StatefulsetReady(sortedSts[index]) + stsTemplateHash := revisionutils.GetRevisionKey(&sortedSts[index]) + return stsTemplateHash == revisionKey && statefulsetutils.StatefulsetReady(sortedSts[index]) } var skip bool @@ -523,8 +558,54 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le return continuousReadyReplicas, lwsUnreadyReplicas, nil } +func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) { + sts := &appsv1.StatefulSet{} + err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + return sts, nil +} + +func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.ControllerRevision, error) { + revisionKey := "" + if sts != nil { + // Uses the hash in the leader sts to avoid detecting update in the case where LWS controller is upgraded from a version where + // the revisionKey was used to detect update instead of controller revision. 
+ revisionKey = revisionutils.GetRevisionKey(sts) + } + if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); sts != nil || err != nil { + return stsRevision, err + } + revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey) + if err != nil { + return nil, err + } + return revisionutils.CreateRevision(ctx, r.Client, revision, lws) +} + +func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) { + if sts == nil { + return nil, nil + } + + currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "") + if err != nil { + return nil, err + } + + if !revisionutils.EqualRevision(currentRevision, revision) { + return currentRevision, nil + } + + return nil, nil +} + // constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet -func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32) (*appsapplyv1.StatefulSetApplyConfiguration, error) { +func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) { var podTemplateSpec corev1.PodTemplateSpec if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil { podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy() @@ -542,11 +623,10 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor return nil, err } - templateHash := utils.LeaderWorkerTemplateHash(lws) podTemplateApplyConfiguration.WithLabels(map[string]string{ - leaderworkerset.WorkerIndexLabelKey: "0", - leaderworkerset.SetNameLabelKey: lws.Name, - leaderworkerset.TemplateRevisionHashKey: templateHash, + leaderworkerset.WorkerIndexLabelKey: "0", + leaderworkerset.SetNameLabelKey: lws.Name, + leaderworkerset.RevisionKey: revisionKey, }) podAnnotations := make(map[string]string) podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size)) @@ -581,8 +661,8 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor leaderworkerset.WorkerIndexLabelKey: "0", }))). WithLabels(map[string]string{ - leaderworkerset.SetNameLabelKey: lws.Name, - leaderworkerset.TemplateRevisionHashKey: templateHash, + leaderworkerset.SetNameLabelKey: lws.Name, + leaderworkerset.RevisionKey: revisionKey, }). WithAnnotations(map[string]string{ leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)), @@ -674,7 +754,3 @@ func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Cond return false } - -func templateUpdated(sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) bool { - return sts.Labels[leaderworkerset.TemplateRevisionHashKey] != utils.LeaderWorkerTemplateHash(lws) -} diff --git a/pkg/controllers/leaderworkerset_controller_test.go b/pkg/controllers/leaderworkerset_controller_test.go index 84082843..47ce07d9 100644 --- a/pkg/controllers/leaderworkerset_controller_test.go +++ b/pkg/controllers/leaderworkerset_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. 
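Before the test changes below: the get-or-create helper added in the controller above is what keeps an LWS controller upgrade from being mistaken for a spec change. A hedged sketch of the intended flow, using only the revisionutils calls introduced in this change (the wrapper name is illustrative):

package sketch

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
	revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
)

// getOrCreateRevision sketches the intended get-or-create flow: seed the key from the
// label already stamped on the leader StatefulSet (so upgrading the controller from the
// old template-hash scheme does not look like a rolling update), return any stored
// revision that matches it, and only mint a fresh ControllerRevision when none exists.
func getOrCreateRevision(ctx context.Context, c client.Client, lws *leaderworkerset.LeaderWorkerSet, leaderSts *appsv1.StatefulSet) (*appsv1.ControllerRevision, error) {
	key := ""
	if leaderSts != nil {
		key = revisionutils.GetRevisionKey(leaderSts)
	}
	if existing, err := revisionutils.GetRevision(ctx, c, lws, key); existing != nil || err != nil {
		return existing, err
	}
	fresh, err := revisionutils.NewRevision(ctx, c, lws, key)
	if err != nil {
		return nil, err
	}
	return revisionutils.CreateRevision(ctx, c, fresh, lws)
}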
package controllers import ( + "context" "testing" "github.com/google/go-cmp/cmp" @@ -30,24 +31,40 @@ import ( "k8s.io/utils/ptr" leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" - "sigs.k8s.io/lws/pkg/utils" + + "sigs.k8s.io/controller-runtime/pkg/client/fake" + revisionutils "sigs.k8s.io/lws/pkg/utils/revision" testutils "sigs.k8s.io/lws/test/testutils" ) func TestLeaderStatefulSetApplyConfig(t *testing.T) { - hash1 := utils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). + client := fake.NewClientBuilder().Build() + lws1 := testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). LeaderTemplateSpec(testutils.MakeLeaderPodSpec()). - WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj()) - hash2 := utils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). - WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj()) + WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj() + cr1, err := revisionutils.NewRevision(context.TODO(), client, lws1, "") + if err != nil { + t.Fatal(err) + } + revisionKey1 := revisionutils.GetRevisionKey(cr1) + + lws2 := testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). + WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj() + cr2, err := revisionutils.NewRevision(context.TODO(), client, lws2, "") + if err != nil { + t.Fatal(err) + } + revisionKey2 := revisionutils.GetRevisionKey(cr2) tests := []struct { name string + revisionKey string lws *leaderworkerset.LeaderWorkerSet wantApplyConfig *appsapplyv1.StatefulSetApplyConfiguration }{ { - name: "1 replica, size 1, with empty leader template, exclusive placement disabled", + name: "1 replica, size 1, with empty leader template, exclusive placement disabled", + revisionKey: revisionKey2, lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). Replica(1). RolloutStrategy(leaderworkerset.RolloutStrategy{ @@ -69,7 +86,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Namespace: ptr.To[string]("default"), Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash2, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2, }, Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"}, }, @@ -86,7 +103,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash2, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2, }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "1", @@ -112,7 +129,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { }, }, { - name: "1 replica, size 2 , with empty leader template, exclusive placement enabled", + name: "1 replica, size 2 , with empty leader template, exclusive placement enabled", + revisionKey: revisionKey2, lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). 
Annotation(map[string]string{ "leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey", @@ -136,7 +154,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Namespace: ptr.To[string]("default"), Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash2, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2, }, Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"}, }, @@ -153,7 +171,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash2, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2, }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "2", @@ -180,7 +198,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { }, }, { - name: "2 replica, size 2, with leader template, exclusive placement enabled", + name: "2 replica, size 2, with leader template, exclusive placement enabled", + revisionKey: revisionKey1, lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").Annotation(map[string]string{ "leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey", }).Replica(2). @@ -204,7 +223,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Namespace: ptr.To[string]("default"), Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash1, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1, }, Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "2"}, }, @@ -221,7 +240,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash1, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1, }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "2", @@ -247,7 +266,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { }, }, { - name: "2 maxUnavailable, 1 maxSurge, with empty leader template, exclusive placement disabled", + name: "2 maxUnavailable, 1 maxSurge, with empty leader template, exclusive placement disabled", + revisionKey: revisionKey2, lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default"). Replica(1). 
RolloutStrategy(leaderworkerset.RolloutStrategy{ @@ -270,7 +290,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Namespace: ptr.To[string]("default"), Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash2, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2, }, Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"}, }, @@ -287,7 +307,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash2, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2, }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "1", @@ -313,7 +333,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { }, }, { - name: "1 replica, size 2, with leader template, exclusive placement enabled, subgroupsize enabled", + name: "1 replica, size 2, with leader template, exclusive placement enabled, subgroupsize enabled", + revisionKey: revisionKey1, lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").Annotation(map[string]string{ leaderworkerset.SubGroupExclusiveKeyAnnotationKey: "topologyKey", }).SubGroupSize(2).Replica(1). @@ -337,7 +358,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Namespace: ptr.To[string]("default"), Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash1, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1, }, Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"}, }, @@ -354,7 +375,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { Labels: map[string]string{ "leaderworkerset.sigs.k8s.io/name": "test-sample", "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/template-revision-hash": hash1, + "leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1, }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "2", @@ -383,7 +404,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas) + stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas, tc.revisionKey) if err != nil { t.Errorf("failed with error: %s", err.Error()) } diff --git a/pkg/controllers/pod_controller.go b/pkg/controllers/pod_controller.go index 1ec64ede..4dac3714 100644 --- a/pkg/controllers/pod_controller.go +++ b/pkg/controllers/pod_controller.go @@ -41,6 +41,7 @@ import ( acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators" controllerutils "sigs.k8s.io/lws/pkg/utils/controller" podutils "sigs.k8s.io/lws/pkg/utils/pod" + revisionutils "sigs.k8s.io/lws/pkg/utils/revision" statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset" ) @@ -118,8 +119,12 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.") return ctrl.Result{}, nil } - - statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet) + revision, err := revisionutils.GetRevision(ctx, r.Client, &leaderWorkerSet, revisionutils.GetRevisionKey(&pod)) + if err != nil { 
+ log.Error(err, "Getting lws revisions") + return ctrl.Result{}, err + } + statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet, revision) if err != nil { return ctrl.Result{}, err } @@ -180,6 +185,10 @@ func (r *PodReconciler) handleRestartPolicy(ctx context.Context, pod corev1.Pod, if err := r.Get(ctx, types.NamespacedName{Name: leaderPodName, Namespace: pod.Namespace}, &leader); err != nil { return false, err } + // Different revision key means that this pod will be deleted soon and alternative will be created with the matching key + if revisionutils.GetRevisionKey(&leader) != revisionutils.GetRevisionKey(&pod) { + return false, nil + } } else { leader = pod } @@ -259,8 +268,12 @@ func setControllerReferenceWithStatefulSet(owner metav1.Object, sts *appsapplyv1 } // constructWorkerStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet -func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet) (*appsapplyv1.StatefulSetApplyConfiguration, error) { - podTemplateSpec := *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy() +func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet, currentRevision *appsv1.ControllerRevision) (*appsapplyv1.StatefulSetApplyConfiguration, error) { + currentLws, err := revisionutils.ApplyRevision(&lws, currentRevision) + if err != nil { + return nil, err + } + podTemplateSpec := *currentLws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy() // construct pod template spec configuration obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec) if err != nil { @@ -280,7 +293,7 @@ func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws lead leaderworkerset.GroupIndexLabelKey: leaderPod.Labels[leaderworkerset.GroupIndexLabelKey], leaderworkerset.SetNameLabelKey: lws.Name, leaderworkerset.GroupUniqueHashLabelKey: leaderPod.Labels[leaderworkerset.GroupUniqueHashLabelKey], - leaderworkerset.TemplateRevisionHashKey: leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey], + leaderworkerset.RevisionKey: revisionutils.GetRevisionKey(&leaderPod), } podTemplateApplyConfiguration.WithLabels(labelMap) diff --git a/pkg/controllers/pod_controller_test.go b/pkg/controllers/pod_controller_test.go index 3fa5c1f5..74238d71 100644 --- a/pkg/controllers/pod_controller_test.go +++ b/pkg/controllers/pod_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package controllers import ( + "context" "testing" "github.com/google/go-cmp/cmp" @@ -27,28 +28,42 @@ import ( coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1" metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client/fake" leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" + revisionutils "sigs.k8s.io/lws/pkg/utils/revision" testutils "sigs.k8s.io/lws/test/testutils" ) func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { + client := fake.NewClientBuilder().Build() + + lws := testutils.BuildBasicLeaderWorkerSet("test-sample", "default").Replica(1).WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Size(1).Obj() + updateRevision, err := revisionutils.NewRevision(context.TODO(), client, lws, "") + if err != nil { + t.Fatal(err) + } + updateRevisionKey := revisionutils.GetRevisionKey(updateRevision) + tests := []struct { name string pod *corev1.Pod lws *leaderworkerset.LeaderWorkerSet wantStatefulSetConfig *appsapplyv1.StatefulSetApplyConfiguration + revision *appsv1.ControllerRevision }{ { - name: "1 replica, size 1, exclusive placement disabled", + name: "1 replica, size 1, exclusive placement disabled", + revision: updateRevision, pod: &corev1.Pod{ ObjectMeta: v1.ObjectMeta{ Name: "test-sample", Namespace: "default", Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", + leaderworkerset.WorkerIndexLabelKey: "0", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: updateRevisionKey, }, }, }, @@ -65,28 +80,28 @@ func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { Name: ptr.To[string]("test-sample"), Namespace: ptr.To[string]("default"), Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: updateRevisionKey, }, }, Spec: &appsapplyv1.StatefulSetSpecApplyConfiguration{ Replicas: ptr.To[int32](0), Selector: &metaapplyv1.LabelSelectorApplyConfiguration{ MatchLabels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", }, }, Template: &coreapplyv1.PodTemplateSpecApplyConfiguration{ ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{ Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: updateRevisionKey, }, Annotations: map[string]string{ 
"leaderworkerset.sigs.k8s.io/size": "1", @@ -111,17 +126,18 @@ func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { }, }, { - name: "1 replica, size 2, exclusive placement enabled", + name: "1 replica, size 2, exclusive placement enabled", + revision: updateRevision, pod: &corev1.Pod{ ObjectMeta: v1.ObjectMeta{ Name: "test-sample", Namespace: "default", Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", + leaderworkerset.WorkerIndexLabelKey: "0", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: updateRevisionKey, }, }, }, @@ -140,28 +156,28 @@ func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { Name: ptr.To[string]("test-sample"), Namespace: ptr.To[string]("default"), Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: updateRevisionKey, }, }, Spec: &appsapplyv1.StatefulSetSpecApplyConfiguration{ Replicas: ptr.To[int32](1), Selector: &metaapplyv1.LabelSelectorApplyConfiguration{ MatchLabels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", }, }, Template: &coreapplyv1.PodTemplateSpecApplyConfiguration{ ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{ Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: updateRevisionKey, }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "2", @@ -187,17 +203,18 @@ func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { }, }, { - name: "1 replica, size 2, subgroupsize 2, exclusive placement enabled", + name: "1 replica, size 2, subgroupsize 2, exclusive placement enabled", + revision: updateRevision, pod: &corev1.Pod{ ObjectMeta: v1.ObjectMeta{ Name: "test-sample", Namespace: "default", Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/worker-index": "0", - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", + leaderworkerset.WorkerIndexLabelKey: "0", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", + leaderworkerset.RevisionKey: 
updateRevisionKey, }, }, }, @@ -216,28 +233,28 @@ func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { Name: ptr.To[string]("test-sample"), Namespace: ptr.To[string]("default"), Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.RevisionKey: updateRevisionKey, + leaderworkerset.GroupUniqueHashLabelKey: "test-key", }, }, Spec: &appsapplyv1.StatefulSetSpecApplyConfiguration{ Replicas: ptr.To[int32](1), Selector: &metaapplyv1.LabelSelectorApplyConfiguration{ MatchLabels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.GroupUniqueHashLabelKey: "test-key", }, }, Template: &coreapplyv1.PodTemplateSpecApplyConfiguration{ ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{ Labels: map[string]string{ - "leaderworkerset.sigs.k8s.io/name": "test-sample", - "leaderworkerset.sigs.k8s.io/group-index": "1", - "leaderworkerset.sigs.k8s.io/template-revision-hash": "", - "leaderworkerset.sigs.k8s.io/group-key": "test-key", + leaderworkerset.SetNameLabelKey: "test-sample", + leaderworkerset.GroupIndexLabelKey: "1", + leaderworkerset.RevisionKey: updateRevisionKey, + leaderworkerset.GroupUniqueHashLabelKey: "test-key", }, Annotations: map[string]string{ "leaderworkerset.sigs.k8s.io/size": "2", @@ -267,7 +284,7 @@ func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - statefulSetConfig, err := constructWorkerStatefulSetApplyConfiguration(*tc.pod, *tc.lws) + statefulSetConfig, err := constructWorkerStatefulSetApplyConfiguration(*tc.pod, *tc.lws, tc.revision) if err != nil { t.Errorf("failed with error %s", err.Error()) } diff --git a/pkg/utils/revision/revision_utils.go b/pkg/utils/revision/revision_utils.go new file mode 100644 index 00000000..b9ed627f --- /dev/null +++ b/pkg/utils/revision/revision_utils.go @@ -0,0 +1,299 @@ +package revision + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "hash" + "hash/fnv" + + "github.com/davecgh/go-spew/spew" + appsv1 "k8s.io/api/apps/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" +) + +// Functions in this package are adapted from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/statefulset/ and +// https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/history/controller_history.go + +// NewRevision instantiates a new ControllerRevision containing a patch that reapplies the target state of LeaderWorkerSet. +// The Revision of the returned ControllerRevision is set to revision. 
If the returned error is nil, the returned +// ControllerRevision is valid. LeaderWorkerSet revisions are stored as patches that re-apply the current state of set +// to a new LeaderWorkerSet using a strategic merge patch to replace the saved state of the new LeaderWorkerSet. +func NewRevision(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (*appsv1.ControllerRevision, error) { + var controllerKind = leaderworkerset.GroupVersion.WithKind("LeaderWorkerSet") + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{ + leaderworkerset.SetNameLabelKey: lws.Name, + }}) + if err != nil { + return nil, err + } + revisions, err := ListRevisions(ctx, k8sClient, lws, selector) + highestRevision := getHighestRevision(revisions) + revision := int64(1) + if highestRevision != nil { + revision = highestRevision.Revision + 1 + } + if err != nil { + return nil, err + } + patch, err := getPatch(lws) + if err != nil { + return nil, err + } + + cr := &appsv1.ControllerRevision{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + leaderworkerset.SetNameLabelKey: lws.Name, + }, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(lws, controllerKind)}, + Namespace: lws.Namespace, + }, + Data: runtime.RawExtension{Raw: patch}, + Revision: revision, + } + + hash := hashRevision(cr) + if revisionKey == "" { + revisionKey = hash + } + cr.Name = revisionName(lws.Name, hash, revision) + cr.Labels[leaderworkerset.RevisionKey] = revisionKey + return cr, nil +} + +func CreateRevision(ctx context.Context, k8sClient client.Client, revision *appsv1.ControllerRevision, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.ControllerRevision, error) { + if err := k8sClient.Create(ctx, revision); err != nil { + return nil, err + } + return revision, nil +} + +func GetRevisionKey(obj metav1.Object) string { + if obj.GetLabels() != nil { + return obj.GetLabels()[leaderworkerset.RevisionKey] + } + return "" +} + +// GetRevision returns the controllerRevision that matches the revisionKey that is passed. A nil controllerRevision will be returned if the passed revisionKey is nil, +// or if no controllerRevisions match the revisionKey passed. If more than one controllerRevision matches, the latest revision will be passed. +func GetRevision(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (*appsv1.ControllerRevision, error) { + log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws)) + ctx = ctrl.LoggerInto(ctx, log) + if revisionKey == "" { + return nil, nil + } + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{ + leaderworkerset.RevisionKey: revisionKey, + }}) + if err != nil { + return nil, err + } + revisions, err := ListRevisions(ctx, k8sClient, lws, selector) + if err != nil { + log.Error(err, "Listing all controller revisions") + return nil, err + } + + if len(revisions) == 0 { + return nil, nil + } + + if len(revisions) > 1 { + // Since we only create a controllerRevision when the template hash changes, only one should match + log.Error(err, "More than one revision exists for the given template hash; returning the latest revision") + return getHighestRevision(revisions), nil + } + + return revisions[0], nil +} + +// ListRevisions lists all ControllerRevisions matching selector and owned by parent or no other +// controller. 
If the returned error is nil the returned slice of ControllerRevisions is valid. If the +// returned error is not nil, the returned slice is not valid. +func ListRevisions(ctx context.Context, k8sClient client.Client, parent metav1.Object, selector labels.Selector) ([]*appsv1.ControllerRevision, error) { + // List all revisions in the namespace that match the selector + revisionList := new(appsv1.ControllerRevisionList) + err := k8sClient.List(ctx, revisionList, client.InNamespace(parent.GetNamespace()), client.MatchingLabelsSelector{Selector: selector}) + if err != nil { + return nil, err + } + history := revisionList.Items + var owned []*appsv1.ControllerRevision + for i := range history { + ref := metav1.GetControllerOfNoCopy(&history[i]) + if ref == nil || ref.UID == parent.GetUID() { + owned = append(owned, &history[i]) + } + + } + return owned, err +} + +// ApplyRevision returns a new LeaderWorkerSet constructed by restoring the state in revision to set. If the returned error +// is nil, the returned LeaderWorkerSet is valid. +func ApplyRevision(lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*leaderworkerset.LeaderWorkerSet, error) { + // clone := lws.DeepCopy() + str := &bytes.Buffer{} + err := unstructured.UnstructuredJSONScheme.Encode(lws, str) + if err != nil { + return nil, err + } + patched, err := strategicpatch.StrategicMergePatch(str.Bytes(), revision.Data.Raw, lws) + if err != nil { + return nil, err + } + restoredLws := &leaderworkerset.LeaderWorkerSet{} + if err = json.Unmarshal(patched, restoredLws); err != nil { + return nil, err + } + return restoredLws, nil +} + +// EqualRevision returns true if lhs and rhs are either both nil, if the revisionKey is the same, +// or if they are semantically equivalent. +func EqualRevision(lhs *appsv1.ControllerRevision, rhs *appsv1.ControllerRevision) bool { + if lhs == nil || rhs == nil { + return lhs == rhs + } + + if GetRevisionKey(lhs) == GetRevisionKey(rhs) { + return true + } + + return bytes.Equal(lhs.Data.Raw, rhs.Data.Raw) && apiequality.Semantic.DeepEqual(lhs.Data.Object, rhs.Data.Object) +} + +// TruncateRevisions cleans up all other controller revisions except the currentRevision. +// currentRevision is the one that matches the revisionKey that is passed +func TruncateRevisions(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) error { + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{ + leaderworkerset.SetNameLabelKey: lws.Name, + }}) + if err != nil { + return err + } + revisions, err := ListRevisions(ctx, k8sClient, lws, selector) + if err != nil { + return err + } + + for i, revision := range revisions { + if GetRevisionKey(revision) != revisionKey { + if err := k8sClient.Delete(ctx, revisions[i]); err != nil { + return err + } + } + } + return nil +} + +// getPatch returns a strategic merge patch that can be applied to restore a LeaderWorkerSet to a +// previous version. If the returned error is nil the patch is valid. The current state that we save is the +// leaderWorkerTemplate and NetworkConfig. We can modify this later to encompass more state (or less) and +// remain compatible with previously recorded patches. 
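For orientation, the payload that getPatch (below) stores in ControllerRevision.Data.Raw is shaped roughly like the illustrative constant here; the template contents are elided and the exact JSON rendering is an assumption, not captured controller output:

package sketch

// examplePatch is an illustrative rendering of the strategic-merge patch kept in a
// ControllerRevision: only spec.leaderWorkerTemplate and spec.networkConfig are saved,
// and both carry "$patch": "replace" so ApplyRevision restores them wholesale.
const examplePatch = `{
  "spec": {
    "leaderWorkerTemplate": {"size": 2, "...": "...", "$patch": "replace"},
    "networkConfig":        {"subdomainPolicy": "Shared", "$patch": "replace"}
  }
}`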
+func getPatch(lws *leaderworkerset.LeaderWorkerSet) ([]byte, error) { + str := &bytes.Buffer{} + clone := lws.DeepCopy() + // When upgrading from an LWS version that doesn't contain NetworkConfig, NetworkConfig will be nil + // until another field in the LWS object is changed triggering the LWS webhook. This allows the revision + // to be the same before and after the LWS webhook actually defaults the value. + if clone.Spec.NetworkConfig == nil { + clone.Spec.NetworkConfig = &leaderworkerset.NetworkConfig{} + subdomainPolicy := leaderworkerset.SubdomainShared + clone.Spec.NetworkConfig = &leaderworkerset.NetworkConfig{ + SubdomainPolicy: &subdomainPolicy, + } + } + + if err := unstructured.UnstructuredJSONScheme.Encode(clone, str); err != nil { + return nil, err + } + var raw map[string]interface{} + if err := json.Unmarshal(str.Bytes(), &raw); err != nil { + return nil, err + } + objCopy := make(map[string]interface{}) + specCopy := make(map[string]interface{}) + spec := raw["spec"].(map[string]interface{}) + networkConfig := spec["networkConfig"].(map[string]interface{}) + specCopy["networkConfig"] = networkConfig + template := spec["leaderWorkerTemplate"].(map[string]interface{}) + specCopy["leaderWorkerTemplate"] = template + networkConfig["$patch"] = "replace" + template["$patch"] = "replace" + objCopy["spec"] = specCopy + return json.Marshal(objCopy) +} + +// getHighestRevision finds the next valid revision number based on revisions. If the length of revisions +// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. It also returns the revision +// with the highest Revision value. +func getHighestRevision(revisions []*appsv1.ControllerRevision) *appsv1.ControllerRevision { + count := len(revisions) + if count <= 0 { + return nil + } + + max := int64(0) + var maxRevision *appsv1.ControllerRevision + for _, revision := range revisions { + if max <= revision.Revision { + max = revision.Revision + maxRevision = revision + } + } + return maxRevision +} + +// revisionName returns the Name for a ControllerRevision in the form prefix-hash-revisionnumber. If the length +// of prefix is greater than 220 bytes, it is truncated to allow for a name that is no larger than 253 bytes. +// revision-number allows us to avoid collisions if the created prefix-hash already exists in the history, since revision +// will be unique. +func revisionName(prefix string, hash string, revisionNumber int64) string { + if len(prefix) > 220 { + prefix = prefix[:220] + } + + return fmt.Sprintf("%s-%s-%v", prefix, hash, revisionNumber) +} + +// hashRevision hashes the contents of revision's Data using FNV hashing. +// The returned hash will be a safe encoded string to avoid bad words. 
+func hashRevision(revision *appsv1.ControllerRevision) string { + hf := fnv.New32() + if len(revision.Data.Raw) > 0 { + hf.Write(revision.Data.Raw) + } + if revision.Data.Object != nil { + deepHashObject(hf, revision.Data.Object) + } + return rand.SafeEncodeString(fmt.Sprint(hf.Sum32())) +} + +func deepHashObject(hasher hash.Hash, objectToWrite interface{}) { + hasher.Reset() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + _, err := printer.Fprintf(hasher, "%#v", objectToWrite) + if err != nil { + return + } +} diff --git a/pkg/utils/revision/revision_utils_test.go b/pkg/utils/revision/revision_utils_test.go new file mode 100644 index 00000000..922f1bb8 --- /dev/null +++ b/pkg/utils/revision/revision_utils_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package revision + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" +) + +func TestApplyRevision(t *testing.T) { + client := fake.NewClientBuilder().Build() + + lws := BuildLeaderWorkerSet("default") + revision, err := NewRevision(context.TODO(), client, lws, "") + if err != nil { + t.Fatal(err) + } + currentLws := lws.DeepCopy() + + lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Name = "update-name" + subdomainPolicy := leaderworkerset.SubdomainUniquePerReplica + lws.Spec.NetworkConfig = &leaderworkerset.NetworkConfig{ + SubdomainPolicy: &subdomainPolicy, + } + lws.Spec.RolloutStrategy = leaderworkerset.RolloutStrategy{ + Type: leaderworkerset.RollingUpdateStrategyType, + RollingUpdateConfiguration: &leaderworkerset.RollingUpdateConfiguration{ + MaxUnavailable: intstr.FromInt32(2), + MaxSurge: intstr.FromInt(1), + }, + } + restoredLws, err := ApplyRevision(lws, revision) + if err != nil { + t.Fatal(err) + } + + restoredRevision, err := NewRevision(context.TODO(), client, restoredLws, "") + if err != nil { + t.Fatal(err) + } + + if !EqualRevision(revision, restoredRevision) { + t.Errorf("expected value %v, got %v", revision, restoredRevision) + } + + if diff := cmp.Diff(currentLws.Spec.LeaderWorkerTemplate, restoredLws.Spec.LeaderWorkerTemplate); diff != "" { + t.Errorf("unexpected restored LeaderWorkerTemplate: %s", diff) + } + + if diff := cmp.Diff(currentLws.Spec.NetworkConfig, restoredLws.Spec.NetworkConfig); diff != "" { + t.Errorf("NetworkConfig should be restored %s", diff) + } + + if diff := cmp.Diff(lws.Spec.RolloutStrategy, restoredLws.Spec.RolloutStrategy); diff != "" { + t.Errorf("It should not restore/clear non NetworkConfig Spec fields %s,", diff) + } +} + +func BuildLeaderWorkerSet(nsName string) *leaderworkerset.LeaderWorkerSet { + lws := leaderworkerset.LeaderWorkerSet{} + lws.Name = "test-sample" + lws.Namespace = nsName + lws.Spec = leaderworkerset.LeaderWorkerSetSpec{} + lws.Spec.Replicas = 
ptr.To[int32](2) + lws.Spec.LeaderWorkerTemplate = leaderworkerset.LeaderWorkerTemplate{RestartPolicy: leaderworkerset.RecreateGroupOnPodRestart} + lws.Spec.LeaderWorkerTemplate.Size = ptr.To[int32](2) + lws.Spec.LeaderWorkerTemplate.LeaderTemplate = &corev1.PodTemplateSpec{} + lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec = MakeLeaderPodSpec() + lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec = MakeWorkerPodSpec() + // Manually set this for we didn't enable webhook in controller tests. + lws.Spec.RolloutStrategy = leaderworkerset.RolloutStrategy{ + Type: leaderworkerset.RollingUpdateStrategyType, + RollingUpdateConfiguration: &leaderworkerset.RollingUpdateConfiguration{ + MaxUnavailable: intstr.FromInt32(1), + MaxSurge: intstr.FromInt(0), + }, + } + lws.Spec.StartupPolicy = leaderworkerset.LeaderCreatedStartupPolicy + subdomainPolicy := leaderworkerset.SubdomainShared + lws.Spec.NetworkConfig = &leaderworkerset.NetworkConfig{ + SubdomainPolicy: &subdomainPolicy, + } + + return &lws +} + +func MakeLeaderPodSpec() corev1.PodSpec { + return corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "worker", + Image: "nginx:1.14.2", + }, + }, + } +} + +func MakeWorkerPodSpec() corev1.PodSpec { + return corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "leader", + Image: "nginx:1.14.2", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + Protocol: "TCP", + }, + }, + }, + }, + } +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 94abe9af..faedd0c2 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -24,8 +24,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - - leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" ) const ( @@ -46,16 +44,6 @@ func NonZeroValue(value int32) int32 { return value } -func LeaderWorkerTemplateHash(lws *leaderworkerset.LeaderWorkerSet) string { - if lws.Spec.NetworkConfig == nil || string(*lws.Spec.NetworkConfig.SubdomainPolicy) == string(leaderworkerset.SubdomainShared) { - return Sha1Hash(lws.Spec.LeaderWorkerTemplate.LeaderTemplate.String() + - lws.Spec.LeaderWorkerTemplate.WorkerTemplate.String()) - } - - return Sha1Hash(lws.Spec.LeaderWorkerTemplate.LeaderTemplate.String() + - lws.Spec.LeaderWorkerTemplate.WorkerTemplate.String() + string(*lws.Spec.NetworkConfig.SubdomainPolicy)) -} - // SortByIndex returns an ascending list, the length of the list is always specified by the parameter. 
 func SortByIndex[T appsv1.StatefulSet | corev1.Pod | int](indexFunc func(T) (int, error), items []T, length int) []T {
 	result := make([]T, length)
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index ea7b1170..81416c9a 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -71,7 +71,7 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 		gomega.Expect(*lws.Spec.LeaderWorkerTemplate.Size).To(gomega.Equal(int32(5)))
 		gomega.Expect(lws.Spec.LeaderWorkerTemplate.RestartPolicy).To(gomega.Equal(v1.NoneRestartPolicy))
 
-		expectedLabels := []string{v1.SetNameLabelKey, v1.GroupIndexLabelKey, v1.WorkerIndexLabelKey, v1.TemplateRevisionHashKey}
+		expectedLabels := []string{v1.SetNameLabelKey, v1.GroupIndexLabelKey, v1.WorkerIndexLabelKey, v1.RevisionKey}
 		expectedAnnotations := []string{v1.LeaderPodNameAnnotationKey, v1.SizeAnnotationKey}
 
 		for _, pod := range pods.Items {
@@ -152,7 +152,7 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 		lwsPods := &corev1.PodList{}
 		testing.ExpectValidPods(ctx, k8sClient, lws, lwsPods)
 
-		expectedLabels := []string{v1.SetNameLabelKey, v1.GroupIndexLabelKey, v1.WorkerIndexLabelKey, v1.TemplateRevisionHashKey, v1.SubGroupIndexLabelKey}
+		expectedLabels := []string{v1.SetNameLabelKey, v1.GroupIndexLabelKey, v1.WorkerIndexLabelKey, v1.RevisionKey, v1.SubGroupIndexLabelKey}
 		expectedAnnotations := []string{v1.LeaderPodNameAnnotationKey, v1.SizeAnnotationKey, v1.SubGroupSizeAnnotationKey}
 
 		for _, pod := range lwsPods.Items {
diff --git a/test/testutils/util.go b/test/testutils/util.go
index c2ec3b41..8c2c0060 100644
--- a/test/testutils/util.go
+++ b/test/testutils/util.go
@@ -32,8 +32,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
-	"sigs.k8s.io/lws/pkg/utils"
 	acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators"
+	revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
 )
 
 func MustCreateLws(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) {
@@ -57,6 +57,7 @@ func CreateWorkerPodsForLeaderPod(ctx context.Context, leaderPod corev1.Pod, k8s
 					leaderworkerset.SetNameLabelKey:     lws.Name,
 					"worker.pod":                        "workers",
 					leaderworkerset.WorkerIndexLabelKey: strconv.Itoa(i),
+					leaderworkerset.RevisionKey:         revisionutils.GetRevisionKey(&leaderPod),
 				},
 				Annotations: map[string]string{
 					leaderworkerset.SizeAnnotationKey: strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size)),
@@ -123,6 +124,10 @@ func CreateLeaderPods(ctx context.Context, leaderSts appsv1.StatefulSet, k8sClie
 	} else {
 		podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
 	}
+	cr, err := revisionutils.NewRevision(ctx, k8sClient, lws, "")
+	if err != nil {
+		return err
+	}
 	for i := start; i < end; i++ {
 		pod := corev1.Pod{
 			ObjectMeta: metav1.ObjectMeta{
@@ -133,7 +138,7 @@ func CreateLeaderPods(ctx context.Context, leaderSts appsv1.StatefulSet, k8sClie
 					leaderworkerset.WorkerIndexLabelKey:     strconv.Itoa(0),
 					leaderworkerset.GroupIndexLabelKey:      strconv.Itoa(i),
 					leaderworkerset.GroupUniqueHashLabelKey: "randomValue",
-					leaderworkerset.TemplateRevisionHashKey: utils.LeaderWorkerTemplateHash(lws),
+					leaderworkerset.RevisionKey:             revisionutils.GetRevisionKey(cr),
 				},
 				Annotations: map[string]string{
 					leaderworkerset.SizeAnnotationKey: strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size)),
@@ -161,11 +166,13 @@ func ExpectValidPods(ctx context.Context, k8sClient client.Client, lws *leaderwo
 		if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, lws); err != nil {
 			return err
 		}
-
-		hash := utils.LeaderWorkerTemplateHash(lws)
+		cr, err := revisionutils.NewRevision(ctx, k8sClient, lws, "")
+		if err != nil {
+			return err
+		}
 		labelSelector := client.MatchingLabels(map[string]string{
-			leaderworkerset.SetNameLabelKey:         lws.Name,
-			leaderworkerset.TemplateRevisionHashKey: hash,
+			leaderworkerset.SetNameLabelKey: lws.Name,
+			leaderworkerset.RevisionKey:     revisionutils.GetRevisionKey(cr),
 		})
 
 		if err := k8sClient.List(ctx, podList, labelSelector, client.InNamespace(lws.Namespace)); err != nil {
@@ -173,7 +180,7 @@ func ExpectValidPods(ctx context.Context, k8sClient client.Client, lws *leaderwo
 		}
 
 		if len(podList.Items) != int((*lws.Spec.Replicas)*(*lws.Spec.LeaderWorkerTemplate.Size)) {
-			return errors.New("pod number not right")
+			return fmt.Errorf("expected %d pods, got %d", (int((*lws.Spec.Replicas) * (*lws.Spec.LeaderWorkerTemplate.Size))), len(podList.Items))
 		}
 
 		var leaderTemplateSpec corev1.PodTemplateSpec
@@ -194,6 +201,7 @@ func ExpectValidPods(ctx context.Context, k8sClient client.Client, lws *leaderwo
 				return errors.New("container name not right")
 			}
 		}
+
 		return nil
 	}, Timeout, Interval).Should(gomega.Succeed())
 }
@@ -249,9 +257,12 @@ func SetLeaderPodToReady(ctx context.Context, k8sClient client.Client, podName s
 		if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: lws.Name}, lws); err != nil {
 			return err
 		}
-		hash := utils.LeaderWorkerTemplateHash(lws)
+		cr, err := revisionutils.NewRevision(ctx, k8sClient, lws, "")
+		if err != nil {
+			return err
+		}
 
-		leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] = hash
+		leaderPod.Labels[leaderworkerset.RevisionKey] = revisionutils.GetRevisionKey(cr)
 		return k8sClient.Update(ctx, &leaderPod)
 	}, Timeout, Interval).Should(gomega.Succeed())
 
diff --git a/test/testutils/validators.go b/test/testutils/validators.go
index a72fa94b..e05c0b21 100644
--- a/test/testutils/validators.go
+++ b/test/testutils/validators.go
@@ -33,7 +33,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
-	"sigs.k8s.io/lws/pkg/utils"
+	revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
 	statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
 )
 
@@ -150,9 +150,13 @@ func ExpectValidLeaderStatefulSet(ctx context.Context, k8sClient client.Client,
 		if sts.Spec.Template.Labels[leaderworkerset.SetNameLabelKey] == "" {
 			return fmt.Errorf("leader statefulset pod template misses leaderworkerset label")
 		}
-		hash := utils.LeaderWorkerTemplateHash(&lws)
-		if sts.Labels[leaderworkerset.TemplateRevisionHashKey] != hash {
-			return fmt.Errorf("mismatch template revision hash for leader statefulset, got: %s, want: %s", sts.Spec.Template.Labels[leaderworkerset.TemplateRevisionHashKey], hash)
+		cr, err := revisionutils.NewRevision(ctx, k8sClient, &lws, "")
+		if err != nil {
+			return err
+		}
+		hash := revisionutils.GetRevisionKey(cr)
+		if revisionutils.GetRevisionKey(&sts) != hash {
+			return fmt.Errorf("mismatch template revision hash for leader statefulset, got: %s, want: %s", revisionutils.GetRevisionKey(&sts), hash)
 		}
 		if sts.Spec.ServiceName != lws.Name {
 			return errors.New("leader StatefulSet service name should match leaderWorkerSet name")
@@ -180,9 +184,9 @@ func ExpectValidLeaderStatefulSet(ctx context.Context, k8sClient client.Client,
 		}
 		// check pod template has correct label
 		if diff := cmp.Diff(sts.Spec.Template.Labels, map[string]string{
-			leaderworkerset.SetNameLabelKey:         lws.Name,
-			leaderworkerset.WorkerIndexLabelKey:     "0",
-			leaderworkerset.TemplateRevisionHashKey: utils.LeaderWorkerTemplateHash(&lws),
+			leaderworkerset.SetNameLabelKey:     lws.Name,
+			leaderworkerset.WorkerIndexLabelKey: "0",
+			leaderworkerset.RevisionKey:         hash,
 		}); diff != "" {
 			return errors.New("leader StatefulSet pod template doesn't have the correct labels: " + diff)
 		}
@@ -271,9 +275,13 @@ func ExpectValidWorkerStatefulSets(ctx context.Context, leaderWorkerSet *leaderw
 		if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != sts.Spec.Template.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] {
 			return fmt.Errorf("mismatch exclusive placement annotation between worker statefulset and leaderworkerset")
 		}
-		hash := utils.LeaderWorkerTemplateHash(&lws)
-		if sts.Labels[leaderworkerset.TemplateRevisionHashKey] != hash {
-			return fmt.Errorf("mismatch template revision hash for worker statefulset, got: %s, want: %s", sts.Labels[leaderworkerset.TemplateRevisionHashKey], hash)
+		cr, err := revisionutils.NewRevision(ctx, k8sClient, &lws, "")
+		if err != nil {
+			return err
+		}
+		hash := revisionutils.GetRevisionKey(cr)
+		if revisionutils.GetRevisionKey(&sts) != hash {
+			return fmt.Errorf("mismatch template revision hash for worker statefulset, got: %s, want: %s", revisionutils.GetRevisionKey(&sts), hash)
 		}
 		if *sts.Spec.Replicas != *lws.Spec.LeaderWorkerTemplate.Size-1 {
 			return errors.New("worker StatefulSet replicas should match leaderWorkerSet replicas")
diff --git a/test/testutils/wrappers.go b/test/testutils/wrappers.go
index c23e5c95..f10852d7 100644
--- a/test/testutils/wrappers.go
+++ b/test/testutils/wrappers.go
@@ -154,6 +154,7 @@ func BuildLeaderWorkerSet(nsName string) *LeaderWorkerSetWrapper {
 	lws.Spec.NetworkConfig = &leaderworkerset.NetworkConfig{
 		SubdomainPolicy: &subdomainPolicy,
 	}
+
 	return &LeaderWorkerSetWrapper{
 		lws,
 	}
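
As a rough sketch (not part of the patch itself) of how the new revision helpers compose, using only the call shapes exercised by revision_utils_test.go above (NewRevision, GetRevisionKey, ApplyRevision, EqualRevision), a hypothetical companion test could look like the following; the test name, the image mutation, and the reuse of BuildLeaderWorkerSet are assumptions made for illustration.

package revision

import (
	"context"
	"testing"

	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// TestRevisionRoundTripSketch is a hypothetical companion to TestApplyRevision.
func TestRevisionRoundTripSketch(t *testing.T) {
	ctx := context.TODO()
	client := fake.NewClientBuilder().Build()
	lws := BuildLeaderWorkerSet("default")

	// Snapshot the current template into a ControllerRevision; the empty
	// string mirrors the usage in the tests above.
	cr, err := NewRevision(ctx, client, lws, "")
	if err != nil {
		t.Fatal(err)
	}

	// This key is what gets stamped on StatefulSets and pods via
	// leaderworkerset.RevisionKey, replacing the old template-hash label.
	if GetRevisionKey(cr) == "" {
		t.Fatal("expected a non-empty revision key")
	}

	// Mutate the template, then roll it back from the recorded revision.
	lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Image = "nginx:1.15.0"
	restoredLws, err := ApplyRevision(lws, cr)
	if err != nil {
		t.Fatal(err)
	}

	// A revision built from the restored object should match the original.
	restoredRevision, err := NewRevision(ctx, client, restoredLws, "")
	if err != nil {
		t.Fatal(err)
	}
	if !EqualRevision(cr, restoredRevision) {
		t.Errorf("restored revision differs from the original")
	}
}

Compared with the removed LeaderWorkerTemplateHash approach, the stored ControllerRevision allows the exact template to be recovered later via ApplyRevision rather than only compared by hash.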