Skip to content

Commit

Permalink
Airflow 2.x generic pipeline mods. See also #3166.
Browse files Browse the repository at this point in the history
- Changed processor airflow and jinja2 template airflow to comply with Airflow 2.x.
- Added new location of KubernetesPodOperator library in Airflow 2.x to test Pipeline for processor airflow.
- Added cpu and memory limits fields in airflow 2.x fashion as well.
- Added documentation related to Airflow 2 support and Elyra 4 for readthedocs.
- Also mentioning what does not yet work in Elyra 4, e.g. custom components.
- tests_kubernetes.py fixed lint-server E226 missing whitespace around arithmetic operator
- tests_metadata.py fixed lint-server E226 missing whitespace around arithmetic operator

Signed-off-by: Sven Thoms <[email protected]>
  • Loading branch information
shalberd committed Dec 21, 2024
1 parent 1aeb32e commit 9112a32
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 43 deletions.
2 changes: 2 additions & 0 deletions docs/source/getting_started/tutorials.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ Learn how to [run runtime-specific pipelines on Kubeflow Pipelines](https://gith
#### Run generic pipelines on Apache Airflow

Learn how to [run generic pipelines on Apache Airflow](https://github.com/elyra-ai/examples/tree/main/pipelines/run-generic-pipelines-on-apache-airflow). This tutorial requires an Apache Airflow deployment in a local environment or on the cloud.
If you want to run generic pipelines in Airflow 2.x, you have to use Elyra 4. In Elyra 4, generic pipeline support for Airflow 1 is removed.

#### Run runtime-specific pipelines on Apache Airflow

Learn how to [run runtime-specific pipelines on Apache Airflow](https://github.com/elyra-ai/examples/tree/main/pipelines/run-pipelines-on-apache-airflow). This tutorial requires an Apache Airflow deployment in a local environment or on the cloud.
If you want to run generic components (R, Python, ipynb Notebooks) in runtime-specific pipelines in Airflow 2.x, you have to use Elyra 4. In Elyra 4, generic pipeline support for Airflow 1 is removed and custom Airflow components are not yet supported.


#### Examples
Expand Down
6 changes: 5 additions & 1 deletion docs/source/recipes/configure-airflow-as-a-runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Pipelines in Elyra can be run locally in JupyterLab, or remotely on Kubeflow Pip
**Note: Support for Apache Airflow is experimental.**

This document outlines how to set up a new Elyra-enabled Apache Airflow environment or add Elyra support to an existing deployment.
You can submit pipelines with generic components to Airflow 2.x from Elyra 4 on.
Generic components DAG code generation support for Airflow 1.x is removed in Elyra 4.

This guide assumes a general working knowledge of and administration of a Kubernetes cluster.

Expand All @@ -42,8 +44,10 @@ AND
OR

- An existing Apache Airflow cluster
- Ensure Apache Airflow is at least v1.10.8 and below v2.0.0. Other versions might work but have not been tested.
- Ensure Apache Airflow is at least v1.10.8 and below v2.0.0. This applies to Elyra < 4.
- Ensure Apache Airflow is at least v2.7.0. This applies to Elyra 4.
- Apache Airflow is configured to use the Kubernetes Executor.
- Apache Airflow must be configured to use git-sync, which is configurable both in [Airflow 1](https://airflow.apache.org/docs/apache-airflow/1.10.12/configurations-ref.html?highlight=git%20sync#git-repo) as well as in [Airflow 2](https://airflow.apache.org/docs/helm-chart/stable/parameters-ref.html#airflow)
- Ensure the KubernetesPodOperator is installed and available in the Apache Airflow deployment

## Setting up a DAG repository on Git
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ See the [Kubeflow Pipelines documentation](https://www.kubeflow.org/docs/compone
### Apache Airflow components

#### Requirements
Apache Airflow components are currently only supported for Airflow < 2 and in Elyra < 4.
Elyra 4 starts with generic components support (R, Python, ipync Notebooks), not (yet) custom components, for Airflow 2.x.

##### Configure fully qualified package names for custom operator classes

Expand Down Expand Up @@ -161,4 +163,4 @@ The missing component definition is stored in a [Machine Learning Exchange](http

#### Component catalogs not listed here

Check the [component catalog connector directory](https://github.com/elyra-ai/examples/blob/main/component-catalog-connectors/connector-directory.md) if the referenced catalog type is not listed here.
Check the [component catalog connector directory](https://github.com/elyra-ai/examples/blob/main/component-catalog-connectors/connector-directory.md) if the referenced catalog type is not listed here.
10 changes: 6 additions & 4 deletions docs/source/user_guide/pipeline-components.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ The same pipeline could be implemented using a single component that performs al

#### Generic components

Elyra includes three _generic components_ that allow for the processing of Jupyter notebooks, Python scripts, and R scripts. These components are called generic because they can be included in pipelines for any supported runtime type: local/JupyterLab, Kubeflow Pipelines, and Apache Airflow. Components are exposed in the pipeline editor via the palette.
Elyra includes three _generic components_ that allow for the processing of Jupyter notebooks, Python scripts, and R scripts. These components are called generic because they can be included in pipelines for any supported runtime type: local/JupyterLab, Kubeflow Pipelines, and Apache Airflow 2.x. Components are exposed in the pipeline editor via the palette.

![Generic components in the palette](../images/user_guide/pipeline-components/generic-components-in-palette.png)

Note: Refer to the [_Best practices_ topic in the _User Guide_](best-practices-file-based-nodes.md) to learn more about special considerations for generic components.

#### Custom components

_Custom components_ are commonly only implemented for one runtime type, such as Kubeflow Pipelines or Apache Airflow. (The local runtime type does not support custom components.)
_Custom components_ are commonly only implemented for one runtime type, such as Kubeflow Pipelines or Apache Airflow < 2. (The local runtime type does not support custom components). Custom components, due to their being supported only for Airflow 1.x, are only supported on Elyra < 4.

![Kubeflow components in the palette](../images/user_guide/pipeline-components/custom-kubeflow-components-in-palette.png)

Expand All @@ -63,9 +63,9 @@ Elyra includes connectors for the following component catalog types:

Example: A URL component catalog that is configured using the `http://myserver:myport/mypath/my_component.yaml` URL makes the `my_component.yaml` component file available to Elyra.

- [_Apache Airflow package catalogs_](#apache-airflow-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow built distributions.
- [_Apache Airflow package catalogs_](#apache-airflow-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow built distributions. This is currently only supported for Airflow < 2.

- [_Apache Airflow provider package catalogs_](#apache-airflow-provider-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow provider packages.
- [_Apache Airflow provider package catalogs_](#apache-airflow-provider-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow provider packages. This is currently only supported for Airflow < 2.

Refer to section [Built-in catalog connector reference](#built-in-catalog-connector-reference) for details about these connectors.

Expand Down Expand Up @@ -438,6 +438,7 @@ Examples (CLI):

The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/package_catalog_connector) provides access to operators that are stored in Apache Airflow [built distributions](https://packaging.python.org/en/latest/glossary/#term-built-distribution):
- Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported.
- Only Airflow < 2 is supported. Use of that functionality is not working in Elyra >=4, which is no longer supporting Airflow 1.x.
- The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.
- In secured environments where SSL server authenticity can only be validated using certificates based on private public key infrastructure (PKI) with root and optionally intermediate certificate authorities (CAs) that are not publicly trusted, you must define environment variable `TRUSTED_CA_BUNDLE_PATH` in the environment where JupyterLab/Elyra is running. The variable value must identify an existing [Privacy-Enhanced Mail (PEM) file](https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail).

Expand All @@ -454,6 +455,7 @@ Examples:
#### Apache Airflow provider package catalog
The [Apache Airflow provider package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/provider_package_catalog_connector) provides access to operators that are stored in [Apache Airflow provider packages](https://airflow.apache.org/docs/apache-airflow-providers/):
- Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported.
- Only Airflow < 2 and operators for Airflow < 2 are supported. Use of that functionality is not working in Elyra >=4, which is no longer supporting Airflow 1.x.
- The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.
- In secured environments where SSL server authenticity can only be validated using certificates based on private public key infrastructure (PKI) with root and optionally intermediate certificate authorities (CAs) that are not publicly trusted, you must define environment variable `TRUSTED_CA_BUNDLE_PATH` in the environment where JupyterLab/Elyra is running. The variable value must identify an existing [Privacy-Enhanced Mail (PEM) file](https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail).

Expand Down
25 changes: 20 additions & 5 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"cpu_limit": operation.cpu_limit,
"memory_limit": operation.memory_limit,
"gpu_limit": operation.gpu,
"gpu_vendor": operation.gpu_vendor,
"operator_source": operation.filename,
}

Expand Down Expand Up @@ -598,13 +599,23 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),"""
k8s.V1Volume(
name="{v.pvc_name}",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="{v.pvc_name}",
),
),"""
# set custom shared memory size
shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE)
if shm is not None and shm.size:
config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}"""
str_to_render += f"""
Volume(name="shm", {config}),"""
k8s.V1Volume(
name="shm",
empty_dir=k8s.V1EmptyDirVolumeSource(
medium="Memory",
size_limit="{shm.size}{shm.units}",
),
),"""
return dedent(str_to_render)

def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
Expand All @@ -615,8 +626,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
VolumeMount(name="{v.pvc_name}", mount_path="{v.path}",
sub_path="{v.sub_path}", read_only={v.read_only}),"""
k8s.V1VolumeMount(
name="{v.pvc_name}",
mount_path="{v.path}",
sub_path="{v.sub_path}",
read_only={v.read_only},
),"""
return dedent(str_to_render)

def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str:
Expand Down
55 changes: 29 additions & 26 deletions elyra/templates/airflow/airflow_template.jinja2
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from airflow import DAG
from airflow.utils.dates import days_ago
import pendulum

args = {
'project_id' : '{{ pipeline_name }}',
}

dag = DAG(
'{{ pipeline_name }}',
dag_id='{{ pipeline_name }}',
default_args=args,
schedule_interval='@once',
start_date=days_ago(1),
schedule='@once',
start_date=pendulum.today('UTC').add(days=-1),
description="""
{{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
""",
Expand All @@ -22,10 +22,9 @@ dag = DAG(
{{import_statement}}
{% endfor %}
{% else %}
from airflow.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
{% endif %}

{% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}
Expand All @@ -48,23 +47,27 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
task_id='{{ operation.notebook|regex_replace }}',
env_vars={{ operation.pipeline_envs }},
{% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %}
resources = {
{% if operation.cpu_request %}
'request_cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'request_memory': '{{ operation.mem_request }}G',
{% endif %}
{% if operation.cpu_limit %}
'limit_cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'limit_memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'limit_gpu': '{{ operation.gpu_limit }}',
{% endif %}
},
container_resources=k8s.V1ResourceRequirements(
requests={
{% if operation.cpu_request %}
'cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'memory': '{{ operation.mem_request }}G',
{% endif %}
},
limits={
{% if operation.cpu_limit %}
'cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}',
{% endif %}
}
),
{% endif %}
volumes=[{{ processor.render_volumes(operation.elyra_props) }}],
volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}],
Expand All @@ -73,7 +76,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
labels={{ processor.render_labels(operation.elyra_props) }},
tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}],
in_cluster={{ in_cluster }},
config_file="{{ kube_config_path }}",
config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %},
{% endif %}
dag=dag)
{% if operation.image_pull_policy %}
Expand Down
2 changes: 1 addition & 1 deletion elyra/tests/metadata/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ def test_validation_performance():
print(
f"\nMemory: {diff:,} kb, Start: {memory_start.rss / 1024 / 1024:,.3f} mb, "
f"End: {memory_end.rss / 1024 / 1024:,.3f} mb., "
f"Elapsed time: {t1-t0:.3f}s over {iterations} iterations."
f"Elapsed time: {t1 - t0:.3f}s over {iterations} iterations."
)


Expand Down
2 changes: 1 addition & 1 deletion elyra/tests/pipeline/airflow/test_processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
with open(response) as f:
file_as_lines = f.read().splitlines()

assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines
assert "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator" in file_as_lines

# Check DAG project name
for i in range(len(file_as_lines)):
Expand Down
8 changes: 4 additions & 4 deletions elyra/tests/util/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def test_is_valid_label_key_invalid_input():
assert not is_valid_label_key(key="/n") # prefix too short
assert not is_valid_label_key(key="p/") # name too short
assert not is_valid_label_key(key="a" * 254) # name too long
assert not is_valid_label_key(key=f"d/{'b'*64}") # name too long
assert not is_valid_label_key(key=f"d/{'b' * 64}") # name too long
# test first character violations (not alphanum)
assert not is_valid_label_key(key="-a")
assert not is_valid_label_key(key=".b")
Expand Down Expand Up @@ -116,7 +116,7 @@ def test_is_valid_label_key_valid_input():
assert is_valid_label_key(key="p/n")
assert is_valid_label_key(key="prefix/you.2")
assert is_valid_label_key(key="how.sad/to-see")
assert is_valid_label_key(key=f"{'d'*253}/{'n'*63}")
assert is_valid_label_key(key=f"{'d' * 253}/{'n' * 63}")


def test_is_valid_label_value_invalid_input():
Expand Down Expand Up @@ -175,7 +175,7 @@ def test_is_valid_annotation_key_invalid_input():
assert not is_valid_annotation_key(key="/n") # prefix too short
assert not is_valid_annotation_key(key="p/") # name too short
assert not is_valid_annotation_key(key="a" * 254) # name too long
assert not is_valid_annotation_key(key=f"d/{'b'*64}") # name too long
assert not is_valid_annotation_key(key=f"d/{'b' * 64}") # name too long
# test first character violations (not alphanum)
assert not is_valid_annotation_key(key="-a")
assert not is_valid_annotation_key(key=".b")
Expand Down Expand Up @@ -204,7 +204,7 @@ def test_is_valid_annotation_key_valid_input():
assert is_valid_annotation_key(key="p/n")
assert is_valid_annotation_key(key="prefix/you.2")
assert is_valid_annotation_key(key="how.sad/to-see")
assert is_valid_annotation_key(key=f"{'d'*253}/{'n'*63}")
assert is_valid_annotation_key(key=f"{'d' * 253}/{'n' * 63}")


def test_is_valid_annotation_value_invalid_input():
Expand Down

0 comments on commit 9112a32

Please sign in to comment.