From 28f413ce2c3713baf82feabea0707032102d6569 Mon Sep 17 00:00:00 2001 From: Dimitris Poulopoulos Date: Tue, 23 Jul 2024 17:57:52 +0300 Subject: [PATCH] pipeline: Add steps to deploy the model Add steps to: * Process the trained model. * Create a Triton model repository. * Serve the model using KServe. Signed-off-by: Dimitris Poulopoulos --- 02.pipeline.ipynb | 193 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 190 insertions(+), 3 deletions(-) diff --git a/02.pipeline.ipynb b/02.pipeline.ipynb index d652f07..1fd485b 100644 --- a/02.pipeline.ipynb +++ b/02.pipeline.ipynb @@ -7,7 +7,9 @@ "source": [ "# Pipeline\n", "\n", - "In this Jupyter Notebook you will create a Kubeflow Pipeline that automates the workflow for the [ISIC 2024 - Skin Cancer Detection with 3D-TBP Kaggle](https://www.kaggle.com/competitions/isic-2024-challenge) competition. The Pipeline consists of two primary steps:\n", + "In this Jupyter Notebook you will create a Kubeflow Pipeline that automates the process of training and deploying a model for the [ISIC 2024 - Skin Cancer Detection with 3D-TBP Kaggle](https://www.kaggle.com/competitions/isic-2024-challenge) competition.\n", + "\n", + "The Pipeline consists of four primary steps:\n", "\n", "1. **📥 Download the Competition Dataset**: In this initial step, the Pipeline downloads the specified Kaggle competition dataset inside a PVC. Utilizing the Kaggle CLI and the previously\n", " created Kubernetes secret, the dataset is retrieved and prepared for subsequent use in the training process.\n", @@ -15,6 +17,11 @@ "1. **🚀 Launch a Distributed Training Job (PyTorchJob)**: The second step involves launching a distributed training job using the Kubeflow Training operator, specifically, a PyTorchJob.\n", " This step orchestrates the training of a machine learning model in a distributed manner, leveraging the capabilities of PyTorch Distributed for efficient and scalable training.\n", "\n", + "1. **⚒️ Build the model repository**: The third step of the pipeline builds the model repository, a specific directory structure that includes the trained model in ONNX format and a configuration file.\n", + " This is needed by the Triton Inference Server.\n", + "\n", + "1. **🎉 Deploy the model**: Use KServe and the Triton backend to deploy the trained model as a scalable API.\n", + "\n", "By integrating these steps into a Kubeflow Pipeline, this Notebook facilitates a streamlined, reproducible, and automated approach to training a model for a Kaggle competition. The Pipeline ensures that the dataset is readily available and that the training job is efficiently executed within the Kubeflow environment, providing a robust framework for developing and deploying machine learning models." ] }, @@ -246,6 +253,156 @@ " training_client.create_job(pytorch_job, namespace=namespace)" ] }, + { + "cell_type": "markdown", + "id": "c44d1b8b-ce62-4131-a93c-315896f35c1b", + "metadata": {}, + "source": [ + "Next, you need to wait for the training process to complete. Thus, you can create a monitor training step that just streams the logs from the master node of the distributed training job." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2b5cfe5e-be1e-4c08-9a9d-c6c6dfd161ba", + "metadata": {}, + "outputs": [], + "source": [ + "@dsl.component(packages_to_install=[\"kubeflow-training==1.8.0\"])\n", + "def monitor_training(run_name: str) -> None:\n", + " from kubeflow.training import TrainingClient, constants\n", + " \n", + " training_client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)\n", + " training_client.get_job_logs(\n", + " name=run_name,\n", + " follow=True)" + ] + }, + { + "cell_type": "markdown", + "id": "6da54eae-e303-4bcc-a389-64bab2f06040", + "metadata": {}, + "source": [ + "The training is done. The next step builds the model repository, a directory with a specific structure that the Triton Inference Server expects. In this step, you are performing two distinct tasks:\n", + "\n", + "* Convert the model to the ONNX format.\n", + "* Write the configuration file that provides the Triton Inference Server with instructions on how to serve the model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "92d34c67-e7cc-4f2e-9267-37b9f69d8604", + "metadata": {}, + "outputs": [], + "source": [ + "@dsl.component(packages_to_install=[\"onnx==1.16.1\", \"timm==1.0.7\"])\n", + "def build_model_repo() -> None:\n", + " import os\n", + " import timm\n", + " import torch\n", + " import torch.nn as nn\n", + " \n", + " os.makedirs(\"/models/model-repository/skin_cancer_detection/1\", exist_ok=True)\n", + " \n", + " model = timm.create_model('efficientnet_b2', pretrained=False)\n", + " # grab the number of input features to the classifier\n", + " num_features = model.classifier.in_features\n", + " \n", + " # add a new binary classfier\n", + " model.classifier = nn.Sequential(\n", + " nn.Dropout(0.2),\n", + " nn.Linear(num_features, 1),\n", + " nn.Sigmoid())\n", + " \n", + " checkpoint = torch.load(\n", + " \"/logs/checkpoints/model_checkpoint.ckpt\", map_location=torch.device('cpu'))\n", + " \n", + " model_weights = {k.replace(\"model.\", \"\"): v for k, v in checkpoint[\"state_dict\"].items() if k.startswith(\"model.\")}\n", + " model.load_state_dict(model_weights)\n", + " model.eval()\n", + " \n", + " tensor_x = torch.rand((1, 3, 224, 224), dtype=torch.float32)\n", + " onnx_program = torch.onnx.export(model, tensor_x, \"/models/model-repository/skin_cancer_detection/1/model.onnx\")\n", + " \n", + " content = \"\"\"\n", + " name: \"skin_cancer_detection\"\n", + " backend: \"onnxruntime\"\n", + " max_batch_size : 0\n", + " input [\n", + " {\n", + " name: \"input.1\"\n", + " data_type: TYPE_FP32\n", + " dims: [ 1, 3, 224, 224 ]\n", + " }\n", + " ]\n", + " output [\n", + " {\n", + " name: \"919\"\n", + " data_type: TYPE_FP32\n", + " dims: [ 1, 1 ]\n", + " }\n", + " ]\n", + " \"\"\"\n", + " \n", + " with open('/models/model-repository/skin_cancer_detection/config.pbtxt', 'w') as file:\n", + " file.write(content)" + ] + }, + { + "cell_type": "markdown", + "id": "19c527e3-e1ce-4e53-b42b-654e79085ad7", + "metadata": {}, + "source": [ + "Finally, you are ready to deploy the model. For this, you will create a KServe Inference Service, leveraging the Triton backend." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ce8fb42e-28ae-4420-80e0-45dfaba494fe", + "metadata": {}, + "outputs": [], + "source": [ + "@dsl.component(packages_to_install=[\"kserve==0.13.0\"])\n", + "def deploy_model(isvc_name: str, namespace: str) -> None:\n", + " from kserve import KServeClient\n", + " from kserve.models import (V1beta1InferenceService,\n", + " V1beta1InferenceServiceSpec,\n", + " V1beta1PredictorSpec,\n", + " V1beta1ModelSpec,\n", + " V1beta1ModelFormat)\n", + " from kubernetes.client import (V1ObjectMeta,\n", + " V1ResourceRequirements)\n", + " \n", + " kserve_client = KServeClient()\n", + " \n", + " metadata = V1ObjectMeta(name=isvc_name)\n", + " \n", + " model_format = V1beta1ModelFormat(name=\"triton\")\n", + " \n", + " model_spec = V1beta1ModelSpec(\n", + " name=\"\",\n", + " model_format=model_format,\n", + " protocol_version=\"v2\",\n", + " runtime_version=\"24.06-py3\",\n", + " storage_uri=\"pvc://model-repo/model-repository\",\n", + " )\n", + " \n", + " predictor_spec = V1beta1PredictorSpec(model=model_spec)\n", + " \n", + " isvc_spec = V1beta1InferenceServiceSpec(predictor=predictor_spec)\n", + " \n", + " isvc = V1beta1InferenceService(\n", + " api_version=\"serving.kserve.io/v1beta1\",\n", + " kind=\"InferenceService\",\n", + " metadata=metadata,\n", + " spec=isvc_spec\n", + " )\n", + " \n", + " kserve_client.create(isvc, namespace)" + ] + }, { "cell_type": "markdown", "id": "9ff6bfa6-5b9d-4f45-a19e-f3b0e1e7d87c", @@ -268,6 +425,7 @@ " namespace: str,\n", " competition_name: str,\n", " dist_run_name: str,\n", + " isvc_name: str,\n", " data_vol: str,\n", " logs_vol: str,\n", " dist_run_image: str,\n", @@ -310,6 +468,14 @@ " size='2.0Gi',\n", " storage_class_name='longhorn'\n", " )\n", + " \n", + " # create a PVC to store the trained models\n", + " model_repo_pvc = kubernetes.CreatePVC(\n", + " pvc_name='model-repo',\n", + " access_modes=['ReadWriteMany'],\n", + " size='4.0Gi',\n", + " storage_class_name='longhorn'\n", + " )\n", "\n", " download_data_step = download_data(\n", " competition=competition_name,\n", @@ -326,12 +492,32 @@ " image_args=dist_image_args,\n", " data_mount_path=data_mount_path,\n", " logs_mount_path=logs_mount_path).after(download_data_step)\n", - " launch_training_step.set_caching_options(enable_caching=False)\n", + " launch_training_step.set_caching_options(enable_caching=True)\n", + " \n", + " monitor_training_step = monitor_training(\n", + " run_name=dist_run_name).after(launch_training_step)\n", + " monitor_training_step.set_caching_options(enable_caching=True)\n", + " \n", + " build_model_repo_step = build_model_repo().after(monitor_training_step)\n", + " build_model_repo_step.set_caching_options(enable_caching=True)\n", + " \n", + " deploy_model_step = deploy_model(\n", + " isvc_name=isvc_name,\n", + " namespace=namespace).after(build_model_repo_step)\n", + " deploy_model_step.set_caching_options(enable_caching=False)\n", "\n", " kubernetes.mount_pvc(\n", " download_data_step,\n", " pvc_name=isic_data_pvc.outputs['name'],\n", " mount_path='/data')\n", + " kubernetes.mount_pvc(\n", + " build_model_repo_step,\n", + " pvc_name=isic_logs_pvc.outputs['name'],\n", + " mount_path='/logs')\n", + " kubernetes.mount_pvc(\n", + " build_model_repo_step,\n", + " pvc_name=model_repo_pvc.outputs['name'],\n", + " mount_path='/models')\n", " kubernetes.use_secret_as_env(\n", " download_data_step,\n", " secret_name=kaggle_secret,\n", @@ -389,9 +575,10 @@ " \"namespace\": ns,\n", " \"competition_name\": \"isic-2024-challenge\",\n", " \"dist_run_name\": \"pytorch-dist-isic-efficientnet\",\n", + " \"isvc_name\": \"skin-cancer-detection\",\n", " \"data_vol\": \"isic-data\",\n", " \"logs_vol\": \"isic-logs\",\n", - " \"dist_run_image\": \"dpoulopoulos/pytorch-dist-isic:61a89cd\",\n", + " \"dist_run_image\": \"dpoulopoulos/pytorch-dist-isic:fb2be35\",\n", " },\n", ") " ]