From aa322c7b557f742743038a81de01c4629a7ebe9b Mon Sep 17 00:00:00 2001 From: Jack Date: Tue, 26 Mar 2019 23:22:46 +0100 Subject: [PATCH] add ActiveDeadlineSeconds and BackoffLimit features (#963) * add ActiveDeadlineSeconds and BackoffLimit features * fix goimport and unassign variable in test * fix test and delete the package added by dep ensure * fix goimports * fix ActiveDeadlineSeconds unit test * add logger for test * add logger for test * add logger for test * add BackoffForOnFailure test * fix test * fix test * fix unit test --- pkg/apis/tensorflow/v1beta2/types.go | 11 + .../v1beta2/zz_generated.deepcopy.go | 10 + pkg/common/util/v1beta2/testutil/pod.go | 7 +- pkg/common/util/v1beta2/testutil/tfjob.go | 32 ++ .../tensorflow/controller.go | 129 +++++++- .../tensorflow/controller_test.go | 4 +- pkg/controller.v1beta2/tensorflow/job.go | 45 +++ pkg/controller.v1beta2/tensorflow/job_test.go | 276 +++++++++++++++++- pkg/controller.v1beta2/tensorflow/status.go | 22 +- pkg/util/k8sutil/k8sutil.go | 32 ++ 10 files changed, 545 insertions(+), 23 deletions(-) diff --git a/pkg/apis/tensorflow/v1beta2/types.go b/pkg/apis/tensorflow/v1beta2/types.go index 60bda2c930..be0d2dc214 100644 --- a/pkg/apis/tensorflow/v1beta2/types.go +++ b/pkg/apis/tensorflow/v1beta2/types.go @@ -42,6 +42,17 @@ type TFJob struct { // TFJobSpec is a desired state description of the TFJob. type TFJobSpec struct { + // Specifies the duration in seconds relative to the startTime that the job may be active + // before the system tries to terminate it; value must be positive integer. + // This method applies only to pods with restartPolicy == OnFailure or Always. + // +optional + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + + // Optional number of retries before marking this job failed. + // Defaults to 6 + // +optional + BackoffLimit *int32 `json:"backoffLimit,omitempty"` + // CleanPodPolicy defines the policy to kill pods after TFJob is // succeeded. // Default to Running. diff --git a/pkg/apis/tensorflow/v1beta2/zz_generated.deepcopy.go b/pkg/apis/tensorflow/v1beta2/zz_generated.deepcopy.go index d8b4a81e4d..05c8c3c840 100644 --- a/pkg/apis/tensorflow/v1beta2/zz_generated.deepcopy.go +++ b/pkg/apis/tensorflow/v1beta2/zz_generated.deepcopy.go @@ -87,6 +87,16 @@ func (in *TFJobList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TFJobSpec) DeepCopyInto(out *TFJobSpec) { *out = *in + if in.ActiveDeadlineSeconds != nil { + in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds + *out = new(int64) + **out = **in + } + if in.BackoffLimit != nil { + in, out := &in.BackoffLimit, &out.BackoffLimit + *out = new(int32) + **out = **in + } if in.CleanPodPolicy != nil { in, out := &in.CleanPodPolicy, &out.CleanPodPolicy *out = new(commonv1beta2.CleanPodPolicy) diff --git a/pkg/common/util/v1beta2/testutil/pod.go b/pkg/common/util/v1beta2/testutil/pod.go index e2f58f9a6c..233b186567 100644 --- a/pkg/common/util/v1beta2/testutil/pod.go +++ b/pkg/common/util/v1beta2/testutil/pod.go @@ -64,7 +64,7 @@ func NewPodList(count int32, status v1.PodPhase, tfJob *tfv1beta2.TFJob, typ str return pods } -func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, t *testing.T) { +func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, restartCounts []int32, t *testing.T) { var index int32 for _, pod := range NewPodList(pendingPods, v1.PodPending, tfJob, typ, index, t) { if err := podIndexer.Add(pod); err != nil { @@ -72,7 +72,10 @@ func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ strin } } index += pendingPods - for _, pod := range NewPodList(activePods, v1.PodRunning, tfJob, typ, index, t) { + for i, pod := range NewPodList(activePods, v1.PodRunning, tfJob, typ, index, t) { + if restartCounts != nil { + pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: restartCounts[i]}} + } if err := podIndexer.Add(pod); err != nil { t.Errorf("%s: unexpected error when adding pod %v", tfJob.Name, err) } diff --git a/pkg/common/util/v1beta2/testutil/tfjob.go b/pkg/common/util/v1beta2/testutil/tfjob.go index 2659e7cbe3..83f3419622 100644 --- a/pkg/common/util/v1beta2/testutil/tfjob.go +++ b/pkg/common/util/v1beta2/testutil/tfjob.go @@ -50,6 +50,38 @@ func NewTFJobWithCleanupJobDelay(chief, worker, ps int, ttl *int32) *tfv1beta2.T return tfJob } +func NewTFJobWithActiveDeadlineSeconds(chief, worker, ps int, ads *int64) *tfv1beta2.TFJob { + if chief == 1 { + tfJob := NewTFJobWithChief(worker, ps) + tfJob.Spec.ActiveDeadlineSeconds = ads + policy := common.CleanPodPolicyAll + tfJob.Spec.CleanPodPolicy = &policy + return tfJob + } + tfJob := NewTFJob(worker, ps) + tfJob.Spec.ActiveDeadlineSeconds = ads + policy := common.CleanPodPolicyAll + tfJob.Spec.CleanPodPolicy = &policy + return tfJob +} + +func NewTFJobWithBackoffLimit(chief, worker, ps int, backoffLimit *int32) *tfv1beta2.TFJob { + if chief == 1 { + tfJob := NewTFJobWithChief(worker, ps) + tfJob.Spec.BackoffLimit = backoffLimit + tfJob.Spec.TFReplicaSpecs["Worker"].RestartPolicy = "OnFailure" + policy := common.CleanPodPolicyAll + tfJob.Spec.CleanPodPolicy = &policy + return tfJob + } + tfJob := NewTFJob(worker, ps) + tfJob.Spec.BackoffLimit = backoffLimit + tfJob.Spec.TFReplicaSpecs["Worker"].RestartPolicy = "OnFailure" + policy := common.CleanPodPolicyAll + tfJob.Spec.CleanPodPolicy = &policy + return tfJob +} + func NewTFJobWithChief(worker, ps int) *tfv1beta2.TFJob { tfJob := NewTFJob(worker, ps) tfJob.Spec.TFReplicaSpecs[tfv1beta2.TFReplicaTypeChief] = &common.ReplicaSpec{ diff --git a/pkg/controller.v1beta2/tensorflow/controller.go b/pkg/controller.v1beta2/tensorflow/controller.go index 715c006e34..02d2cc86a4 100644 --- a/pkg/controller.v1beta2/tensorflow/controller.go 
+++ b/pkg/controller.v1beta2/tensorflow/controller.go @@ -17,6 +17,7 @@ package tensorflow import ( "fmt" + "strings" "time" kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" @@ -31,6 +32,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/kubeflow/tf-operator/cmd/tf-operator.v1beta2/app/options" + common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2" tfv1beta2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta2" tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned" tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme" @@ -39,6 +41,7 @@ import ( tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/tensorflow/v1beta2" "github.com/kubeflow/tf-operator/pkg/common/jobcontroller" tflogger "github.com/kubeflow/tf-operator/pkg/logger" + "github.com/kubeflow/tf-operator/pkg/util/k8sutil" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -325,17 +328,14 @@ func (tc *TFController) syncTFJob(key string) (bool, error) { return true, err } -func getTotalReplicas(tfjob *tfv1beta2.TFJob) int32 { - tfjobReplicas := int32(0) - for _, r := range tfjob.Spec.TFReplicaSpecs { - tfjobReplicas += *r.Replicas - } - return tfjobReplicas -} - // reconcileTFJobs checks and updates replicas for each given TFReplicaSpec. // It will requeue the tfjob in case of an error while creating/deleting pods/services. func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error { + tfjobKey, err := KeyFunc(tfjob) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err)) + return err + } logger := tflogger.LoggerForJob(tfjob) logger.Infof("Reconcile TFJobs %s", tfjob.Name) @@ -353,8 +353,46 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error { return err } + // retrieve the previous number of retry + previousRetry := tc.WorkQueue.NumRequeues(tfjobKey) + + activePods := k8sutil.FilterActivePods(pods) + active := int32(len(activePods)) + _, failed := getSucceededAndFailedCount(pods) + totalReplicas := getTotalReplicas(tfjob) + prevReplicasFailedNum := getTotalFailedReplicas(tfjob) + + tfJobExceedsLimit := false + var failureMessage string + var exceedsBackoffLimit bool = false + var pastBackoffLimit bool = false + + if tfjob.Spec.BackoffLimit != nil { + jobHasNewFailure := failed > prevReplicasFailedNum + // new failures happen when status does not reflect the failures and active + // is different than parallelism, otherwise the previous controller loop + // failed updating status so even if we pick up failure it is not a new one + exceedsBackoffLimit = jobHasNewFailure && (active != totalReplicas) && + (int32(previousRetry)+1 > *tfjob.Spec.BackoffLimit) + + pastBackoffLimit, err = tc.pastBackoffLimit(tfjob, pods) + if err != nil { + return err + } + } + + if exceedsBackoffLimit || pastBackoffLimit { + // check if the number of pod restart exceeds backoff (for restart OnFailure only) + // OR if the number of failed jobs increased since the last syncJob + tfJobExceedsLimit = true + failureMessage = fmt.Sprintf("TFJob %s has failed because it has reached the specified backoff limit", tfjob.Name) + } else if tc.pastActiveDeadline(tfjob) { + failureMessage = fmt.Sprintf("TFJob %s has failed because it was active longer than specified deadline", tfjob.Name) + tfJobExceedsLimit = true + } + // If the TFJob is terminated, delete all pods and services. 
- if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) { + if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit { if err := tc.deletePodsAndServices(tfjob, pods); err != nil { return err } @@ -374,6 +412,19 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error { } } + if tfJobExceedsLimit { + tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage) + if tfjob.Status.CompletionTime == nil { + now := metav1.Now() + tfjob.Status.CompletionTime = &now + } + err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage) + if err != nil { + tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } + } + // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status. // If any replicas are still Active, set their status to succeeded. if isSucceeded(tfjob.Status) { @@ -432,6 +483,66 @@ func (tc *TFController) satisfiedExpectations(tfjob *tfv1beta2.TFJob) bool { return satisfied } +// pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit +// this method applies only to pods with restartPolicy == OnFailure or Always +func (tc *TFController) pastBackoffLimit(tfjob *tfv1beta2.TFJob, pods []*v1.Pod) (bool, error) { + if tfjob.Spec.BackoffLimit == nil { + return false, nil + } + logger := tflogger.LoggerForJob(tfjob) + result := int32(0) + for rtype, spec := range tfjob.Spec.TFReplicaSpecs { + if spec.RestartPolicy != common.RestartPolicyOnFailure && spec.RestartPolicy != common.RestartPolicyAlways { + logger.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, tfjob.Name) + continue + } + // Convert TFReplicaType to lower string. + rt := strings.ToLower(string(rtype)) + pods, err := tc.FilterPodsForReplicaType(pods, rt) + if err != nil { + return false, err + } + for i := range pods { + po := pods[i] + if po.Status.Phase != v1.PodRunning { + continue + } + for j := range po.Status.InitContainerStatuses { + stat := po.Status.InitContainerStatuses[j] + result += stat.RestartCount + } + for j := range po.Status.ContainerStatuses { + stat := po.Status.ContainerStatuses[j] + result += stat.RestartCount + } + } + } + + if *tfjob.Spec.BackoffLimit == 0 { + return result > 0, nil + } + return result >= *tfjob.Spec.BackoffLimit, nil +} + +// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded. 
+func (tc *TFController) pastActiveDeadline(tfjob *tfv1beta2.TFJob) bool { + if tfjob.Spec.ActiveDeadlineSeconds == nil || tfjob.Status.StartTime == nil { + return false + } + now := metav1.Now() + start := tfjob.Status.StartTime.Time + duration := now.Time.Sub(start) + allowedDuration := time.Duration(*tfjob.Spec.ActiveDeadlineSeconds) * time.Second + return duration >= allowedDuration +} + +// getSucceededAndFailedCount returns no of succeeded and failed pods running a job +func getSucceededAndFailedCount(pods []*v1.Pod) (succeeded, failed int32) { + succeeded = int32(k8sutil.FilterPods(pods, v1.PodSucceeded)) + failed = int32(k8sutil.FilterPods(pods, v1.PodFailed)) + return +} + func (tc *TFController) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { return tc.getTFJobFromName(namespace, name) } diff --git a/pkg/controller.v1beta2/tensorflow/controller_test.go b/pkg/controller.v1beta2/tensorflow/controller_test.go index d59344ee5d..07ba3ae5dc 100644 --- a/pkg/controller.v1beta2/tensorflow/controller_test.go +++ b/pkg/controller.v1beta2/tensorflow/controller_test.go @@ -259,8 +259,8 @@ func TestNormalPath(t *testing.T) { } podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer() - testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t) - testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, t) + testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t) + testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, t) serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer() testutil.SetServices(serviceIndexer, tfJob, testutil.LabelWorker, tc.activeWorkerServices, t) diff --git a/pkg/controller.v1beta2/tensorflow/job.go b/pkg/controller.v1beta2/tensorflow/job.go index 68200748f0..9b7a0082c4 100644 --- a/pkg/controller.v1beta2/tensorflow/job.go +++ b/pkg/controller.v1beta2/tensorflow/job.go @@ -106,8 +106,37 @@ func (tc *TFController) updateTFJob(old, cur interface{}) { if err != nil { return } + curTFJob, err := tfJobFromUnstructured(cur) + if err != nil { + return + } + + // never return error + key, err := KeyFunc(curTFJob) + if err != nil { + return + } + log.Infof("Updating tfjob: %s", oldTFJob.Name) tc.enqueueTFJob(cur) + + // check if need to add a new rsync for ActiveDeadlineSeconds + if curTFJob.Status.StartTime != nil { + curTFJobADS := curTFJob.Spec.ActiveDeadlineSeconds + if curTFJobADS == nil { + return + } + oldTFJobADS := oldTFJob.Spec.ActiveDeadlineSeconds + if oldTFJobADS == nil || *oldTFJobADS != *curTFJobADS { + now := metav1.Now() + start := curTFJob.Status.StartTime.Time + passed := now.Time.Sub(start) + total := time.Duration(*curTFJobADS) * time.Second + // AddAfter will handle total < passed + tc.WorkQueue.AddAfter(key, total-passed) + log.Infof("job ActiveDeadlineSeconds updated, will rsync after %d seconds", total-passed) + } + } } func (tc *TFController) deletePodsAndServices(tfJob *tfv1beta2.TFJob, pods []*v1.Pod) error { @@ -164,3 +193,19 @@ func (tc *TFController) cleanupTFJob(tfJob *tfv1beta2.TFJob) error { func (tc *TFController) deleteTFJob(tfJob *tfv1beta2.TFJob) error { return 
tc.tfJobClientSet.KubeflowV1beta2().TFJobs(tfJob.Namespace).Delete(tfJob.Name, &metav1.DeleteOptions{}) } + +func getTotalReplicas(tfjob *tfv1beta2.TFJob) int32 { + tfjobReplicas := int32(0) + for _, r := range tfjob.Spec.TFReplicaSpecs { + tfjobReplicas += *r.Replicas + } + return tfjobReplicas +} + +func getTotalFailedReplicas(tfjob *tfv1beta2.TFJob) int32 { + totalFailedReplicas := int32(0) + for rtype := range tfjob.Status.ReplicaStatuses { + totalFailedReplicas += tfjob.Status.ReplicaStatuses[rtype].Failed + } + return totalFailedReplicas +} diff --git a/pkg/controller.v1beta2/tensorflow/job_test.go b/pkg/controller.v1beta2/tensorflow/job_test.go index 90493d0e61..06e8f3e8d2 100644 --- a/pkg/controller.v1beta2/tensorflow/job_test.go +++ b/pkg/controller.v1beta2/tensorflow/job_test.go @@ -19,7 +19,8 @@ import ( "time" kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -347,8 +348,8 @@ func TestDeletePodsAndServices(t *testing.T) { } podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer() - testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t) - testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, t) + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t) + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, t) serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer() testutil.SetServices(serviceIndexer, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, t) @@ -518,8 +519,8 @@ func TestCleanupTFJob(t *testing.T) { } podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer() - testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t) - testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, t) + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t) + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, t) serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer() testutil.SetServices(serviceIndexer, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, t) @@ -544,3 +545,268 @@ func TestCleanupTFJob(t *testing.T) { } } } + +func TestActiveDeadlineSeconds(t *testing.T) { + type testCase struct { + description string + tfJob *tfv1beta2.TFJob + + pendingWorkerPods int32 + activeWorkerPods int32 + succeededWorkerPods int32 + failedWorkerPods int32 + + pendingPSPods int32 + activePSPods int32 + succeededPSPods int32 + failedPSPods int32 + + activeWorkerServices int32 + activePSServices int32 + + expectedPodDeletions int + } + + ads2 := int64(2) + adsTest2 := &ads2 + testCases := []testCase{ + testCase{ + 
description: "4 workers and 2 ps is running, ActiveDeadlineSeconds unset", + tfJob: testutil.NewTFJobWithActiveDeadlineSeconds(0, 4, 2, nil), + + pendingWorkerPods: 0, + activeWorkerPods: 4, + succeededWorkerPods: 0, + failedWorkerPods: 0, + + pendingPSPods: 0, + activePSPods: 2, + succeededPSPods: 0, + failedPSPods: 0, + + activeWorkerServices: 4, + activePSServices: 2, + + expectedPodDeletions: 0, + }, + testCase{ + description: "4 workers and 2 ps is running, ActiveDeadlineSeconds is 2", + tfJob: testutil.NewTFJobWithActiveDeadlineSeconds(0, 4, 2, adsTest2), + + pendingWorkerPods: 0, + activeWorkerPods: 4, + succeededWorkerPods: 0, + failedWorkerPods: 0, + + pendingPSPods: 0, + activePSPods: 2, + succeededPSPods: 0, + failedPSPods: 0, + + activeWorkerServices: 4, + activePSServices: 2, + + expectedPodDeletions: 6, + }, + } + for _, tc := range testCases { + // Prepare the clientset and controller for the test. + kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{ + Host: "", + ContentConfig: rest.ContentConfig{ + GroupVersion: &v1.SchemeGroupVersion, + }, + }, + ) + + // Prepare the kube-batch clientset and controller for the test. + kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{ + Host: "", + ContentConfig: rest.ContentConfig{ + GroupVersion: &v1.SchemeGroupVersion, + }, + }, + ) + + config := &rest.Config{ + Host: "", + ContentConfig: rest.ContentConfig{ + GroupVersion: &tfv1beta2.SchemeGroupVersion, + }, + } + tfJobClientSet := tfjobclientset.NewForConfigOrDie(config) + ctr, kubeInformerFactory, _ := newTFController(config, kubeClientSet, kubeBatchClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{}) + fakePodControl := &controller.FakePodControl{} + ctr.PodControl = fakePodControl + fakeServiceControl := &control.FakeServiceControl{} + ctr.ServiceControl = fakeServiceControl + ctr.Recorder = &record.FakeRecorder{} + ctr.tfJobInformerSynced = testutil.AlwaysReady + ctr.PodInformerSynced = testutil.AlwaysReady + ctr.ServiceInformerSynced = testutil.AlwaysReady + tfJobIndexer := ctr.tfJobInformer.GetIndexer() + ctr.updateStatusHandler = func(tfJob *tfv1beta2.TFJob) error { + return nil + } + + unstructured, err := testutil.ConvertTFJobToUnstructured(tc.tfJob) + if err != nil { + t.Errorf("Failed to convert the TFJob to Unstructured: %v", err) + } + + if err := tfJobIndexer.Add(unstructured); err != nil { + t.Errorf("Failed to add tfjob to tfJobIndexer: %v", err) + } + + podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer() + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t) + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, t) + + serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer() + testutil.SetServices(serviceIndexer, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, t) + testutil.SetServices(serviceIndexer, tc.tfJob, testutil.LabelPS, tc.activePSServices, t) + + foo, _ := ctr.getTFJobFromName("default", "test-tfjob") + now := metav1.Now() + foo.Status.StartTime = &now + + ads := tc.tfJob.Spec.ActiveDeadlineSeconds + if ads != nil { + dur := time.Second * time.Duration(*ads) + time.Sleep(dur) + } + + err = ctr.reconcileTFJobs(foo) + if err != nil { + t.Errorf("%s: unexpected error when syncing jobs %v", tc.description, err) + } + + if 
len(fakePodControl.DeletePodName) != tc.expectedPodDeletions { + t.Errorf("%s: unexpected number of pod deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakePodControl.DeletePodName)) + } + if len(fakeServiceControl.DeleteServiceName) != tc.expectedPodDeletions { + t.Errorf("%s: unexpected number of service deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakeServiceControl.DeleteServiceName)) + } + } +} + +func TestBackoffForOnFailure(t *testing.T) { + type testCase struct { + description string + tfJob *tfv1beta2.TFJob + + pendingWorkerPods int32 + activeWorkerPods int32 + succeededWorkerPods int32 + failedWorkerPods int32 + + restartCounts []int32 + + pendingPSPods int32 + activePSPods int32 + succeededPSPods int32 + failedPSPods int32 + + activeWorkerServices int32 + activePSServices int32 + + expectedPodDeletions int + } + + backoffLimit4 := int32(4) + backoffLimitTest4 := &backoffLimit4 + testCases := []testCase{ + testCase{ + description: "4 workers each having 1 restartCount and 2 ps is running, backoffLimit 4 ", + tfJob: testutil.NewTFJobWithBackoffLimit(0, 4, 2, backoffLimitTest4), + + pendingWorkerPods: 0, + activeWorkerPods: 4, + succeededWorkerPods: 0, + failedWorkerPods: 0, + + restartCounts: []int32{1, 1, 1, 1}, + + pendingPSPods: 0, + activePSPods: 2, + succeededPSPods: 0, + failedPSPods: 0, + + activeWorkerServices: 4, + activePSServices: 2, + + expectedPodDeletions: 6, + }, + } + for _, tc := range testCases { + // Prepare the clientset and controller for the test. + kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{ + Host: "", + ContentConfig: rest.ContentConfig{ + GroupVersion: &v1.SchemeGroupVersion, + }, + }, + ) + + // Prepare the kube-batch clientset and controller for the test. 
+ kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{ + Host: "", + ContentConfig: rest.ContentConfig{ + GroupVersion: &v1.SchemeGroupVersion, + }, + }, + ) + + config := &rest.Config{ + Host: "", + ContentConfig: rest.ContentConfig{ + GroupVersion: &tfv1beta2.SchemeGroupVersion, + }, + } + tfJobClientSet := tfjobclientset.NewForConfigOrDie(config) + ctr, kubeInformerFactory, _ := newTFController(config, kubeClientSet, kubeBatchClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{}) + fakePodControl := &controller.FakePodControl{} + ctr.PodControl = fakePodControl + fakeServiceControl := &control.FakeServiceControl{} + ctr.ServiceControl = fakeServiceControl + ctr.Recorder = &record.FakeRecorder{} + ctr.tfJobInformerSynced = testutil.AlwaysReady + ctr.PodInformerSynced = testutil.AlwaysReady + ctr.ServiceInformerSynced = testutil.AlwaysReady + tfJobIndexer := ctr.tfJobInformer.GetIndexer() + ctr.updateStatusHandler = func(tfJob *tfv1beta2.TFJob) error { + return nil + } + + unstructured, err := testutil.ConvertTFJobToUnstructured(tc.tfJob) + if err != nil { + t.Errorf("Failed to convert the TFJob to Unstructured: %v", err) + } + + if err := tfJobIndexer.Add(unstructured); err != nil { + t.Errorf("Failed to add tfjob to tfJobIndexer: %v", err) + } + + podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer() + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, tc.restartCounts, t) + testutil.SetPodsStatuses(podIndexer, tc.tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, tc.restartCounts, t) + + serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer() + testutil.SetServices(serviceIndexer, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, t) + testutil.SetServices(serviceIndexer, tc.tfJob, testutil.LabelPS, tc.activePSServices, t) + + forget, err := ctr.syncTFJob(testutil.GetKey(tc.tfJob, t)) + if err != nil { + t.Errorf("%s: unexpected error when syncing jobs %v", tc.description, err) + } + if !forget { + t.Errorf("%s: unexpected forget value. Expected true, saw %v\n", tc.description, forget) + } + if len(fakePodControl.DeletePodName) != tc.expectedPodDeletions { + t.Errorf("%s: unexpected number of pod deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakePodControl.DeletePodName)) + } + if len(fakeServiceControl.DeleteServiceName) != tc.expectedPodDeletions { + t.Errorf("%s: unexpected number of service deletes. 
Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakeServiceControl.DeleteServiceName)) + } + } +} diff --git a/pkg/controller.v1beta2/tensorflow/status.go b/pkg/controller.v1beta2/tensorflow/status.go index 0e5328fa04..9a2ae59987 100644 --- a/pkg/controller.v1beta2/tensorflow/status.go +++ b/pkg/controller.v1beta2/tensorflow/status.go @@ -17,13 +17,14 @@ package tensorflow import ( "fmt" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "time" common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2" tfv1beta2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta2" tflogger "github.com/kubeflow/tf-operator/pkg/logger" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) const ( @@ -31,9 +32,9 @@ const ( tfJobCreatedReason = "TFJobCreated" // tfJobSucceededReason is added in a tfjob when it is succeeded. tfJobSucceededReason = "TFJobSucceeded" - // tfJobSucceededReason is added in a tfjob when it is running. + // tfJobRunningReason is added in a tfjob when it is running. tfJobRunningReason = "TFJobRunning" - // tfJobSucceededReason is added in a tfjob when it is failed. + // tfJobFailedReason is added in a tfjob when it is failed. tfJobFailedReason = "TFJobFailed" // tfJobRestarting is added in a tfjob when it is restarting. tfJobRestartingReason = "TFJobRestarting" @@ -41,6 +42,12 @@ const ( // updateStatus updates the status of the tfjob. func (tc *TFController) updateStatusSingle(tfjob *tfv1beta2.TFJob, rtype tfv1beta2.TFReplicaType, replicas int, restart, worker0Completed bool) error { + tfjobKey, err := KeyFunc(tfjob) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err)) + return err + } + commonType := common.ReplicaType(rtype) // Expect to have `replicas - succeeded` pods alive. expected := replicas - int(tfjob.Status.ReplicaStatuses[commonType].Succeeded) @@ -53,6 +60,11 @@ func (tc *TFController) updateStatusSingle(tfjob *tfv1beta2.TFJob, rtype tfv1bet if running == replicas && tfjob.Status.StartTime == nil { now := metav1.Now() tfjob.Status.StartTime = &now + // enqueue a sync to check if job past ActiveDeadlineSeconds + if tfjob.Spec.ActiveDeadlineSeconds != nil { + tflogger.LoggerForJob(tfjob).Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *tfjob.Spec.ActiveDeadlineSeconds) + tc.WorkQueue.AddAfter(tfjobKey, time.Duration(*tfjob.Spec.ActiveDeadlineSeconds)*time.Second) + } } // If the TFJob contains Chief or Master spec, then we will update the status diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c44681c26d..d44a3fdfab 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -19,6 +19,7 @@ import ( "os" log "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -89,3 +90,34 @@ func CascadeDeleteOptions(gracePeriodSeconds int64) *metav1.DeleteOptions { }(), } } + +// FilterActivePods returns pods that have not terminated. 
+func FilterActivePods(pods []*v1.Pod) []*v1.Pod { + var result []*v1.Pod + for _, p := range pods { + if IsPodActive(p) { + result = append(result, p) + } else { + log.Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", + p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp) + } + } + return result +} + +func IsPodActive(p *v1.Pod) bool { + return v1.PodSucceeded != p.Status.Phase && + v1.PodFailed != p.Status.Phase && + p.DeletionTimestamp == nil +} + +// filterPods returns pods based on their phase. +func FilterPods(pods []*v1.Pod, phase v1.PodPhase) int { + result := 0 + for i := range pods { + if phase == pods[i].Status.Phase { + result++ + } + } + return result +}
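
For reference, below is a minimal sketch of how a client might populate the two new TFJobSpec fields introduced by this patch. It is not part of the patch itself: the job name, namespace, replica count, and the deadline/backoff values are illustrative placeholders, while the field names, constants, and package paths follow the v1beta2 types referenced above. The worker pod template is left empty for brevity; a real TFJob would also set ReplicaSpec.Template.

// Sketch only: constructs a TFJob whose spec uses the new
// ActiveDeadlineSeconds and BackoffLimit fields. Names and values are
// hypothetical; types come from the v1beta2 packages used in this patch.
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2"
	tfv1beta2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta2"
)

func main() {
	// Terminate the job if it stays active longer than 10 minutes.
	activeDeadline := int64(600)
	// Mark the job failed after 4 retries; counted via pod restartCounts
	// for replicas whose RestartPolicy is OnFailure or Always.
	backoffLimit := int32(4)
	workerReplicas := int32(4)
	cleanPolicy := common.CleanPodPolicyAll

	tfJob := &tfv1beta2.TFJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-tfjob", // hypothetical name
			Namespace: "default",
		},
		Spec: tfv1beta2.TFJobSpec{
			ActiveDeadlineSeconds: &activeDeadline,
			BackoffLimit:          &backoffLimit,
			CleanPodPolicy:        &cleanPolicy,
			TFReplicaSpecs: map[tfv1beta2.TFReplicaType]*common.ReplicaSpec{
				tfv1beta2.TFReplicaTypeWorker: {
					Replicas:      &workerReplicas,
					RestartPolicy: common.RestartPolicyOnFailure,
					// Template omitted in this sketch.
				},
			},
		},
	}

	fmt.Printf("TFJob %s: activeDeadlineSeconds=%d, backoffLimit=%d\n",
		tfJob.Name, *tfJob.Spec.ActiveDeadlineSeconds, *tfJob.Spec.BackoffLimit)
}

With a spec like this, the controller logic added above enqueues a resync once StartTime plus ActiveDeadlineSeconds has elapsed, and pastBackoffLimit compares the summed container restart counts of OnFailure/Always replicas against BackoffLimit before marking the TFJob failed and cleaning up its pods and services.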