Skip to content

Commit

Permalink
pipeline: Add steps to deploy the model
Browse files Browse the repository at this point in the history
Add steps to:

* Process the trained model.
* Create a Triton model repository.
* Serve the model using KServe.

Signed-off-by: Dimitris Poulopoulos <[email protected]>
  • Loading branch information
dpoulopoulos committed Jul 24, 2024
1 parent fb2be35 commit 28f413c
Showing 1 changed file with 190 additions and 3 deletions.
193 changes: 190 additions & 3 deletions 02.pipeline.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@
"source": [
"# Pipeline\n",
"\n",
"In this Jupyter Notebook you will create a Kubeflow Pipeline that automates the workflow for the [ISIC 2024 - Skin Cancer Detection with 3D-TBP Kaggle](https://www.kaggle.com/competitions/isic-2024-challenge) competition. The Pipeline consists of two primary steps:\n",
"In this Jupyter Notebook you will create a Kubeflow Pipeline that automates the process of training and deploying a model for the [ISIC 2024 - Skin Cancer Detection with 3D-TBP Kaggle](https://www.kaggle.com/competitions/isic-2024-challenge) competition.\n",
"\n",
"The Pipeline consists of four primary steps:\n",
"\n",
"1. **📥 Download the Competition Dataset**: In this initial step, the Pipeline downloads the specified Kaggle competition dataset inside a PVC. Utilizing the Kaggle CLI and the previously\n",
" created Kubernetes secret, the dataset is retrieved and prepared for subsequent use in the training process.\n",
"\n",
"1. **🚀 Launch a Distributed Training Job (PyTorchJob)**: The second step involves launching a distributed training job using the Kubeflow Training operator, specifically, a PyTorchJob.\n",
" This step orchestrates the training of a machine learning model in a distributed manner, leveraging the capabilities of PyTorch Distributed for efficient and scalable training.\n",
"\n",
"1. **⚒️ Build the model repository**: The third step of the pipeline builds the model repository, a specific directory structure that includes the trained model in ONNX format and a configuration file.\n",
" This is needed by the Triton Inference Server.\n",
"\n",
"1. **🎉 Deploy the model**: Use KServe and the Triton backend to deploy the trained model as a scalable API.\n",
"\n",
"By integrating these steps into a Kubeflow Pipeline, this Notebook facilitates a streamlined, reproducible, and automated approach to training a model for a Kaggle competition. The Pipeline ensures that the dataset is readily available and that the training job is efficiently executed within the Kubeflow environment, providing a robust framework for developing and deploying machine learning models."
]
},
Expand Down Expand Up @@ -246,6 +253,156 @@
" training_client.create_job(pytorch_job, namespace=namespace)"
]
},
{
"cell_type": "markdown",
"id": "c44d1b8b-ce62-4131-a93c-315896f35c1b",
"metadata": {},
"source": [
"Next, you need to wait for the training process to complete. Thus, you can create a monitor training step that just streams the logs from the master node of the distributed training job."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2b5cfe5e-be1e-4c08-9a9d-c6c6dfd161ba",
"metadata": {},
"outputs": [],
"source": [
"@dsl.component(packages_to_install=[\"kubeflow-training==1.8.0\"])\n",
"def monitor_training(run_name: str) -> None:\n",
" from kubeflow.training import TrainingClient, constants\n",
" \n",
" training_client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)\n",
" training_client.get_job_logs(\n",
" name=run_name,\n",
" follow=True)"
]
},
{
"cell_type": "markdown",
"id": "6da54eae-e303-4bcc-a389-64bab2f06040",
"metadata": {},
"source": [
"The training is done. The next step builds the model repository, a directory with a specific structure that the Triton Inference Server expects. In this step, you are performing two distinct tasks:\n",
"\n",
"* Convert the model to the ONNX format.\n",
"* Write the configuration file that provides the Triton Inference Server with instructions on how to serve the model."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "92d34c67-e7cc-4f2e-9267-37b9f69d8604",
"metadata": {},
"outputs": [],
"source": [
"@dsl.component(packages_to_install=[\"onnx==1.16.1\", \"timm==1.0.7\"])\n",
"def build_model_repo() -> None:\n",
" import os\n",
" import timm\n",
" import torch\n",
" import torch.nn as nn\n",
" \n",
" os.makedirs(\"/models/model-repository/skin_cancer_detection/1\", exist_ok=True)\n",
" \n",
" model = timm.create_model('efficientnet_b2', pretrained=False)\n",
" # grab the number of input features to the classifier\n",
" num_features = model.classifier.in_features\n",
" \n",
" # add a new binary classfier\n",
" model.classifier = nn.Sequential(\n",
" nn.Dropout(0.2),\n",
" nn.Linear(num_features, 1),\n",
" nn.Sigmoid())\n",
" \n",
" checkpoint = torch.load(\n",
" \"/logs/checkpoints/model_checkpoint.ckpt\", map_location=torch.device('cpu'))\n",
" \n",
" model_weights = {k.replace(\"model.\", \"\"): v for k, v in checkpoint[\"state_dict\"].items() if k.startswith(\"model.\")}\n",
" model.load_state_dict(model_weights)\n",
" model.eval()\n",
" \n",
" tensor_x = torch.rand((1, 3, 224, 224), dtype=torch.float32)\n",
" onnx_program = torch.onnx.export(model, tensor_x, \"/models/model-repository/skin_cancer_detection/1/model.onnx\")\n",
" \n",
" content = \"\"\"\n",
" name: \"skin_cancer_detection\"\n",
" backend: \"onnxruntime\"\n",
" max_batch_size : 0\n",
" input [\n",
" {\n",
" name: \"input.1\"\n",
" data_type: TYPE_FP32\n",
" dims: [ 1, 3, 224, 224 ]\n",
" }\n",
" ]\n",
" output [\n",
" {\n",
" name: \"919\"\n",
" data_type: TYPE_FP32\n",
" dims: [ 1, 1 ]\n",
" }\n",
" ]\n",
" \"\"\"\n",
" \n",
" with open('/models/model-repository/skin_cancer_detection/config.pbtxt', 'w') as file:\n",
" file.write(content)"
]
},
{
"cell_type": "markdown",
"id": "19c527e3-e1ce-4e53-b42b-654e79085ad7",
"metadata": {},
"source": [
"Finally, you are ready to deploy the model. For this, you will create a KServe Inference Service, leveraging the Triton backend."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ce8fb42e-28ae-4420-80e0-45dfaba494fe",
"metadata": {},
"outputs": [],
"source": [
"@dsl.component(packages_to_install=[\"kserve==0.13.0\"])\n",
"def deploy_model(isvc_name: str, namespace: str) -> None:\n",
" from kserve import KServeClient\n",
" from kserve.models import (V1beta1InferenceService,\n",
" V1beta1InferenceServiceSpec,\n",
" V1beta1PredictorSpec,\n",
" V1beta1ModelSpec,\n",
" V1beta1ModelFormat)\n",
" from kubernetes.client import (V1ObjectMeta,\n",
" V1ResourceRequirements)\n",
" \n",
" kserve_client = KServeClient()\n",
" \n",
" metadata = V1ObjectMeta(name=isvc_name)\n",
" \n",
" model_format = V1beta1ModelFormat(name=\"triton\")\n",
" \n",
" model_spec = V1beta1ModelSpec(\n",
" name=\"\",\n",
" model_format=model_format,\n",
" protocol_version=\"v2\",\n",
" runtime_version=\"24.06-py3\",\n",
" storage_uri=\"pvc://model-repo/model-repository\",\n",
" )\n",
" \n",
" predictor_spec = V1beta1PredictorSpec(model=model_spec)\n",
" \n",
" isvc_spec = V1beta1InferenceServiceSpec(predictor=predictor_spec)\n",
" \n",
" isvc = V1beta1InferenceService(\n",
" api_version=\"serving.kserve.io/v1beta1\",\n",
" kind=\"InferenceService\",\n",
" metadata=metadata,\n",
" spec=isvc_spec\n",
" )\n",
" \n",
" kserve_client.create(isvc, namespace)"
]
},
{
"cell_type": "markdown",
"id": "9ff6bfa6-5b9d-4f45-a19e-f3b0e1e7d87c",
Expand All @@ -268,6 +425,7 @@
" namespace: str,\n",
" competition_name: str,\n",
" dist_run_name: str,\n",
" isvc_name: str,\n",
" data_vol: str,\n",
" logs_vol: str,\n",
" dist_run_image: str,\n",
Expand Down Expand Up @@ -310,6 +468,14 @@
" size='2.0Gi',\n",
" storage_class_name='longhorn'\n",
" )\n",
" \n",
" # create a PVC to store the trained models\n",
" model_repo_pvc = kubernetes.CreatePVC(\n",
" pvc_name='model-repo',\n",
" access_modes=['ReadWriteMany'],\n",
" size='4.0Gi',\n",
" storage_class_name='longhorn'\n",
" )\n",
"\n",
" download_data_step = download_data(\n",
" competition=competition_name,\n",
Expand All @@ -326,12 +492,32 @@
" image_args=dist_image_args,\n",
" data_mount_path=data_mount_path,\n",
" logs_mount_path=logs_mount_path).after(download_data_step)\n",
" launch_training_step.set_caching_options(enable_caching=False)\n",
" launch_training_step.set_caching_options(enable_caching=True)\n",
" \n",
" monitor_training_step = monitor_training(\n",
" run_name=dist_run_name).after(launch_training_step)\n",
" monitor_training_step.set_caching_options(enable_caching=True)\n",
" \n",
" build_model_repo_step = build_model_repo().after(monitor_training_step)\n",
" build_model_repo_step.set_caching_options(enable_caching=True)\n",
" \n",
" deploy_model_step = deploy_model(\n",
" isvc_name=isvc_name,\n",
" namespace=namespace).after(build_model_repo_step)\n",
" deploy_model_step.set_caching_options(enable_caching=False)\n",
"\n",
" kubernetes.mount_pvc(\n",
" download_data_step,\n",
" pvc_name=isic_data_pvc.outputs['name'],\n",
" mount_path='/data')\n",
" kubernetes.mount_pvc(\n",
" build_model_repo_step,\n",
" pvc_name=isic_logs_pvc.outputs['name'],\n",
" mount_path='/logs')\n",
" kubernetes.mount_pvc(\n",
" build_model_repo_step,\n",
" pvc_name=model_repo_pvc.outputs['name'],\n",
" mount_path='/models')\n",
" kubernetes.use_secret_as_env(\n",
" download_data_step,\n",
" secret_name=kaggle_secret,\n",
Expand Down Expand Up @@ -389,9 +575,10 @@
" \"namespace\": ns,\n",
" \"competition_name\": \"isic-2024-challenge\",\n",
" \"dist_run_name\": \"pytorch-dist-isic-efficientnet\",\n",
" \"isvc_name\": \"skin-cancer-detection\",\n",
" \"data_vol\": \"isic-data\",\n",
" \"logs_vol\": \"isic-logs\",\n",
" \"dist_run_image\": \"dpoulopoulos/pytorch-dist-isic:61a89cd\",\n",
" \"dist_run_image\": \"dpoulopoulos/pytorch-dist-isic:fb2be35\",\n",
" },\n",
") "
]
Expand Down

0 comments on commit 28f413c

Please sign in to comment.