feat: support vllm in controller
- set vllm as the default runtime

Signed-off-by: jerryzhuang <[email protected]>
zhuangqh committed Oct 28, 2024
1 parent 1d09da0 commit 1fcd0b1
Showing 30 changed files with 713 additions and 274 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -319,11 +319,11 @@ azure-karpenter-helm: ## Update Azure client env vars and settings in helm valu
##@ Build
.PHONY: build
build: manifests generate fmt vet ## Build manager binary.
go build -o bin/manager cmd/*.go
go build -o bin/manager cmd/workspace/*.go

.PHONY: run
run: manifests generate fmt vet ## Run a controller from your host.
go run ./cmd/main.go
go run ./cmd/workspace/main.go

##@ Build Dependencies
## Location to install dependencies to
23 changes: 23 additions & 0 deletions api/v1alpha1/labels.go
@@ -3,6 +3,8 @@

package v1alpha1

import "github.com/azure/kaito/pkg/model"

const (

// Non-prefixed labels/annotations are reserved for end-use.
@@ -27,4 +29,25 @@ const (

// WorkspaceRevisionAnnotation is the Annotations for revision number
WorkspaceRevisionAnnotation = "workspace.kaito.io/revision"

// AnnotationWorkspaceBackend is the annotation for backend selection.
AnnotationWorkspaceBackend = KAITOPrefix + "backend"
)

// GetWorkspaceBackendName returns the runtime name of the workspace.
func GetWorkspaceBackendName(ws *Workspace) model.BackendName {
if ws == nil {
panic("workspace is nil")
}
runtime := model.BackendNameVLLM

name := ws.Annotations[AnnotationWorkspaceBackend]
switch name {
case string(model.BackendNameHuggingfaceTransformers):
runtime = model.BackendNameHuggingfaceTransformers
case string(model.BackendNameVLLM):
runtime = model.BackendNameVLLM
}

return runtime
}
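
The helper above defaults to vLLM whenever the annotation is missing or carries an unknown value. A minimal usage sketch (illustrative only, not part of this commit; it assumes Workspace embeds metav1.ObjectMeta in the usual CRD fashion):

package main

import (
	"fmt"

	"github.com/azure/kaito/api/v1alpha1"
	"github.com/azure/kaito/pkg/model"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// No backend annotation: the helper falls back to vLLM, the new default runtime.
	fmt.Println(v1alpha1.GetWorkspaceBackendName(&v1alpha1.Workspace{}))

	// Annotating the workspace with the transformers backend name opts back into
	// the Huggingface runtime.
	ws := &v1alpha1.Workspace{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{
				v1alpha1.AnnotationWorkspaceBackend: string(model.BackendNameHuggingfaceTransformers),
			},
		},
	}
	fmt.Println(v1alpha1.GetWorkspaceBackendName(ws))
}

Since vLLM is now the default, setting this annotation is the opt-out path back to the transformers runtime.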
4 changes: 2 additions & 2 deletions api/v1alpha1/workspace_validation.go
@@ -169,7 +169,7 @@ func (r *TuningSpec) validateCreate(ctx context.Context, workspaceNamespace stri
// Currently require a preset to be specified; in future we can consider defining a template
if r.Preset == nil {
errs = errs.Also(apis.ErrMissingField("Preset"))
} else if presetName := string(r.Preset.Name); !utils.IsValidPreset(presetName) {
} else if presetName := string(r.Preset.Name); !plugin.IsValidPreset(presetName) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("Unsupported tuning preset name %s", presetName), "presetName"))
}
return errs
@@ -407,7 +407,7 @@ func (i *InferenceSpec) validateCreate() (errs *apis.FieldError) {
if i.Preset != nil {
presetName := string(i.Preset.Name)
// Validate preset name
if !utils.IsValidPreset(presetName) {
if !plugin.IsValidPreset(presetName) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("Unsupported inference preset name %s", presetName), "presetName"))
}
// Validate private preset has private image specified
58 changes: 33 additions & 25 deletions docker/presets/models/tfs/Dockerfile
@@ -5,31 +5,39 @@ ARG MODEL_TYPE
ARG VERSION

# Set the working directory
WORKDIR /workspace/tfs
WORKDIR /workspace

# Write the version to a file
RUN echo $VERSION > /workspace/tfs/version.txt

# First, copy just the preset files and install dependencies
# This is done before copying the code to utilize Docker's layer caching and
# avoid reinstalling dependencies unless the requirements file changes.
# Inference
COPY kaito/presets/inference/${MODEL_TYPE}/requirements.txt /workspace/tfs/inference-requirements.txt
RUN pip install --no-cache-dir -r inference-requirements.txt

COPY kaito/presets/inference/${MODEL_TYPE}/inference_api.py /workspace/tfs/inference_api.py

# Fine Tuning
COPY kaito/presets/tuning/${MODEL_TYPE}/requirements.txt /workspace/tfs/tuning-requirements.txt
RUN pip install --no-cache-dir -r tuning-requirements.txt

COPY kaito/presets/tuning/${MODEL_TYPE}/cli.py /workspace/tfs/cli.py
COPY kaito/presets/tuning/${MODEL_TYPE}/fine_tuning.py /workspace/tfs/fine_tuning.py
COPY kaito/presets/tuning/${MODEL_TYPE}/parser.py /workspace/tfs/parser.py
COPY kaito/presets/tuning/${MODEL_TYPE}/dataset.py /workspace/tfs/dataset.py

# Copy the metrics server
COPY kaito/presets/tuning/${MODEL_TYPE}/metrics/metrics_server.py /workspace/tfs/metrics_server.py

# Copy the entire model weights to the weights directory
COPY ${WEIGHTS_PATH} /workspace/tfs/weights
COPY kaito/presets/inference/vllm/requirements.txt /workspace/vllm/inference-requirements.txt

RUN pip install --no-cache-dir -r /workspace/tfs/inference-requirements.txt && \
pip install --no-cache-dir -r /workspace/tfs/tuning-requirements.txt && \
pip install --no-cache-dir -r /workspace/vllm/inference-requirements.txt

# 1. Huggingface transformers
COPY kaito/presets/inference/${MODEL_TYPE}/inference_api.py \
kaito/presets/tuning/${MODEL_TYPE}/cli.py \
kaito/presets/tuning/${MODEL_TYPE}/fine_tuning.py \
kaito/presets/tuning/${MODEL_TYPE}/parser.py \
kaito/presets/tuning/${MODEL_TYPE}/dataset.py \
kaito/presets/tuning/${MODEL_TYPE}/metrics/metrics_server.py \
/workspace/tfs/

# 2. vLLM
COPY kaito/presets/inference/vllm/inference_api.py /workspace/vllm/inference_api.py

# Chat template
RUN apt update && apt install -y git && \
rm /var/lib/apt/lists/* -r
RUN git clone https://github.com/chujiezheng/chat_templates /tmp/chat_templates && \
cd /tmp/chat_templates && \
git reset --hard 670a2eb && \
cp -r ./chat_templates/ /workspace/ && \
rm -rf /tmp/chat_templates

# Model weights
COPY ${WEIGHTS_PATH} /workspace/weights
RUN echo $VERSION > /workspace/version.txt && \
ln -s /workspace/weights /workspace/tfs/weights && \
ln -s /workspace/weights /workspace/vllm/weights
4 changes: 2 additions & 2 deletions hack/run-pytest-in-venv.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

set -ex
set -e

if [ "$#" -ne 2 ]; then
echo "Usage: $0 <test_dir> <requirements.txt>"
@@ -20,7 +20,7 @@ trap cleanup EXIT

cd $VENV_DIR
printf "Creating virtual environment in %s\n" "$VENV_DIR"
python3 -m virtualenv venv
python3 -m virtualenv --system-site-packages venv
source "$VENV_DIR/venv/bin/activate"
if [ "$?" -ne 0 ]; then
printf "Failed to activate virtual environment\n"
2 changes: 1 addition & 1 deletion pkg/controllers/workspace_controller.go
@@ -830,7 +830,7 @@ func (c *WorkspaceReconciler) applyInference(ctx context.Context, wObj *kaitov1a
} else if apierrors.IsNotFound(err) {
var workloadObj client.Object
// Need to create a new workload
workloadObj, err = inference.CreatePresetInference(ctx, wObj, revisionStr, inferenceParam, model.SupportDistributedInference(), c.Client)
workloadObj, err = inference.CreatePresetInference(ctx, wObj, revisionStr, model, c.Client)
if err != nil {
return
}
87 changes: 42 additions & 45 deletions pkg/inference/preset-inferences.go
@@ -11,6 +11,7 @@ import (
"github.com/azure/kaito/pkg/utils"
"github.com/azure/kaito/pkg/utils/consts"

"github.com/azure/kaito/api/v1alpha1"
kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"github.com/azure/kaito/pkg/model"
"github.com/azure/kaito/pkg/resources"
Expand All @@ -22,9 +23,8 @@ import (
)

const (
ProbePath = "/healthz"
Port5000 = int32(5000)
InferenceFile = "inference_api.py"
ProbePath = "/health"
Port5000 = int32(5000)
)

var (
@@ -70,26 +70,31 @@
}
)

func updateTorchParamsForDistributedInference(ctx context.Context, kubeClient client.Client, wObj *kaitov1alpha1.Workspace, inferenceObj *model.PresetParam) error {
func updateTorchParamsForDistributedInference(ctx context.Context, kubeClient client.Client, wObj *kaitov1alpha1.Workspace, inferenceParam *model.PresetParam) error {
backendName := v1alpha1.GetWorkspaceBackendName(wObj)
if backendName != model.BackendNameHuggingfaceTransformers {
return fmt.Errorf("distributed inference is not supported for backend %s", backendName)
}

existingService := &corev1.Service{}
err := resources.GetResource(ctx, wObj.Name, wObj.Namespace, kubeClient, existingService)
if err != nil {
return err
}

nodes := *wObj.Resource.Count
inferenceObj.TorchRunParams["nnodes"] = strconv.Itoa(nodes)
inferenceObj.TorchRunParams["nproc_per_node"] = strconv.Itoa(inferenceObj.WorldSize / nodes)
inferenceParam.Transformers.TorchRunParams["nnodes"] = strconv.Itoa(nodes)
inferenceParam.Transformers.TorchRunParams["nproc_per_node"] = strconv.Itoa(inferenceParam.WorldSize / nodes)
if nodes > 1 {
inferenceObj.TorchRunParams["node_rank"] = "$(echo $HOSTNAME | grep -o '[^-]*$')"
inferenceObj.TorchRunParams["master_addr"] = existingService.Spec.ClusterIP
inferenceObj.TorchRunParams["master_port"] = "29500"
}
if inferenceObj.TorchRunRdzvParams != nil {
inferenceObj.TorchRunRdzvParams["max_restarts"] = "3"
inferenceObj.TorchRunRdzvParams["rdzv_id"] = "job"
inferenceObj.TorchRunRdzvParams["rdzv_backend"] = "c10d"
inferenceObj.TorchRunRdzvParams["rdzv_endpoint"] =
inferenceParam.Transformers.TorchRunParams["node_rank"] = "$(echo $HOSTNAME | grep -o '[^-]*$')"
inferenceParam.Transformers.TorchRunParams["master_addr"] = existingService.Spec.ClusterIP
inferenceParam.Transformers.TorchRunParams["master_port"] = "29500"
}
if inferenceParam.Transformers.TorchRunRdzvParams != nil {
inferenceParam.Transformers.TorchRunRdzvParams["max_restarts"] = "3"
inferenceParam.Transformers.TorchRunRdzvParams["rdzv_id"] = "job"
inferenceParam.Transformers.TorchRunRdzvParams["rdzv_backend"] = "c10d"
inferenceParam.Transformers.TorchRunRdzvParams["rdzv_endpoint"] =
fmt.Sprintf("%s-0.%s-headless.%s.svc.cluster.local:29500", wObj.Name, wObj.Name, wObj.Namespace)
}
return nil
@@ -114,14 +119,17 @@ func GetInferenceImageInfo(ctx context.Context, workspaceObj *kaitov1alpha1.Work
}

func CreatePresetInference(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, revisionNum string,
inferenceObj *model.PresetParam, supportDistributedInference bool, kubeClient client.Client) (client.Object, error) {
if inferenceObj.TorchRunParams != nil && supportDistributedInference {
if err := updateTorchParamsForDistributedInference(ctx, kubeClient, workspaceObj, inferenceObj); err != nil {
model model.Model, kubeClient client.Client) (client.Object, error) {
inferenceParam := model.GetInferenceParameters().DeepCopy()

if model.SupportDistributedInference() {
if err := updateTorchParamsForDistributedInference(ctx, kubeClient, workspaceObj, inferenceParam); err != nil {
klog.ErrorS(err, "failed to update torch params", "workspace", workspaceObj)
return nil, err
}
}

// additional volume
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
shmVolume, shmVolumeMount := utils.ConfigSHMVolume(*workspaceObj.Resource.Count)
@@ -131,24 +139,35 @@ func CreatePresetInference(ctx context.Context, workspaceObj *kaitov1alpha1.Work
if shmVolumeMount.Name != "" {
volumeMounts = append(volumeMounts, shmVolumeMount)
}

if len(workspaceObj.Inference.Adapters) > 0 {
adapterVolume, adapterVolumeMount := utils.ConfigAdapterVolume()
volumes = append(volumes, adapterVolume)
volumeMounts = append(volumeMounts, adapterVolumeMount)
}

// resource requirements
skuNumGPUs, err := utils.GetSKUNumGPUs(ctx, kubeClient, workspaceObj.Status.WorkerNodes,
workspaceObj.Resource.InstanceType, inferenceObj.GPUCountRequirement)
workspaceObj.Resource.InstanceType, inferenceParam.GPUCountRequirement)
if err != nil {
return nil, fmt.Errorf("failed to get SKU num GPUs: %v", err)
}
resourceReq := corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
},
Limits: corev1.ResourceList{
corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
},
}

commands, resourceReq := prepareInferenceParameters(ctx, inferenceObj, skuNumGPUs)
image, imagePullSecrets := GetInferenceImageInfo(ctx, workspaceObj, inferenceObj)
// inference command
backendName := v1alpha1.GetWorkspaceBackendName(workspaceObj)
commands := inferenceParam.GetInferenceCommand(backendName)

image, imagePullSecrets := GetInferenceImageInfo(ctx, workspaceObj, inferenceParam)

var depObj client.Object
if supportDistributedInference {
if model.SupportDistributedInference() {
depObj = resources.GenerateStatefulSetManifest(ctx, workspaceObj, image, imagePullSecrets, *workspaceObj.Resource.Count, commands,
containerPorts, livenessProbe, readinessProbe, resourceReq, tolerations, volumes, volumeMounts)
} else {
Expand All @@ -161,25 +180,3 @@ func CreatePresetInference(ctx context.Context, workspaceObj *kaitov1alpha1.Work
}
return depObj, nil
}

// prepareInferenceParameters builds a PyTorch command:
// torchrun <TORCH_PARAMS> <OPTIONAL_RDZV_PARAMS> baseCommand <MODEL_PARAMS>
// and sets the GPU resources required for inference.
// Returns the command and resource configuration.
func prepareInferenceParameters(ctx context.Context, inferenceObj *model.PresetParam, skuNumGPUs string) ([]string, corev1.ResourceRequirements) {
torchCommand := utils.BuildCmdStr(inferenceObj.BaseCommand, inferenceObj.TorchRunParams)
torchCommand = utils.BuildCmdStr(torchCommand, inferenceObj.TorchRunRdzvParams)
modelCommand := utils.BuildCmdStr(InferenceFile, inferenceObj.ModelRunParams)
commands := utils.ShellCmd(torchCommand + " " + modelCommand)

resourceRequirements := corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
},
Limits: corev1.ResourceList{
corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
},
}

return commands, resourceRequirements
}
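
For orientation, the sketch below reconstructs the subset of the pkg/model plugin surface that the refactored CreatePresetInference relies on. It is inferred from the call sites in this diff; the actual definitions in pkg/model may name and shape things differently, so treat it as a hedged reconstruction rather than the committed API.

package model

// BackendName identifies the inference runtime.
type BackendName string

// The concrete string values here are assumptions; only the identifiers appear in this diff.
const (
	BackendNameHuggingfaceTransformers BackendName = "transformers"
	BackendNameVLLM                    BackendName = "vllm"
)

// Model is the plugin interface as exercised by CreatePresetInference.
type Model interface {
	GetInferenceParameters() *PresetParam
	SupportDistributedInference() bool
}

// PresetParam lists only the fields referenced in this diff.
type PresetParam struct {
	GPUCountRequirement string
	WorldSize           int
	Transformers        TransformersParam
}

// TransformersParam groups the torchrun settings used by the Huggingface
// transformers backend for distributed inference.
type TransformersParam struct {
	TorchRunParams     map[string]string
	TorchRunRdzvParams map[string]string
}

// GetInferenceCommand returns the container command for the selected backend;
// the signature is inferred from its call in CreatePresetInference.
func (p *PresetParam) GetInferenceCommand(backend BackendName) []string {
	return nil // implementation elided in this sketch
}

// DeepCopy is assumed to exist because CreatePresetInference copies the
// preset parameters before mutating them.
func (p *PresetParam) DeepCopy() *PresetParam {
	out := *p
	return &out
}

The key shift in this file is that the controller now passes the model plugin itself and resolves the container command per backend via GetInferenceCommand, instead of pre-building a torchrun command in the removed prepareInferenceParameters helper.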