Skip to content

Commit

Permalink
feat(backend): Add Semaphore and Mutex fields to Workflow CR
Browse files Browse the repository at this point in the history
- Added `Semaphore` and `Mutex` fields to the Workflow Spec to support concurrency control mechanisms directly within workflows.
- Introduced a new environment variable, `SEMAPHORE_CONFIGMAP_NAME`, to the API Server deployment for managing semaphore configurations.
- Added an empty ConfigMap manifest for semaphores to facilitate initial setup and testing.

Signed-off-by: ddalvi <[email protected]>
  • Loading branch information
DharmitD committed Dec 13, 2024
1 parent 60a8865 commit b060e03
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 26 deletions.
51 changes: 35 additions & 16 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,34 @@ var (
Launcher = ""
)

func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) *pipelinespec.SinglePlatformSpec {
var kubernetesSpec *pipelinespec.SinglePlatformSpec

// Check for "kubernetes" key in the platformSpec map
if platformSpec != nil {
if platform, ok := platformSpec["kubernetes"]; ok && platform != nil {
kubernetesSpec = platform
}
}
return kubernetesSpec
}

func getPipelineOptions(platform *pipelinespec.SinglePlatformSpec) *argocompiler.Options {
var pipelineOptions *argocompiler.Options

if platform != nil && platform.PipelineConfig != nil {

Check failure on line 63 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 63 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions = &argocompiler.Options{}
if platform.PipelineConfig.SemaphoreKey != "" {

Check failure on line 65 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 65 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey

Check failure on line 66 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 66 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
if platform.PipelineConfig.MutexName != "" {

Check failure on line 68 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 68 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions.MutexName = platform.PipelineConfig.MutexName

Check failure on line 69 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 69 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
}
return pipelineOptions
}


// Converts modelJob to ScheduledWorkflow.
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
job := &pipelinespec.PipelineJob{}
Expand All @@ -69,17 +97,12 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}
kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -292,17 +315,13 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
if err = t.validatePipelineJobInputs(job); err != nil {
return nil, util.Wrap(err, "invalid pipeline job inputs")
}
// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}

kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down
51 changes: 42 additions & 9 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"strings"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
Expand All @@ -28,6 +29,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8sres "k8s.io/apimachinery/pkg/api/resource"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -40,6 +42,16 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
SemaphoreKey string
MutexName string
}

func getSemaphoreConfigMapName() string {
const defaultConfigMapName = "semaphore-config"
if name := os.Getenv("SEMAPHORE_CONFIGMAP_NAME"); name != "" {
return name
}
return defaultConfigMapName
}

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
Expand Down Expand Up @@ -76,6 +88,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

var semaphoreKey, mutexName string
if opts != nil && opts.SemaphoreKey != "" {
semaphoreKey = opts.SemaphoreKey
}
if opts != nil && opts.MutexName != "" {
mutexName = opts.MutexName
}

var kubernetesSpec *pipelinespec.SinglePlatformSpec
if kubernetesSpecArg != nil {
// clone kubernetesSpecArg, because we don't want to change it
Expand All @@ -86,22 +106,13 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Kind: "Workflow",
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Note, uncomment the following during development to view argo inputs/outputs in KFP UI.
// TODO(Bobgy): figure out what annotations we should use for v2 engine.
// For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts
// suitable for debugging.
//
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
PodMetadata: &wfapi.Metadata{
Expand All @@ -119,6 +130,28 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
Entrypoint: tmplEntrypoint,
},
}

if semaphoreKey != "" {
wf.Spec.Synchronization = &wfapi.Synchronization{
Semaphore: &wfapi.SemaphoreRef{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: getSemaphoreConfigMapName(),
},
Key: semaphoreKey,
},
},
}
}

if mutexName != "" {
wf.Spec.Synchronization = &wfapi.Synchronization{
Mutex: &wfapi.Mutex{
Name: mutexName,
},
}
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec
}

// Add kubernetes spec to annotation
if state.kubernetesSpec != nil {
if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil {
kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel]
if ok {
state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ spec:
secretKeyRef:
name: mlpipeline-minio-artifact
key: secretkey
- name: SEMAPHORE_CONFIGMAP_NAME
value: "semaphore-config"
image: gcr.io/ml-pipeline/api-server:dummy
imagePullPolicy: IfNotPresent
name: ml-pipeline-api-server
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: ConfigMap
apiVersion: v1
metadata:
name: semaphore-config
data: {}

0 comments on commit b060e03

Please sign in to comment.