diff --git a/docs/source/getting_started/tutorials.md b/docs/source/getting_started/tutorials.md index 1333a0e2a..a861e6c73 100644 --- a/docs/source/getting_started/tutorials.md +++ b/docs/source/getting_started/tutorials.md @@ -38,10 +38,12 @@ Learn how to [run runtime-specific pipelines on Kubeflow Pipelines](https://gith #### Run generic pipelines on Apache Airflow Learn how to [run generic pipelines on Apache Airflow](https://github.com/elyra-ai/examples/tree/main/pipelines/run-generic-pipelines-on-apache-airflow). This tutorial requires an Apache Airflow deployment in a local environment or on the cloud. +If you want to run generic pipelines in Airflow 2.x, you have to use Elyra 4. In Elyra 4, generic pipeline support for Airflow 1 is removed. #### Run runtime-specific pipelines on Apache Airflow Learn how to [run runtime-specific pipelines on Apache Airflow](https://github.com/elyra-ai/examples/tree/main/pipelines/run-pipelines-on-apache-airflow). This tutorial requires an Apache Airflow deployment in a local environment or on the cloud. +If you want to run generic components (R, Python, ipynb Notebooks) in runtime-specific pipelines in Airflow 2.x, you have to use Elyra 4. In Elyra 4, generic pipeline support for Airflow 1 is removed and custom Airflow components are not yet supported. #### Examples diff --git a/docs/source/recipes/configure-airflow-as-a-runtime.md b/docs/source/recipes/configure-airflow-as-a-runtime.md index 55120dfa5..7bb7c53ed 100644 --- a/docs/source/recipes/configure-airflow-as-a-runtime.md +++ b/docs/source/recipes/configure-airflow-as-a-runtime.md @@ -24,6 +24,8 @@ Pipelines in Elyra can be run locally in JupyterLab, or remotely on Kubeflow Pip **Note: Support for Apache Airflow is experimental.** This document outlines how to set up a new Elyra-enabled Apache Airflow environment or add Elyra support to an existing deployment. +You can submit pipelines with generic components to Airflow 2.x from Elyra 4 on. 
+Generic components DAG code generation support for Airflow 1.x is removed in Elyra 4. This guide assumes a general working knowledge of and administration of a Kubernetes cluster. @@ -42,8 +44,10 @@ AND OR - An existing Apache Airflow cluster - - Ensure Apache Airflow is at least v1.10.8 and below v2.0.0. Other versions might work but have not been tested. + - Ensure Apache Airflow is at least v1.10.8 and below v2.0.0. This applies to Elyra < 4. + - Ensure Apache Airflow is at least v2.7.0. This applies to Elyra 4. - Apache Airflow is configured to use the Kubernetes Executor. + - Apache Airflow must be configured to use git-sync, which is configurable both in [Airflow 1](https://airflow.apache.org/docs/apache-airflow/1.10.12/configurations-ref.html?highlight=git%20sync#git-repo) as well as in [Airflow 2](https://airflow.apache.org/docs/helm-chart/stable/parameters-ref.html#airflow) - Ensure the KubernetesPodOperator is installed and available in the Apache Airflow deployment ## Setting up a DAG repository on Git diff --git a/docs/source/user_guide/best-practices-custom-pipeline-components.md b/docs/source/user_guide/best-practices-custom-pipeline-components.md index f661b021e..c55b48d12 100644 --- a/docs/source/user_guide/best-practices-custom-pipeline-components.md +++ b/docs/source/user_guide/best-practices-custom-pipeline-components.md @@ -40,6 +40,8 @@ See the [Kubeflow Pipelines documentation](https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/) ### Apache Airflow components #### Requirements +Apache Airflow components are currently only supported for Airflow < 2 and in Elyra < 4. +Elyra 4 starts with generic components support (R, Python, ipynb Notebooks), not (yet) custom components, for Airflow 2.x. 
##### Configure fully qualified package names for custom operator classes @@ -161,4 +163,4 @@ The missing component definition is stored in a [Machine Learning Exchange](http #### Component catalogs not listed here -Check the [component catalog connector directory](https://github.com/elyra-ai/examples/blob/main/component-catalog-connectors/connector-directory.md) if the referenced catalog type is not listed here. \ No newline at end of file +Check the [component catalog connector directory](https://github.com/elyra-ai/examples/blob/main/component-catalog-connectors/connector-directory.md) if the referenced catalog type is not listed here. diff --git a/docs/source/user_guide/pipeline-components.md b/docs/source/user_guide/pipeline-components.md index b8cc0ede0..95fbc58b7 100644 --- a/docs/source/user_guide/pipeline-components.md +++ b/docs/source/user_guide/pipeline-components.md @@ -28,7 +28,7 @@ The same pipeline could be implemented using a single component that performs al #### Generic components -Elyra includes three _generic components_ that allow for the processing of Jupyter notebooks, Python scripts, and R scripts. These components are called generic because they can be included in pipelines for any supported runtime type: local/JupyterLab, Kubeflow Pipelines, and Apache Airflow. Components are exposed in the pipeline editor via the palette. +Elyra includes three _generic components_ that allow for the processing of Jupyter notebooks, Python scripts, and R scripts. These components are called generic because they can be included in pipelines for any supported runtime type: local/JupyterLab, Kubeflow Pipelines, and Apache Airflow 2.x. Components are exposed in the pipeline editor via the palette. 
![Generic components in the palette](../images/user_guide/pipeline-components/generic-components-in-palette.png) @@ -36,7 +36,7 @@ Note: Refer to the [_Best practices_ topic in the _User Guide_](best-practices-f #### Custom components -_Custom components_ are commonly only implemented for one runtime type, such as Kubeflow Pipelines or Apache Airflow. (The local runtime type does not support custom components.) +_Custom components_ are commonly only implemented for one runtime type, such as Kubeflow Pipelines or Apache Airflow < 2. (The local runtime type does not support custom components). Custom components, due to their being supported only for Airflow 1.x, are only supported on Elyra < 4. ![Kubeflow components in the palette](../images/user_guide/pipeline-components/custom-kubeflow-components-in-palette.png) @@ -63,9 +63,9 @@ Elyra includes connectors for the following component catalog types: Example: A URL component catalog that is configured using the `http://myserver:myport/mypath/my_component.yaml` URL makes the `my_component.yaml` component file available to Elyra. - - [_Apache Airflow package catalogs_](#apache-airflow-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow built distributions. + - [_Apache Airflow package catalogs_](#apache-airflow-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow built distributions. This is currently only supported for Airflow < 2. - - [_Apache Airflow provider package catalogs_](#apache-airflow-provider-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow provider packages. + - [_Apache Airflow provider package catalogs_](#apache-airflow-provider-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow provider packages. This is currently only supported for Airflow < 2. 
Refer to section [Built-in catalog connector reference](#built-in-catalog-connector-reference) for details about these connectors. @@ -438,6 +438,7 @@ Examples (CLI): The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/package_catalog_connector) provides access to operators that are stored in Apache Airflow [built distributions](https://packaging.python.org/en/latest/glossary/#term-built-distribution): - Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported. +- Only Airflow < 2 is supported. Use of that functionality is not working in Elyra >=4, which is no longer supporting Airflow 1.x. - The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported. - In secured environments where SSL server authenticity can only be validated using certificates based on private public key infrastructure (PKI) with root and optionally intermediate certificate authorities (CAs) that are not publicly trusted, you must define environment variable `TRUSTED_CA_BUNDLE_PATH` in the environment where JupyterLab/Elyra is running. The variable value must identify an existing [Privacy-Enhanced Mail (PEM) file](https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail). @@ -454,6 +455,7 @@ Examples: #### Apache Airflow provider package catalog The [Apache Airflow provider package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/provider_package_catalog_connector) provides access to operators that are stored in [Apache Airflow provider packages](https://airflow.apache.org/docs/apache-airflow-providers/): - Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported. +- Only Airflow < 2 and operators for Airflow < 2 are supported. 
Use of that functionality is not working in Elyra >=4, which is no longer supporting Airflow 1.x. - The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported. - In secured environments where SSL server authenticity can only be validated using certificates based on private public key infrastructure (PKI) with root and optionally intermediate certificate authorities (CAs) that are not publicly trusted, you must define environment variable `TRUSTED_CA_BUNDLE_PATH` in the environment where JupyterLab/Elyra is running. The variable value must identify an existing [Privacy-Enhanced Mail (PEM) file](https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail). diff --git a/elyra/pipeline/airflow/processor_airflow.py b/elyra/pipeline/airflow/processor_airflow.py index c7449c6f8..e53629bf4 100644 --- a/elyra/pipeline/airflow/processor_airflow.py +++ b/elyra/pipeline/airflow/processor_airflow.py @@ -345,6 +345,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance "cpu_limit": operation.cpu_limit, "memory_limit": operation.memory_limit, "gpu_limit": operation.gpu, + "gpu_vendor": operation.gpu_vendor, "operator_source": operation.filename, } @@ -598,13 +599,23 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str: str_to_render = "" for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []): str_to_render += f""" - Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),""" + k8s.V1Volume( + name="{v.pvc_name}", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( + claim_name="{v.pvc_name}", + ), + ),""" # set custom shared memory size shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE) if shm is not None and shm.size: - config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": 
"{shm.size}{shm.units}"}}}}""" str_to_render += f""" - Volume(name="shm", {config}),""" + k8s.V1Volume( + name="shm", + empty_dir=k8s.V1EmptyDirVolumeSource( + medium="Memory", + size_limit="{shm.size}{shm.units}", + ), + ),""" return dedent(str_to_render) def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str: @@ -615,8 +626,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str: str_to_render = "" for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []): str_to_render += f""" - VolumeMount(name="{v.pvc_name}", mount_path="{v.path}", - sub_path="{v.sub_path}", read_only={v.read_only}),""" + k8s.V1VolumeMount( + name="{v.pvc_name}", + mount_path="{v.path}", + sub_path="{v.sub_path}", + read_only={v.read_only}, + ),""" return dedent(str_to_render) def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str: diff --git a/elyra/templates/airflow/airflow_template.jinja2 b/elyra/templates/airflow/airflow_template.jinja2 index fc68a55f6..b9245695f 100644 --- a/elyra/templates/airflow/airflow_template.jinja2 +++ b/elyra/templates/airflow/airflow_template.jinja2 @@ -1,15 +1,15 @@ from airflow import DAG -from airflow.utils.dates import days_ago +import pendulum args = { 'project_id' : '{{ pipeline_name }}', } dag = DAG( - '{{ pipeline_name }}', + dag_id='{{ pipeline_name }}', default_args=args, - schedule_interval='@once', - start_date=days_ago(1), + schedule='@once', + start_date=pendulum.today('UTC').add(days=-1), description=""" {{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }} """, @@ -22,10 +22,9 @@ dag = DAG( {{import_statement}} {% endfor %} {% else %} -from airflow.kubernetes.secret import Secret -from airflow.contrib.kubernetes.volume import Volume -from airflow.contrib.kubernetes.volume_mount import VolumeMount -from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import 
Secret +from kubernetes.client import models as k8s +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator {% endif %} {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %} @@ -48,23 +47,27 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n task_id='{{ operation.notebook|regex_replace }}', env_vars={{ operation.pipeline_envs }}, {% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %} - resources = { - {% if operation.cpu_request %} - 'request_cpu': '{{ operation.cpu_request }}', - {% endif %} - {% if operation.mem_request %} - 'request_memory': '{{ operation.mem_request }}G', - {% endif %} - {% if operation.cpu_limit %} - 'limit_cpu': '{{ operation.cpu_limit }}', - {% endif %} - {% if operation.memory_limit %} - 'limit_memory': '{{ operation.memory_limit }}G', - {% endif %} - {% if operation.gpu_limit %} - 'limit_gpu': '{{ operation.gpu_limit }}', - {% endif %} - }, + container_resources=k8s.V1ResourceRequirements( + requests={ + {% if operation.cpu_request %} + 'cpu': '{{ operation.cpu_request }}', + {% endif %} + {% if operation.mem_request %} + 'memory': '{{ operation.mem_request }}G', + {% endif %} + }, + limits={ + {% if operation.cpu_limit %} + 'cpu': '{{ operation.cpu_limit }}', + {% endif %} + {% if operation.memory_limit %} + 'memory': '{{ operation.memory_limit }}G', + {% endif %} + {% if operation.gpu_limit %} + '{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}', + {% endif %} + } + ), {% endif %} volumes=[{{ processor.render_volumes(operation.elyra_props) }}], volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}], @@ -73,7 +76,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n labels={{ processor.render_labels(operation.elyra_props) }}, tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}], in_cluster={{ 
in_cluster }}, - config_file="{{ kube_config_path }}", + config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %}, {% endif %} dag=dag) {% if operation.image_pull_policy %} diff --git a/elyra/tests/metadata/test_metadata.py b/elyra/tests/metadata/test_metadata.py index 71728dd3a..d431d8428 100644 --- a/elyra/tests/metadata/test_metadata.py +++ b/elyra/tests/metadata/test_metadata.py @@ -704,7 +704,7 @@ def test_validation_performance(): print( f"\nMemory: {diff:,} kb, Start: {memory_start.rss / 1024 / 1024:,.3f} mb, " f"End: {memory_end.rss / 1024 / 1024:,.3f} mb., " - f"Elapsed time: {t1-t0:.3f}s over {iterations} iterations." + f"Elapsed time: {t1 - t0:.3f}s over {iterations} iterations." ) diff --git a/elyra/tests/pipeline/airflow/test_processor_airflow.py b/elyra/tests/pipeline/airflow/test_processor_airflow.py index 7217aae7a..30e387f36 100644 --- a/elyra/tests/pipeline/airflow/test_processor_airflow.py +++ b/elyra/tests/pipeline/airflow/test_processor_airflow.py @@ -195,7 +195,7 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic with open(response) as f: file_as_lines = f.read().splitlines() - assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines + assert "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator" in file_as_lines # Check DAG project name for i in range(len(file_as_lines)): diff --git a/elyra/tests/util/test_kubernetes.py b/elyra/tests/util/test_kubernetes.py index c82effe19..d019b5cde 100644 --- a/elyra/tests/util/test_kubernetes.py +++ b/elyra/tests/util/test_kubernetes.py @@ -87,7 +87,7 @@ def test_is_valid_label_key_invalid_input(): assert not is_valid_label_key(key="/n") # prefix too short assert not is_valid_label_key(key="p/") # name too short assert not is_valid_label_key(key="a" * 254) # name too long - assert not is_valid_label_key(key=f"d/{'b'*64}") # name too 
long + assert not is_valid_label_key(key=f"d/{'b' * 64}") # name too long # test first character violations (not alphanum) assert not is_valid_label_key(key="-a") assert not is_valid_label_key(key=".b") @@ -116,7 +116,7 @@ def test_is_valid_label_key_valid_input(): assert is_valid_label_key(key="p/n") assert is_valid_label_key(key="prefix/you.2") assert is_valid_label_key(key="how.sad/to-see") - assert is_valid_label_key(key=f"{'d'*253}/{'n'*63}") + assert is_valid_label_key(key=f"{'d' * 253}/{'n' * 63}") def test_is_valid_label_value_invalid_input(): @@ -175,7 +175,7 @@ def test_is_valid_annotation_key_invalid_input(): assert not is_valid_annotation_key(key="/n") # prefix too short assert not is_valid_annotation_key(key="p/") # name too short assert not is_valid_annotation_key(key="a" * 254) # name too long - assert not is_valid_annotation_key(key=f"d/{'b'*64}") # name too long + assert not is_valid_annotation_key(key=f"d/{'b' * 64}") # name too long # test first character violations (not alphanum) assert not is_valid_annotation_key(key="-a") assert not is_valid_annotation_key(key=".b") @@ -204,7 +204,7 @@ def test_is_valid_annotation_key_valid_input(): assert is_valid_annotation_key(key="p/n") assert is_valid_annotation_key(key="prefix/you.2") assert is_valid_annotation_key(key="how.sad/to-see") - assert is_valid_annotation_key(key=f"{'d'*253}/{'n'*63}") + assert is_valid_annotation_key(key=f"{'d' * 253}/{'n' * 63}") def test_is_valid_annotation_value_invalid_input():