diff --git a/backend/src/apiserver/client/scheduled_workflow_fake.go b/backend/src/apiserver/client/scheduled_workflow_fake.go index 5b81722ee35..970fd26e658 100644 --- a/backend/src/apiserver/client/scheduled_workflow_fake.go +++ b/backend/src/apiserver/client/scheduled_workflow_fake.go @@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name) } -func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil +func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) { + c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow + return scheduledWorkflow, nil } func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error { diff --git a/backend/src/apiserver/list/list.go b/backend/src/apiserver/list/list.go index e38be8f7339..174eff961d8 100644 --- a/backend/src/apiserver/list/list.go +++ b/backend/src/apiserver/list/list.go @@ -22,6 +22,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "math" "reflect" "strings" @@ -97,6 +98,13 @@ type Options struct { *token } +func EmptyOptions() *Options { + return &Options{ + math.MaxInt32, + &token{}, + } +} + // Matches returns trues if the sorting and filtering criteria in o matches that // of the one supplied in opts. func (o *Options) Matches(opts *Options) bool { @@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild if o.IsDesc { order = "DESC" } - sqlBuilder = sqlBuilder. - OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)). - OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order)) + + if o.SortByFieldName != "" { + sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)) + } + + if o.KeyFieldName != "" { + sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order)) + } return sqlBuilder } diff --git a/backend/src/apiserver/list/list_test.go b/backend/src/apiserver/list/list_test.go index 1806e158eec..e207cd900ab 100644 --- a/backend/src/apiserver/list/list_test.go +++ b/backend/src/apiserver/list/list_test.go @@ -15,6 +15,8 @@ package list import ( + "fmt" + "math" "reflect" "strings" "testing" @@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) { wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124", wantArgs: nil, }, + { + in: EmptyOptions(), + wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1), + wantArgs: nil, + }, { in: &Options{ PageSize: 123, diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index 5430674897b..9841503a32f 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -28,6 +28,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/fsnotify/fsnotify" @@ -106,10 +107,25 @@ func main() { } log.SetLevel(level) + backgroundCtx, backgroundCancel := context.WithCancel(context.Background()) + defer backgroundCancel() + wg := sync.WaitGroup{} + wg.Add(1) + go reconcileSwfCrs(resourceManager, backgroundCtx, &wg) go startRpcServer(resourceManager) + // This is blocking startHttpProxy(resourceManager) - + backgroundCancel() clientManager.Close() + wg.Wait() +} + +func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + err := resourceManager.ReconcileSwfCrs(ctx) + if err != nil { + log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err) + } } // A custom http request header matcher to pass on the user identity diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 82c13aa369f..94fdc578c62 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "fmt" + scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" "io" "net" + "reflect" "strconv" "github.com/cenkalti/backoff" @@ -567,6 +569,77 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model return newRun, nil } +// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs. +func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error { + filterContext := &model.FilterContext{ + ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()}, + } + + opts := list.EmptyOptions() + + jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts) + + if err != nil { + return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") + } + + for i := range jobs { + select { + case <-ctx.Done(): + return nil + default: + } + + tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) + if err != nil { + return failedToReconcileSwfCrsError(err) + } + + newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i]) + if err != nil { + return failedToReconcileSwfCrsError(err) + } + + for { + currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{}) + if err != nil { + if util.IsNotFound(err) { + break + } + return failedToReconcileSwfCrsError(err) + } + + if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) { + currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec + err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow) + if err != nil { + if apierrors.IsConflict(errors.Unwrap(err)) { + continue + } else if util.IsNotFound(errors.Cause(err)) { + break + } + return failedToReconcileSwfCrsError(err) + } + } + break + } + } + + return nil +} + +func failedToReconcileSwfCrsError(err error) error { + return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") +} + +func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error { + _, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow) + if err != nil { + return util.Wrap(err, "Failed to update ScheduledWorkflow") + } + return nil +} + // Fetches a run with a given id. func (r *ResourceManager) GetRun(runId string) (*model.Run, error) { run, err := r.runStore.GetRun(runId) @@ -975,12 +1048,6 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest") } - // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). - // Convert modelJob into scheduledWorkflow. - scheduledWorkflow, err := tmpl.ScheduledWorkflow(job) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") - } // Create a new ScheduledWorkflow at the ScheduledWorkflow client. k8sNamespace := job.Namespace if k8sNamespace == "" { @@ -989,6 +1056,15 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model if k8sNamespace == "" { return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace") } + + job.Namespace = k8sNamespace + + // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). + // Convert modelJob into scheduledWorkflow. + scheduledWorkflow, err := tmpl.ScheduledWorkflow(job) + if err != nil { + return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + } newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { @@ -1000,23 +1076,23 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model swf := util.NewScheduledWorkflow(newScheduledWorkflow) job.UUID = string(swf.UID) job.K8SName = swf.Name - job.Namespace = swf.Namespace job.Conditions = model.StatusState(swf.ConditionSummary()).ToString() for _, modelRef := range job.ResourceReferences { modelRef.ResourceUUID = string(swf.UID) } - // Get the service account - serviceAccount := "" - if swf.Spec.Workflow != nil { - execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow) - if err == nil { - serviceAccount = execSpec.ServiceAccount() - } - } - job.ServiceAccount = serviceAccount if tmpl.GetTemplateType() == template.V1 { + // Get the service account + serviceAccount := "" + if swf.Spec.Workflow != nil { + execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow) + if err == nil { + serviceAccount = execSpec.ServiceAccount() + } + } + job.ServiceAccount = serviceAccount job.PipelineSpec.WorkflowSpecManifest = manifest } else { + job.ServiceAccount = newScheduledWorkflow.Spec.ServiceAccount job.PipelineSpec.PipelineSpecManifest = manifest } return r.jobStore.CreateJob(job) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index f02792a3a08..3a29893fec5 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -3146,6 +3146,35 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing. assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) } +func TestReconcileSwfCrs(t *testing.T) { + store, manager, job := initWithJobV2(t) + defer store.Close() + + fetchedJob, err := manager.GetJob(job.UUID) + require.Nil(t, err) + require.NotNil(t, fetchedJob) + + swfClient := store.SwfClient().ScheduledWorkflow("ns1") + + options := v1.GetOptions{} + ctx := context.Background() + + swf, err := swfClient.Get(ctx, "job-", options) + require.Nil(t, err) + + // emulates an invalid/outdated spec + swf.Spec.Workflow.Spec = nil + swf, err = swfClient.Update(ctx, swf) + require.Nil(t, swf.Spec.Workflow.Spec) + + err = manager.ReconcileSwfCrs(ctx) + require.Nil(t, err) + + swf, err = swfClient.Get(ctx, "job-", options) + require.Nil(t, err) + require.NotNil(t, swf.Spec.Workflow.Spec) +} + func TestReportScheduledWorkflowResource_Error(t *testing.T) { store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close() diff --git a/backend/src/apiserver/template/template_test.go b/backend/src/apiserver/template/template_test.go index 98e7482bf46..27c9817cb60 100644 --- a/backend/src/apiserver/template/template_test.go +++ b/backend/src/apiserver/template/template_test.go @@ -217,9 +217,10 @@ func TestScheduledWorkflow(t *testing.T) { Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}}, Spec: "", }, - PipelineId: "1", - PipelineName: "pipeline name", - NoCatchup: util.BoolPointer(true), + PipelineId: "1", + PipelineName: "pipeline name", + NoCatchup: util.BoolPointer(true), + ServiceAccount: "pipeline-runner", }, } diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index d14ddffdaeb..1055bcdf8a9 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -97,7 +97,9 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche if modelJob.Namespace != "" { executionSpec.SetExecutionNamespace(modelJob.Namespace) } - setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount) + if executionSpec.ServiceAccount() == "" { + setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount) + } // Disable istio sidecar injection if not specified executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled) swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) @@ -132,7 +134,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche PipelineId: modelJob.PipelineId, PipelineName: modelJob.PipelineName, PipelineVersionId: modelJob.PipelineVersionId, - ServiceAccount: modelJob.ServiceAccount, + ServiceAccount: executionSpec.ServiceAccount(), }, } return scheduledWorkflow, nil