Skip to content

Commit

Permalink
feat(backend)Add Semaphore and Mutex fields to Workflow Spec
Browse files Browse the repository at this point in the history
  • Loading branch information
DharmitD committed Nov 12, 2024
1 parent 60a8865 commit 5c420db
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 9 deletions.
26 changes: 24 additions & 2 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,20 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
}
}

var pipeline_options argocompiler.Options
for _, platform := range t.platformSpec.Platforms {
if platform.PipelineConfig.SemaphoreKey != "" || platform.PipelineConfig.MutexName != "" {

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipeline_options = argocompiler.Options{
SemaphoreKey: platform.PipelineConfig.SemaphoreKey,

Check failure on line 84 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 84 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
MutexName: platform.PipelineConfig.MutexName,

Check failure on line 85 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 85 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
break
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, &pipeline_options)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -300,9 +311,20 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
}
}

var pipeline_options *argocompiler.Options
for _, platform := range t.platformSpec.Platforms {
if platform.PipelineConfig.SemaphoreKey != "" || platform.PipelineConfig.MutexName != "" {

Check failure on line 316 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 316 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipeline_options = &argocompiler.Options{
SemaphoreKey: platform.PipelineConfig.SemaphoreKey,

Check failure on line 318 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 318 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
MutexName: platform.PipelineConfig.MutexName,

Check failure on line 319 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 319 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
break
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipeline_options)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down
33 changes: 27 additions & 6 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8sres "k8s.io/apimachinery/pkg/api/resource"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -40,6 +41,8 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
SemaphoreKey string
MutexName string
}

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
Expand Down Expand Up @@ -76,6 +79,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

var semaphore_key, mutex_name string
if opts != nil && opts.SemaphoreKey != "" {
semaphore_key = opts.SemaphoreKey
}
if opts != nil && opts.MutexName != "" {
mutex_name = opts.MutexName
}

var kubernetesSpec *pipelinespec.SinglePlatformSpec
if kubernetesSpecArg != nil {
// clone kubernetesSpecArg, because we don't want to change it
Expand All @@ -94,13 +105,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Note, uncomment the following during development to view argo inputs/outputs in KFP UI.
// TODO(Bobgy): figure out what annotations we should use for v2 engine.
// For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts
// suitable for debugging.
//
// Uncomment during development for better debugging in KFP UI
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
Expand All @@ -117,8 +124,22 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
Synchronization: &wfapi.Synchronization{
Semaphore: &wfapi.SemaphoreRef{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: "semaphore-config",
},
Key: semaphore_key,
},
},
Mutex: &wfapi.Mutex{
Name: mutex_name,
},
},
},
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec
}

// Add kubernetes spec to annotation
if state.kubernetesSpec != nil {
if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil {
kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel]
if ok {
state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)
Expand Down

0 comments on commit 5c420db

Please sign in to comment.