diff --git a/pyproject.toml b/pyproject.toml index 98114841..5c2ee7fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,14 +58,17 @@ async_http = [ "nest-asyncio>=1.6.0", ] box = ["boxsdk[jwt]==3.8.1"] +delta = ["delta-spark>=2.2"] +excel = ["openpyxl>=3.0.0"] +ml = ["numpy>=1.21.5", "scikit-learn>=1.1.1", "scipy>=1.9.1", "pandas"] pandas = ["pandas>=1.3", "setuptools", "numpy<2.0.0", "pandas-stubs"] pyspark = ["pyspark>=3.2.0", "pyarrow>13"] pyspark_connect = ["pyspark[connect]>=3.5"] +se = ["spark-expectations>=2.2.1,<2.3.0"] sftp = ["paramiko>=2.6.0"] -delta = ["delta-spark>=2.2"] -excel = ["openpyxl>=3.0.0"] -tableau = ["tableauhyperapi>=0.0.19484", "tableauserverclient>=0.25"] snowflake = ["snowflake-connector-python>=3.12.0"] +tableau = ["tableauhyperapi>=0.0.19484", "tableauserverclient>=0.25"] +# development dependencies dev = ["ruff", "mypy", "pylint", "colorama", "types-PyYAML", "types-requests"] test = [ "chispa", @@ -100,7 +103,6 @@ docs = [ "pymdown-extensions>=10.7.0", "black", ] -se = ["spark-expectations>=2.2.1,<2.3.0"] [tool.hatch.metadata] allow-direct-references = true @@ -176,16 +178,17 @@ installer = "uv" features = [ "async", "async_http", - "pandas", - "pyspark", - "sftp", + "box", "delta", + "dev", "excel", + "ml", + "pandas", + "pyspark", "se", - "box", + "sftp", "snowflake", "tableau", - "dev", ] @@ -240,6 +243,7 @@ features = [ "delta", "dev", "excel", + "ml", "pandas", "pyspark", "se", @@ -415,13 +419,14 @@ features = [ "async", "async_http", "box", + "delta", + "excel", + "ml", "pandas", "pyspark", "se", "sftp", "snowflake", - "delta", - "excel", "tableau", "dev", "test", diff --git a/src/koheesio/__about__.py b/src/koheesio/__about__.py index b11f3d74..ecce567e 100644 --- a/src/koheesio/__about__.py +++ b/src/koheesio/__about__.py @@ -12,7 +12,7 @@ LICENSE_INFO = "Licensed as Apache 2.0" SOURCE = "https://github.com/Nike-Inc/koheesio" -__version__ = "0.9.1" +__version__ = "0.10.0rc0" __logo__ = ( 75, ( diff --git 
a/src/koheesio/models/__init__.py b/src/koheesio/models/__init__.py index 535582e5..c84364ba 100644 --- a/src/koheesio/models/__init__.py +++ b/src/koheesio/models/__init__.py @@ -11,7 +11,7 @@ from __future__ import annotations -from typing import Annotated, Any, Dict, List, Optional, Union +from typing import Annotated, Any, Dict, List, Optional, TypeVar, Union from abc import ABC from functools import cached_property from pathlib import Path diff --git a/src/koheesio/models/dataframe.py b/src/koheesio/models/dataframe.py new file mode 100644 index 00000000..01341aa6 --- /dev/null +++ b/src/koheesio/models/dataframe.py @@ -0,0 +1,166 @@ +""" +Module for the BaseReader and BaseTransformation classes +""" + +from typing import Optional, TypeVar +from abc import ABC, abstractmethod + +from koheesio import Step +from koheesio.models import Field + +DataFrameType = TypeVar("DataFrameType") +"""Defines a type variable that can be any type of DataFrame""" + + +class BaseReader(Step, ABC): + """Base class for all Readers + + Concept + ------- + A Reader is a Step that reads data from a source based on the input parameters + and stores the result in self.output.df (DataFrame). + + When implementing a Reader, the execute() method should be implemented. + The execute() method should read from the source and store the result in self.output.df. + + The Reader class implements a standard read() method that calls the execute() method and returns the result. This + method can be used to read data from a Reader without having to call the execute() method directly. Read method + does not need to be implemented in the child class. + + The Reader class also implements a shorthand for accessing the output Dataframe through the df-property. If the + output.df is None, .execute() will be run first. 
+ """ + + @property + def to_df(self) -> Optional[DataFrameType]: + """Shorthand for accessing self.output.df + If the output.df is None, .execute() will be run first + + aliases: + - toDF, mimics the Delta API + - df + """ + if not self.output.df: + self.execute() + return self.output.df + + toDF = to_df + df = to_df + + @abstractmethod + def execute(self) -> Step.Output: + """Execute on a Reader should handle self.output.df (output) as a minimum + Read from whichever source -> store result in self.output.df + """ + pass + + def read(self) -> DataFrameType: + """Read from a Reader without having to call the execute() method directly""" + self.execute() + return self.output.df + + +class BaseTransformation(Step, ABC): + """Base class for all Transformations + + Concept + ------- + A Transformation is a Step that takes a DataFrame as input and returns a DataFrame as output. The DataFrame is + transformed based on the logic implemented in the `execute` method. Any additional parameters that are needed for + the transformation can be passed to the constructor. + + When implementing a Transformation, the `execute` method should be implemented. The `execute` method should take the + input DataFrame, transform it, and store the result in `self.output.df`. + + The Transformation class implements a standard `transform` method that calls the `execute` method and returns the + result. This method can be used to transform a DataFrame without having to call the `execute` method directly. The + `transform` method does not need to be implemented in the child class. + + The Transformation class also implements a shorthand for accessing the output DataFrame through the + `to-df`-property (alias: `toDF`). If the `output.df` is `None`, `.execute()` will be run first. 
+ """ + + df: Optional[DataFrameType] = Field(default=None, description="The input DataFrame") + + @abstractmethod + def execute(self) -> Step.Output: + """Execute on a Transformation should handle self.df (input) and set self.output.df (output) + + This method should be implemented in the child class. The input DataFrame is available as `self.df` and the + output DataFrame should be stored in `self.output.df`. + + For example: + ```python + # pyspark example + def execute(self): + self.output.df = self.df.withColumn( + "new_column", f.col("old_column") + 1 + ) + ``` + + The transform method will call this method and return the output DataFrame. + """ + # self.df # input DataFrame + # self.output.df # output DataFrame + self.output.df = ... # implement the transformation logic + raise NotImplementedError + + @property + def to_df(self) -> Optional[DataFrameType]: + """Shorthand for accessing self.output.df + If the output.df is None, .execute() will be run first + """ + if not self.output.df: + self.execute() + return self.output.df + + toDF = to_df + """Alias for the to_df property - mimics the Delta API""" + + def transform(self, df: Optional[DataFrameType] = None) -> DataFrameType: + """Execute the transformation and return the output DataFrame + + Note: when creating a child from this, don't implement this transform method. Instead, implement `execute`. + + See Also + -------- + `Transformation.execute` + + Parameters + ---------- + df: Optional[DataFrameType] + The DataFrame to apply the transformation to. If not provided, the DataFrame passed to the constructor + will be used. + + Returns + ------- + DataFrameType + The transformed DataFrame + """ + if df is not None: + self.df = df + if self.df is None: + raise RuntimeError("No valid Dataframe was passed") + self.execute() + return self.output.df + + def __call__(self, *args, **kwargs): + """Allow the class to be called as a function. 
+ This is especially useful when using a Pyspark DataFrame's transform method. + + Example + ------- + Using pyspark's DataFrame transform method: + ```python + input_df = spark.range(3) + + output_df = input_df.transform(AddOne(target_column="foo")).transform( + AddOne(target_column="bar") + ) + ``` + + In the above example, the `AddOne` transformation is applied to the `input_df` DataFrame using the `transform` + method. The `output_df` will now contain the original DataFrame with additional columns called `foo` and + `bar`, each with the values of `id` + 1. + """ + return self.transform(*args, **kwargs) diff --git a/src/koheesio/models/reader.py b/src/koheesio/models/reader.py index c1227940..283658de 100644 --- a/src/koheesio/models/reader.py +++ b/src/koheesio/models/reader.py @@ -1,50 +1,9 @@ """ Module for the BaseReader class -""" - -from typing import Optional, TypeVar -from abc import ABC, abstractmethod - -from koheesio import Step - -# Define a type variable that can be any type of DataFrame -DataFrameType = TypeVar("DataFrameType") - - -class BaseReader(Step, ABC): - """Base class for all Readers - A Reader is a Step that reads data from a source based on the input parameters - and stores the result in self.output.df (DataFrame). - - When implementing a Reader, the execute() method should be implemented. - The execute() method should read from the source and store the result in self.output.df. - - The Reader class implements a standard read() method that calls the execute() method and returns the result. This - method can be used to read data from a Reader without having to call the execute() method directly. Read method - does not need to be implemented in the child class. - - The Reader class also implements a shorthand for accessing the output Dataframe through the df-property. If the - output.df is None, .execute() will be run first. 
- """ - - @property - def df(self) -> Optional[DataFrameType]: - """Shorthand for accessing self.output.df - If the output.df is None, .execute() will be run first - """ - if not self.output.df: - self.execute() - return self.output.df +This acts as a pass-through for the BaseReader class in the models.dataframe module. +""" - @abstractmethod - def execute(self) -> Step.Output: - """Execute on a Reader should handle self.output.df (output) as a minimum - Read from whichever source -> store result in self.output.df - """ - pass +from koheesio.models.dataframe import BaseReader - def read(self) -> DataFrameType: - """Read from a Reader without having to call the execute() method directly""" - self.execute() - return self.output.df +__all__ = ["BaseReader"] diff --git a/src/koheesio/models/transformation.py b/src/koheesio/models/transformation.py new file mode 100644 index 00000000..80f29b2e --- /dev/null +++ b/src/koheesio/models/transformation.py @@ -0,0 +1,9 @@ +""" +Module for the BaseTransformation class + +This acts as a pass-through for the BaseTransformation class in the models.dataframe module. +""" + +from koheesio.models.dataframe import BaseTransformation + +__all__ = ["BaseTransformation"] diff --git a/src/koheesio/pandas/__init__.py b/src/koheesio/pandas/__init__.py index c753a8d4..d89cb6b1 100644 --- a/src/koheesio/pandas/__init__.py +++ b/src/koheesio/pandas/__init__.py @@ -13,6 +13,8 @@ from koheesio.spark.utils import import_pandas_based_on_pyspark_version pandas: ModuleType = import_pandas_based_on_pyspark_version() +"""pandas module""" +pd = pandas class PandasStep(Step, ABC): @@ -20,9 +22,13 @@ class PandasStep(Step, ABC): Extends the Step class with Pandas DataFrame support. The following: - Pandas steps are expected to return a Pandas DataFrame as output. + - Pandas steps have a pd attribute that is set to the Pandas module. This is to ensure that the applicable pandas + module is used when running the step. 
""" + pd: ModuleType = Field(pandas, description="Pandas module", alias="pandas") + class Output(StepOutput): """Output class for PandasStep""" - df: Optional[pandas.DataFrame] = Field(default=None, description="The Pandas DataFrame") # type: ignore + df: Optional[pd.DataFrame] = Field(default=None, description="The Pandas DataFrame") # type: ignore diff --git a/src/koheesio/pandas/transformations/__init__.py b/src/koheesio/pandas/transformations/__init__.py new file mode 100644 index 00000000..c289fc71 --- /dev/null +++ b/src/koheesio/pandas/transformations/__init__.py @@ -0,0 +1,116 @@ +from typing import Optional +from abc import ABC, abstractmethod + +from koheesio.models import Field +from koheesio.models.transformation import BaseTransformation +from koheesio.pandas import PandasStep +from koheesio.pandas import pandas as pd + + +class Transformation(BaseTransformation, PandasStep, ABC): + """Base class for all transformations + + Concept + ------- + A Transformation is a Step that takes a DataFrame as input and returns a DataFrame as output. The DataFrame is + transformed based on the logic implemented in the `execute` method. Any additional parameters that are needed for + the transformation can be passed to the constructor. + + Parameters + ---------- + df : Optional[pandas.DataFrame] + The DataFrame to apply the transformation to. If not provided, the DataFrame has to be passed to the + transform-method. 
+ + Example + ------- + ### Implementing a transformation using the Transformation class: + ```python + from koheesio.pandas.transformations import Transformation + from koheesio.pandas import pandas as pd + + + class AddOne(Transformation): + target_column: str = "new_column" + + def execute(self): + self.output.df = self.df.copy() + self.output.df[self.target_column] = self.df["old_column"] + 1 + ``` + + In the example above, the `execute` method is implemented to add 1 to the values of the `old_column` and store the + result in a new column called `new_column`. + + ### Using the transformation: + In order to use this transformation, we can call the `transform` method: + + ```python + from koheesio.pandas import pandas as pd + + # create a DataFrame with 3 rows + df = pd.DataFrame({"old_column": [0, 1, 2]}) + + output_df = AddOne().transform(df) + ``` + + The `output_df` will now contain the original DataFrame with an additional column called `new_column` with the + values of `old_column` + 1. + + __output_df:__ + + | old_column | new_column | + |------------|------------| + | 0 | 1 | + | 1 | 2 | + | 2 | 3 | + ... + + ### Alternative ways to use the transformation: + Alternatively, we can pass the DataFrame to the constructor and call the `execute` or `transform` method without + any arguments: + + ```python + output_df = AddOne(df).transform() + # or + output_df = AddOne(df).execute().output.df + ``` + + > Note: that the transform method was not implemented explicitly in the AddOne class. This is because the `transform` + method is already implemented in the `Transformation` class. This means that all classes that inherit from the + Transformation class will have the `transform` method available. Only the execute method needs to be implemented. 
+ ### Using the transformation as a function: + The transformation can also be used as a function as part of a DataFrame's `pipe` method: + + ```python + input_df = pd.DataFrame({"old_column": [0, 1, 2]}) + add_baz = AddOne(target_column="baz") + + output_df = ( + input_df + # using the transform method + .pipe(AddOne(target_column="foo").transform) + # using the transformation as a function + .pipe(AddOne(target_column="bar")) + # or, using a variable that holds the transformation + .pipe(add_baz) + ) + ``` + + In the above example, the `AddOne` transformation is applied to the `input_df` DataFrame using the `transform` + method. The `output_df` will now contain the original DataFrame with additional columns called `foo` and + `bar`, each with the values of `id` + 1. + + Note: Because `Transformation` classes are callable, they can be used as functions in the `pipe` method of a DataFrame. + """ + + df: Optional[pd.DataFrame] = Field(default=None, description="The Pandas DataFrame") + + @abstractmethod + def execute(self) -> PandasStep.Output: + # self.df # input DataFrame + # self.output.df # output DataFrame + self.output.df = ... 
# implement the transformation logic + raise NotImplementedError + + execute.__doc__ = BaseTransformation.execute.__doc__ diff --git a/src/koheesio/spark/transformations/__init__.py b/src/koheesio/spark/transformations/__init__.py index ca9866b6..ba6d3b9d 100644 --- a/src/koheesio/spark/transformations/__init__.py +++ b/src/koheesio/spark/transformations/__init__.py @@ -28,11 +28,18 @@ from pyspark.sql.types import DataType from koheesio.models import Field, ListOfColumns, field_validator +from koheesio.models.transformation import BaseTransformation from koheesio.spark import Column, DataFrame, SparkStep from koheesio.spark.utils import SparkDatatype +__all__ = [ + "Transformation", + "ColumnsTransformation", + "ColumnsTransformationWithTarget", +] -class Transformation(SparkStep, ABC): + +class Transformation(BaseTransformation, SparkStep, ABC): """Base class for all transformations Concept @@ -51,7 +58,7 @@ class Transformation(SparkStep, ABC): ------- ### Implementing a transformation using the Transformation class: ```python - from koheesio.steps.transformations import Transformation + from koheesio.spark.transformations import Transformation from pyspark.sql import functions as f @@ -125,71 +132,12 @@ def execute(self): @abstractmethod def execute(self) -> SparkStep.Output: - """Execute on a Transformation should handle self.df (input) and set self.output.df (output) - - This method should be implemented in the child class. The input DataFrame is available as `self.df` and the - output DataFrame should be stored in `self.output.df`. - - For example: - ```python - def execute(self): - self.output.df = self.df.withColumn( - "new_column", f.col("old_column") + 1 - ) - ``` - - The transform method will call this method and return the output DataFrame. - """ - # self.df # input dataframe - # self.output.df # output dataframe + # self.df # input DataFrame + # self.output.df # output DataFrame self.output.df = ... 
# implement the transformation logic raise NotImplementedError - def transform(self, df: Optional[DataFrame] = None) -> DataFrame: - """Execute the transformation and return the output DataFrame - - Note: when creating a child from this, don't implement this transform method. Instead, implement execute! - - See Also - -------- - `Transformation.execute` - - Parameters - ---------- - df: Optional[DataFrame] - The DataFrame to apply the transformation to. If not provided, the DataFrame passed to the constructor - will be used. - - Returns - ------- - DataFrame - The transformed DataFrame - """ - self.df = df or self.df - if not self.df: - raise RuntimeError("No valid Dataframe was passed") - self.execute() - return self.output.df - - def __call__(self, *args, **kwargs): - """Allow the class to be called as a function. - This is especially useful when using a DataFrame's transform method. - - Example - ------- - ```python - input_df = spark.range(3) - - output_df = input_df.transform(AddOne(target_column="foo")).transform( - AddOne(target_column="bar") - ) - ``` - - In the above example, the `AddOne` transformation is applied to the `input_df` DataFrame using the `transform` - method. The `output_df` will now contain the original DataFrame with an additional columns called `foo` and - `bar', each with the values of `id` + 1. 
- """ - return self.transform(*args, **kwargs) + execute.__doc__ = BaseTransformation.execute.__doc__ class ColumnsTransformation(Transformation, ABC): diff --git a/tests/pandas/transformations/test_pandas_transformation.py b/tests/pandas/transformations/test_pandas_transformation.py new file mode 100644 index 00000000..6cba8899 --- /dev/null +++ b/tests/pandas/transformations/test_pandas_transformation.py @@ -0,0 +1,63 @@ +import pandas.testing as pdt + +from koheesio.pandas import pandas as pd +from koheesio.pandas.transformations import Transformation + +# create a DataFrame with 3 rows +input_df = pd.DataFrame({"old_column": [0, 1, 2]}) + + +class AddOne(Transformation): + """A transformation that adds 1 to a column and stores the result in a given target_column.""" + + target_column: str = "new_column" + + def execute(self): + self.output.df = self.df.copy() + self.output.df[self.target_column] = self.df["old_column"] + 1 + + +class TestTransformation: + """Test that the AddOne transformation adds 1 to the old_column and stores the result in the new_column. + + AddOne is used simply as an example of a transformation for testing purposes. + This same example class is used in the docstring of the AddOne transformation. 
+ """ + + def test_add_one_execute(self): + """When using execute, the input df should be explicitly set in the constructor.""" + # Arrange + add_one = AddOne(df=input_df) + expected_df = pd.DataFrame({"old_column": [0, 1, 2], "new_column": [1, 2, 3]}) + # Act + add_one.execute() + # Assert + pdt.assert_frame_equal(add_one.output.df, expected_df) + + def test_add_one_transform(self): + # Arrange + add_one = AddOne(target_column="foo") + expected_df = pd.DataFrame({"old_column": [0, 1, 2], "foo": [1, 2, 3]}) + # Act + output_df = add_one.transform(input_df) + # Assert + pdt.assert_frame_equal(output_df, expected_df) + + def test_add_one_pipe(self): + """Test that the AddOne transformation can be used as a function in a DataFrame's pipe method.""" + # Arrange + add_one = AddOne(target_column="foo") + add_baz = AddOne(target_column="baz") + expected_df = pd.DataFrame({"old_column": [0, 1, 2], "foo": [1, 2, 3], "bar": [1, 2, 3], "baz": [1, 2, 3]}) + # Act + output_df = ( + input_df + # using the transform method + .pipe(add_one.transform) + # using the transformation as a function + .pipe(AddOne(target_column="bar")) + # or, using a variable that holds the transformation + .pipe(add_baz) + ) + # Assert + pdt.assert_frame_equal(output_df, expected_df)