Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): Synced ScheduledWorkflow CRs on apiserver startup #11469

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
19 changes: 16 additions & 3 deletions backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"math"
"reflect"
"strings"

Expand Down Expand Up @@ -97,6 +98,13 @@ type Options struct {
*token
}

func EmptyOptions() *Options {
return &Options{
math.MaxInt32,
&token{},
}
}

// Matches returns trues if the sorting and filtering criteria in o matches that
// of the one supplied in opts.
func (o *Options) Matches(opts *Options) bool {
Expand Down Expand Up @@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
if o.IsDesc {
order = "DESC"
}
sqlBuilder = sqlBuilder.
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)).
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))

if o.SortByFieldName != "" {
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order))
}

if o.KeyFieldName != "" {
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
}

return sqlBuilder
}
Expand Down
7 changes: 7 additions & 0 deletions backend/src/apiserver/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package list

import (
"fmt"
"math"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) {
wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124",
wantArgs: nil,
},
{
in: EmptyOptions(),
wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1),
wantArgs: nil,
},
{
in: &Options{
PageSize: 123,
Expand Down
18 changes: 17 additions & 1 deletion backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -106,10 +107,25 @@ func main() {
}
log.SetLevel(level)

backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
defer backgroundCancel()
wg := sync.WaitGroup{}
wg.Add(1)
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
go startRpcServer(resourceManager)
// This is blocking
startHttpProxy(resourceManager)

backgroundCancel()
clientManager.Close()
wg.Wait()
}

func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
err := resourceManager.ReconcileSwfCrs(ctx)
if err != nil {
log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your approach of not exiting with an error code in this case, but I'm curious what others think. My concern is that it'd be hard to know that this failed as a KFP admin but it also doesn't seem warranted to keep the API server from running if this can't succeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I was implementing this from scratch, I'd definitely exit with an error. But since the original behavior (the one that this PR aims to fix) is to run the API server with outdated swfs, existing deployments may start to fail after an upgrade even when they would work with outdated swfs.
At the same time, silently ignoring it and the outdated swfs "working" silently may be even more dangerous than just exiting with an error. (Now while I'm writing this, I'm tilting more to exit with an error)

Yeah, let's see what others think.

}
}

// A custom http request header matcher to pass on the user identity
Expand Down
102 changes: 86 additions & 16 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"context"
"encoding/json"
"fmt"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"io"
"net"
"reflect"
"strconv"

"github.com/cenkalti/backoff"
Expand Down Expand Up @@ -567,6 +569,71 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
return newRun, nil
}

// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
filterContext := &model.FilterContext{
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
}

opts := list.EmptyOptions()

jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)

if err != nil {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

nextJob:
for i := range jobs {
retryJob:
for {
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
}

newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i])
if err != nil {
return failedToReconcileSwfCrsError(err)
}

currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
if err != nil {
return failedToReconcileSwfCrsError(err)
}

if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
newScheduledWorkflow.Name = currentScheduledWorkflow.Name
newScheduledWorkflow.ResourceVersion = currentScheduledWorkflow.ResourceVersion
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, newScheduledWorkflow)
if err != nil {
if apierrors.IsConflict(errors.Unwrap(err)) {
continue retryJob
} else if util.IsNotFound(errors.Cause(err)) {
continue nextJob
}
return failedToReconcileSwfCrsError(err)
}
}
continue nextJob
}
}

return nil
}

func failedToReconcileSwfCrsError(err error) error {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
if err != nil {
return util.Wrap(err, "Failed to update ScheduledWorkflow")
}
return nil
}

// Fetches a run with a given id.
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
run, err := r.runStore.GetRun(runId)
Expand Down Expand Up @@ -975,12 +1042,6 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
// Create a new ScheduledWorkflow at the ScheduledWorkflow client.
k8sNamespace := job.Namespace
if k8sNamespace == "" {
Expand All @@ -989,6 +1050,15 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
if k8sNamespace == "" {
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace")
}

job.Namespace = k8sNamespace

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
Expand All @@ -1000,23 +1070,23 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
swf := util.NewScheduledWorkflow(newScheduledWorkflow)
job.UUID = string(swf.UID)
job.K8SName = swf.Name
job.Namespace = swf.Namespace
job.Conditions = model.StatusState(swf.ConditionSummary()).ToString()
for _, modelRef := range job.ResourceReferences {
modelRef.ResourceUUID = string(swf.UID)
}
// Get the service account
serviceAccount := ""
if swf.Spec.Workflow != nil {
execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow)
if err == nil {
serviceAccount = execSpec.ServiceAccount()
}
}
job.ServiceAccount = serviceAccount
if tmpl.GetTemplateType() == template.V1 {
// Get the service account
serviceAccount := ""
if swf.Spec.Workflow != nil {
execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow)
if err == nil {
serviceAccount = execSpec.ServiceAccount()
}
}
job.ServiceAccount = serviceAccount
job.PipelineSpec.WorkflowSpecManifest = manifest
} else {
job.ServiceAccount = newScheduledWorkflow.Spec.ServiceAccount
job.PipelineSpec.PipelineSpecManifest = manifest
}
return r.jobStore.CreateJob(job)
Expand Down
7 changes: 4 additions & 3 deletions backend/src/apiserver/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ func TestScheduledWorkflow(t *testing.T) {
Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}},
Spec: "",
},
PipelineId: "1",
PipelineName: "pipeline name",
NoCatchup: util.BoolPointer(true),
PipelineId: "1",
PipelineName: "pipeline name",
NoCatchup: util.BoolPointer(true),
ServiceAccount: "pipeline-runner",
},
}

Expand Down
6 changes: 4 additions & 2 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
if modelJob.Namespace != "" {
executionSpec.SetExecutionNamespace(modelJob.Namespace)
}
setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount)
if executionSpec.ServiceAccount() == "" {
setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount)
}
// Disable istio sidecar injection if not specified
executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled)
swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName)
Expand Down Expand Up @@ -132,7 +134,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
PipelineId: modelJob.PipelineId,
PipelineName: modelJob.PipelineName,
PipelineVersionId: modelJob.PipelineVersionId,
ServiceAccount: modelJob.ServiceAccount,
ServiceAccount: executionSpec.ServiceAccount(),
},
}
return scheduledWorkflow, nil
Expand Down
Loading