diff --git a/.gitignore b/.gitignore
index dcb73da03..cd4020676 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,6 @@ __pycache__/
 .idea
 .jpg
 .ipynb_checkpoints/
-*.csv
 *.dat
 .DS_Store
 gen_modules
diff --git a/docs/integrations/index.md b/docs/integrations/index.md
index d462d64a9..7d48d1531 100644
--- a/docs/integrations/index.md
+++ b/docs/integrations/index.md
@@ -171,9 +171,9 @@ The {ref}`community <community>` would love to help you build new SDKs. Currentl
 :header-rows: 0
 :widths: 20 30
 
-* - [flytekit](https://flytekit.readthedocs.io)
+* - [flytekit](https://github.com/flyteorg/flytekit)
   - The Python SDK for Flyte.
-* - [flytekit-java](https://github.com/spotify/flytekit-java)
+* - [flytekit-java](https://github.com/flyteorg/flytekit-java)
   - The Java/Scala SDK for Flyte.
 ```
@@ -259,8 +259,8 @@ Hive
 :hidden:
 :caption: SDKs for writing tasks and workflows
 
-flytekit <https://flytekit.readthedocs.io>
-flytekit-java <https://github.com/spotify/flytekit-java>
+flytekit <https://github.com/flyteorg/flytekit>
+flytekit-java <https://github.com/flyteorg/flytekit-java>
 ```
diff --git a/examples/advanced_composition/advanced_composition/conditional.py b/examples/advanced_composition/advanced_composition/conditional.py
index 7c314162b..f29c0770a 100644
--- a/examples/advanced_composition/advanced_composition/conditional.py
+++ b/examples/advanced_composition/advanced_composition/conditional.py
@@ -79,7 +79,10 @@ def shape_properties_accept_conditional_output(radius: float) -> float:
 
 
 if __name__ == "__main__":
-    print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}")
+    radius_small = 0.5
+    print(
+        f"Circumference of circle (radius={radius_small}) x Area of circle (radius={calculate_circle_circumference(radius=radius_small)}): {shape_properties_accept_conditional_output(radius=radius_small)}"
+    )
 
 
 # Using the output of a previous task in a conditional
@@ -213,8 +216,10 @@ def noop_in_conditional(radius: float, seed: int = 5) -> float:
 if __name__ == "__main__":
     default_seed_output = consume_task_output(radius=0.4)
     print(
-        f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_circumference => {default_seed_output}"
+        f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_area => {default_seed_output}"
     )
     custom_seed_output = consume_task_output(radius=0.4, seed=7)
-    print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}")
+    print(
+        f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_circumference => {custom_seed_output}"
+    )
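Note on the two conditional.py hunks above: the fix swaps the task names in the expected-output strings, because the branch taken inside the example's `conditional` block decides which task's output the workflow returns. A minimal self-contained sketch of that pattern (the task bodies here are illustrative stand-ins, not the example file's exact code):

```python
from flytekit import conditional, task, workflow


@task
def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.1416 * radius


@task
def calculate_circle_area(radius: float) -> float:
    return 3.1416 * radius * radius


@workflow
def shape_properties(radius: float) -> float:
    # Whichever branch fires determines which task's output the workflow
    # returns, which is why the corrected strings above name different
    # tasks for different inputs.
    return (
        conditional("shape_properties")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )
```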
diff --git a/examples/customizing_dependencies/customizing_dependencies/raw_container.py b/examples/customizing_dependencies/customizing_dependencies/raw_container.py
index e8dc9a5ed..9d986da56 100644
--- a/examples/customizing_dependencies/customizing_dependencies/raw_container.py
+++ b/examples/customizing_dependencies/customizing_dependencies/raw_container.py
@@ -1,6 +1,7 @@
 import logging
 
 from flytekit import ContainerTask, kwtypes, task, workflow
+from flytekit.core.base_task import TaskMetadata
 
 logger = logging.getLogger(__file__)
 
@@ -25,6 +26,7 @@
         "{{.inputs.b}}",
         "/var/outputs",
     ],
+    metadata=TaskMetadata(cache=True, cache_version="1.0"),
 )
 
 calculate_ellipse_area_python = ContainerTask(
@@ -41,6 +43,7 @@
         "{{.inputs.b}}",
         "/var/outputs",
     ],
+    metadata=TaskMetadata(cache=True, cache_version="1.0"),
 )
 
 calculate_ellipse_area_r = ContainerTask(
@@ -58,6 +61,7 @@
         "{{.inputs.b}}",
         "/var/outputs",
     ],
+    metadata=TaskMetadata(cache=True, cache_version="1.0"),
 )
 
 calculate_ellipse_area_haskell = ContainerTask(
@@ -73,6 +77,7 @@
         "{{.inputs.b}}",
         "/var/outputs",
     ],
+    metadata=TaskMetadata(cache=True, cache_version="1.0"),
 )
 
 calculate_ellipse_area_julia = ContainerTask(
@@ -89,6 +94,7 @@
         "{{.inputs.b}}",
         "/var/outputs",
     ],
+    metadata=TaskMetadata(cache=True, cache_version="1.0"),
 )
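All five hunks above make the same change. As a standalone sketch, enabling caching on a raw-container task only requires passing a `TaskMetadata`; the task name, image, command, and interface below are placeholders rather than the example's own:

```python
from flytekit import ContainerTask, kwtypes
from flytekit.core.base_task import TaskMetadata

# With cache=True, a run with identical inputs and the same cache_version
# reuses the stored output instead of re-running the container; bump
# cache_version when the image or command changes.
copy_message = ContainerTask(
    name="copy_message",  # hypothetical task, not part of the example file
    image="alpine:3.19",  # placeholder image
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(msg=str),
    outputs=kwtypes(out=str),
    command=["sh", "-c", "cp /var/inputs/msg /var/outputs/out"],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)
```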
+ """ + new_file = FlyteFile.new_remote_file("data_without_seattle.csv") + with ff.open("r") as r: + with new_file.open("w") as w: + df = pd.read_csv(r) + df = df[df["City"] != "Seattle"] + df.to_csv(w, index=False) + return new_file + + +@task +def process_folder(fd: FlyteDirectory) -> FlyteDirectory: + out_fd = FlyteDirectory.new_remote("folder-copy") + for base, x in fd.crawl(): + src = str(os.path.join(base, x)) + out_file = out_fd.new_file(x) + with FlyteFile(src).open("rb") as f: + with out_file.open("wb") as o: + o.write(f.read()) + # The output path will be s3://my-s3-bucket/data/77/--0/folder-copy + return out_fd + + +@workflow() +def wf(): + remove_some_rows(ff=FlyteFile("s3://custom-bucket/data.csv")) + process_folder(fd=FlyteDirectory("s3://my-s3-bucket/folder")) + return + + +if __name__ == "__main__": + print(f"Running wf() {wf()}") diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index 349f34b67..3ec8aea71 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -1,6 +1,6 @@ # Import necessary libraries and modules -from flytekit import task, workflow +from flytekit import ImageSpec, task, workflow from flytekit.types.directory import TFRecordsDirectory from flytekit.types.file import TFRecordFile @@ -9,48 +9,54 @@ registry="ghcr.io/flyteorg", ) -if custom_image.is_container(): - import tensorflow as tf - - # TensorFlow Model - @task - def train_model() -> tf.keras.Model: - model = tf.keras.Sequential( - [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] - ) - model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) - return model - - @task - def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: - loss, accuracy = model.evaluate(x, y) - return accuracy - - @workflow - def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: - model = train_model() - return evaluate_model(model=model, x=x, y=y) - - # TFRecord Files - @task - def process_tfrecord(file: TFRecordFile) -> int: - count = 0 - for record in tf.data.TFRecordDataset(file): - count += 1 - return count - - @workflow - def tfrecord_workflow(file: TFRecordFile) -> int: - return process_tfrecord(file=file) - - # TFRecord Directories - @task - def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: - count = 0 - for record in tf.data.TFRecordDataset(dir.path): - count += 1 - return count - - @workflow - def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: - return process_tfrecords_dir(dir=dir) +import tensorflow as tf + + +# TensorFlow Model +@task +def train_model() -> tf.keras.Model: + model = tf.keras.Sequential( + [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] + ) + model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) + return model + + +@task +def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: + loss, accuracy = model.evaluate(x, y) + return accuracy + + +@workflow +def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: + model = train_model() + return evaluate_model(model=model, x=x, y=y) + + +# TFRecord Files +@task +def process_tfrecord(file: TFRecordFile) -> int: + count = 0 + for record in tf.data.TFRecordDataset(file): + count += 1 + return count + + +@workflow +def 
diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py
index 349f34b67..3ec8aea71 100644
--- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py
+++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py
@@ -1,6 +1,6 @@
 # Import necessary libraries and modules
 
-from flytekit import task, workflow
+from flytekit import ImageSpec, task, workflow
 from flytekit.types.directory import TFRecordsDirectory
 from flytekit.types.file import TFRecordFile
 
@@ -9,48 +9,54 @@
     registry="ghcr.io/flyteorg",
 )
 
-if custom_image.is_container():
-    import tensorflow as tf
-
-    # TensorFlow Model
-    @task
-    def train_model() -> tf.keras.Model:
-        model = tf.keras.Sequential(
-            [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")]
-        )
-        model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
-        return model
-
-    @task
-    def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float:
-        loss, accuracy = model.evaluate(x, y)
-        return accuracy
-
-    @workflow
-    def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float:
-        model = train_model()
-        return evaluate_model(model=model, x=x, y=y)
-
-    # TFRecord Files
-    @task
-    def process_tfrecord(file: TFRecordFile) -> int:
-        count = 0
-        for record in tf.data.TFRecordDataset(file):
-            count += 1
-        return count
-
-    @workflow
-    def tfrecord_workflow(file: TFRecordFile) -> int:
-        return process_tfrecord(file=file)
-
-    # TFRecord Directories
-    @task
-    def process_tfrecords_dir(dir: TFRecordsDirectory) -> int:
-        count = 0
-        for record in tf.data.TFRecordDataset(dir.path):
-            count += 1
-        return count
-
-    @workflow
-    def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int:
-        return process_tfrecords_dir(dir=dir)
+import tensorflow as tf
+
+
+# TensorFlow Model
+@task
+def train_model() -> tf.keras.Model:
+    model = tf.keras.Sequential(
+        [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")]
+    )
+    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
+    return model
+
+
+@task
+def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float:
+    loss, accuracy = model.evaluate(x, y)
+    return accuracy
+
+
+@workflow
+def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float:
+    model = train_model()
+    return evaluate_model(model=model, x=x, y=y)
+
+
+# TFRecord Files
+@task
+def process_tfrecord(file: TFRecordFile) -> int:
+    count = 0
+    for record in tf.data.TFRecordDataset(file):
+        count += 1
+    return count
+
+
+@workflow
+def tfrecord_workflow(file: TFRecordFile) -> int:
+    return process_tfrecord(file=file)
+
+
+# TFRecord Directories
+@task
+def process_tfrecords_dir(dir: TFRecordsDirectory) -> int:
+    count = 0
+    for record in tf.data.TFRecordDataset(dir.path):
+        count += 1
+    return count
+
+
+@workflow
+def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int:
+    return process_tfrecords_dir(dir=dir)
diff --git a/examples/data_types_and_io/requirements.in b/examples/data_types_and_io/requirements.in
index 79bd303e5..2bcce8b12 100644
--- a/examples/data_types_and_io/requirements.in
+++ b/examples/data_types_and_io/requirements.in
@@ -1,4 +1,5 @@
 pandas
 torch
 tabulate
+tensorflow
 pyarrow
diff --git a/examples/kfmpi_plugin/README.md b/examples/kfmpi_plugin/README.md
index ad23c4729..1eeeac68b 100644
--- a/examples/kfmpi_plugin/README.md
+++ b/examples/kfmpi_plugin/README.md
@@ -55,3 +55,36 @@ pyflyte run --remote \
 ```{auto-examples-toc}
 mpi_mnist
 ```
+
+## MPI Plugin Troubleshooting Guide
+
+This section covers common issues encountered during the setup of the MPI operator for distributed training jobs on Flyte.
+
+**Worker Pods Failing to Start (Insufficient Resources)**
+
+MPI worker pods may fail to start or exhibit scheduling issues, leading to job timeouts or failures. This often occurs due to resource constraints (CPU, memory, or GPU) in the cluster.
+
+1. Adjust Resource Requests:
+Ensure that each worker pod has sufficient resources. You can adjust the resource requests in your task definition (a complete task sketch follows at the end of this guide):
+
+```
+  requests=Resources(cpu="<cpu_value>", mem="<memory_value>")
+```
+
+Modify the CPU and memory values according to your cluster's available resources. This helps prevent pod scheduling failures caused by resource constraints.
+
+2. Check Pod Logs for Errors:
+If the worker pods still fail to start, check the logs for any related errors:
+
+```
+  kubectl logs <pod-name> -n <namespace>
+```
+
+Look for resource allocation or worker communication errors.
+
+**Workflow Registration Method Errors (Timeouts or Deadlocks)**
+
+If your MPI workflow hangs or times out, it may be caused by an incorrect workflow registration method.
+
+1. Verify Registration Method:
+   When using a custom image, refer to the Flyte documentation on [Registering workflows](https://docs.flyte.org/en/latest/user_guide/flyte_fundamentals/registering_workflows.html#registration-patterns) to ensure you're following the correct registration method.
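+
+**Example: Requesting Resources on an MPI Task**
+
+A minimal sketch of where the `requests` from step 1 fit. The resource values are placeholders, the task body is omitted, and it assumes the `flytekitplugins-kfmpi` plugin used by the example in this directory:
+
+```python
+from flytekit import Resources, task
+from flytekitplugins.kfmpi import MPIJob
+
+
+@task(
+    task_config=MPIJob(num_workers=2, num_launcher_replicas=1, slots=1),
+    requests=Resources(cpu="1", mem="2Gi"),  # placeholder values; size to your cluster
+)
+def mpi_train() -> None:
+    ...
+```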