Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Added a Pandas based Transformation and BaseTransformation #141

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions pyproject.toml
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newly added is `ml` — I re-sorted the feature entries alphabetically afterwards.

Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@ async_http = [
"nest-asyncio>=1.6.0",
]
box = ["boxsdk[jwt]==3.8.1"]
delta = ["delta-spark>=2.2"]
excel = ["openpyxl>=3.0.0"]
ml = ["numpy>=1.21.5", "scikit-learn>=1.1.1", "scipy>=1.9.1", "pandas"]
pandas = ["pandas>=1.3", "setuptools", "numpy<2.0.0", "pandas-stubs"]
pyspark = ["pyspark>=3.2.0", "pyarrow>13"]
pyspark_connect = ["pyspark[connect]>=3.5"]
se = ["spark-expectations>=2.2.1,<2.3.0"]
sftp = ["paramiko>=2.6.0"]
delta = ["delta-spark>=2.2"]
excel = ["openpyxl>=3.0.0"]
tableau = ["tableauhyperapi>=0.0.19484", "tableauserverclient>=0.25"]
snowflake = ["snowflake-connector-python>=3.12.0"]
tableau = ["tableauhyperapi>=0.0.19484", "tableauserverclient>=0.25"]
# development dependencies
dev = ["ruff", "mypy", "pylint", "colorama", "types-PyYAML", "types-requests"]
test = [
"chispa",
Expand Down Expand Up @@ -100,7 +103,6 @@ docs = [
"pymdown-extensions>=10.7.0",
"black",
]
se = ["spark-expectations>=2.2.1,<2.3.0"]

[tool.hatch.metadata]
allow-direct-references = true
Expand Down Expand Up @@ -176,16 +178,17 @@ installer = "uv"
features = [
"async",
"async_http",
"pandas",
"pyspark",
"sftp",
"box",
"delta",
"dev",
"excel",
"ml",
"pandas",
"pyspark",
"se",
"box",
"sftp",
"snowflake",
"tableau",
"dev",
]


Expand Down Expand Up @@ -240,6 +243,7 @@ features = [
"delta",
"dev",
"excel",
"ml",
"pandas",
"pyspark",
"se",
Expand Down Expand Up @@ -415,13 +419,14 @@ features = [
"async",
"async_http",
"box",
"delta",
"excel",
"ml",
"pandas",
"pyspark",
"se",
"sftp",
"snowflake",
"delta",
"excel",
"tableau",
"dev",
"test",
Expand Down
2 changes: 1 addition & 1 deletion src/koheesio/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

LICENSE_INFO = "Licensed as Apache 2.0"
SOURCE = "https://github.com/Nike-Inc/koheesio"
__version__ = "0.9.0"
__version__ = "0.10.0rc0"
__logo__ = (
75,
(
Expand Down
10 changes: 7 additions & 3 deletions src/koheesio/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from __future__ import annotations

from typing import Annotated, Any, Dict, List, Optional, Union
from typing import Annotated, Any, Dict, List, Optional, TypeVar, Union
from abc import ABC
from functools import cached_property
from pathlib import Path
Expand Down Expand Up @@ -407,7 +407,9 @@ def __add__(self, other: Union[Dict, BaseModel]) -> BaseModel:
```python
step_output_1 = StepOutput(foo="bar")
step_output_2 = StepOutput(lorem="ipsum")
(step_output_1 + step_output_2) # step_output_1 will now contain {'foo': 'bar', 'lorem': 'ipsum'}
(
step_output_1 + step_output_2
) # step_output_1 will now contain {'foo': 'bar', 'lorem': 'ipsum'}
```

Parameters
Expand Down Expand Up @@ -531,7 +533,9 @@ def merge(self, other: Union[Dict, BaseModel]) -> BaseModel:
--------
```python
step_output = StepOutput(foo="bar")
step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
step_output.merge(
{"lorem": "ipsum"}
) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
```

Parameters
Expand Down
166 changes: 166 additions & 0 deletions src/koheesio/models/dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""
Module for the BaseReader and BaseTransformation classes
"""

from typing import Optional, TypeVar
from abc import ABC, abstractmethod

from koheesio import Step
from koheesio.models import Field

DataFrameType = TypeVar("DataFrameType")
"""Defines a type variable that can be any type of DataFrame"""


class BaseReader(Step, ABC):
    """Base class for all Readers

    Concept
    -------
    A Reader is a Step that reads data from a source based on the input parameters
    and stores the result in self.output.df (DataFrame).

    When implementing a Reader, the execute() method should be implemented.
    The execute() method should read from the source and store the result in self.output.df.

    The Reader class implements a standard read() method that calls the execute() method and returns the result. This
    method can be used to read data from a Reader without having to call the execute() method directly. Read method
    does not need to be implemented in the child class.

    The Reader class also implements a shorthand for accessing the output Dataframe through the df-property. If the
    output.df is None, .execute() will be run first.
    """

    @property
    def to_df(self) -> Optional[DataFrameType]:
        """Shorthand for accessing self.output.df
        If the output.df is None, .execute() will be run first

        aliases:
        - toDF, mimics the Delta API
        - df
        """
        # Use an explicit `is None` check: DataFrames must not be used in a
        # boolean context (pandas raises "The truth value of a DataFrame is
        # ambiguous"), so `if not self.output.df` would crash for pandas-backed
        # readers and would also wrongly re-run execute() for empty results.
        if self.output.df is None:
            self.execute()
        return self.output.df

    # Aliases for to_df: `toDF` mimics the Delta API, `df` is the short form.
    toDF = to_df
    df = to_df

    @abstractmethod
    def execute(self) -> Step.Output:
        """Execute on a Reader should handle self.output.df (output) as a minimum
        Read from whichever source -> store result in self.output.df
        """
        pass

    def read(self) -> DataFrameType:
        """Read from a Reader without having to call the execute() method directly"""
        self.execute()
        return self.output.df


class BaseTransformation(Step, ABC):
    """Base class for all Transformations

    Concept
    -------
    A Transformation is a Step that takes a DataFrame as input and returns a DataFrame as output. The DataFrame is
    transformed based on the logic implemented in the `execute` method. Any additional parameters that are needed for
    the transformation can be passed to the constructor.

    When implementing a Transformation, the `execute` method should be implemented. The `execute` method should take the
    input DataFrame, transform it, and store the result in `self.output.df`.

    The Transformation class implements a standard `transform` method that calls the `execute` method and returns the
    result. This method can be used to transform a DataFrame without having to call the `execute` method directly. The
    `transform` method does not need to be implemented in the child class.

    The Transformation class also implements a shorthand for accessing the output DataFrame through the
    `to_df` property (alias: `toDF`). If the `output.df` is `None`, `.execute()` will be run first.
    """

    df: Optional[DataFrameType] = Field(default=None, description="The input DataFrame")

    @abstractmethod
    def execute(self) -> Step.Output:
        """Execute on a Transformation should handle self.df (input) and set self.output.df (output)

        This method should be implemented in the child class. The input DataFrame is available as `self.df` and the
        output DataFrame should be stored in `self.output.df`.

        For example:
        ```python
        # pyspark example
        def execute(self):
            self.output.df = self.df.withColumn(
                "new_column", f.col("old_column") + 1
            )
        ```

        The transform method will call this method and return the output DataFrame.
        """
        # self.df # input DataFrame
        # self.output.df # output DataFrame
        # Do not mutate self.output here: assigning a placeholder (e.g. `...`)
        # before raising would leave stale state on output.df if a subclass
        # ever delegates to super().execute().
        raise NotImplementedError

    @property
    def to_df(self) -> Optional[DataFrameType]:
        """Shorthand for accessing self.output.df
        If the output.df is None, .execute() will be run first
        """
        # Explicit `is None` check: DataFrames must not be evaluated in a
        # boolean context (pandas raises "The truth value of a DataFrame is
        # ambiguous"), and an empty-but-valid result must not re-trigger
        # execute().
        if self.output.df is None:
            self.execute()
        return self.output.df

    toDF = to_df
    """Alias for the to_df property - mimics the Delta API"""

    def transform(self, df: Optional[DataFrameType] = None) -> DataFrameType:
        """Execute the transformation and return the output DataFrame

        Note: when creating a child from this, don't implement this transform method. Instead, implement `execute`.

        See Also
        --------
        `Transformation.execute`

        Parameters
        ----------
        df: Optional[DataFrameType]
            The DataFrame to apply the transformation to. If not provided, the DataFrame passed to the constructor
            will be used.

        Returns
        -------
        DataFrameType
            The transformed DataFrame

        Raises
        ------
        RuntimeError
            If no DataFrame was passed either to this method or to the constructor.
        """
        if df is not None:
            self.df = df
        # `is None` (not truthiness) — see to_df for why DataFrames cannot be
        # used in a boolean context.
        if self.df is None:
            raise RuntimeError("No valid Dataframe was passed")
        self.execute()
        return self.output.df

    def __call__(self, *args, **kwargs):
        """Allow the class to be called as a function.
        This is especially useful when using a Pyspark DataFrame's transform method.

        Example
        -------
        Using pyspark's DataFrame transform method:
        ```python
        input_df = spark.range(3)

        output_df = input_df.transform(AddOne(target_column="foo")).transform(
            AddOne(target_column="bar")
        )
        ```

        In the above example, the `AddOne` transformation is applied to the `input_df` DataFrame using the `transform`
        method. The `output_df` will now contain the original DataFrame with additional columns called `foo` and
        `bar`, each with the values of `id` + 1.
        """
        return self.transform(*args, **kwargs)
49 changes: 4 additions & 45 deletions src/koheesio/models/reader.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,9 @@
"""
Module for the BaseReader class
"""

from typing import Optional, TypeVar
from abc import ABC, abstractmethod

from koheesio import Step

# Define a type variable that can be any type of DataFrame
DataFrameType = TypeVar("DataFrameType")


class BaseReader(Step, ABC):
"""Base class for all Readers

A Reader is a Step that reads data from a source based on the input parameters
and stores the result in self.output.df (DataFrame).

When implementing a Reader, the execute() method should be implemented.
The execute() method should read from the source and store the result in self.output.df.

The Reader class implements a standard read() method that calls the execute() method and returns the result. This
method can be used to read data from a Reader without having to call the execute() method directly. Read method
does not need to be implemented in the child class.

The Reader class also implements a shorthand for accessing the output Dataframe through the df-property. If the
output.df is None, .execute() will be run first.
"""

@property
def df(self) -> Optional[DataFrameType]:
"""Shorthand for accessing self.output.df
If the output.df is None, .execute() will be run first
"""
if not self.output.df:
self.execute()
return self.output.df
This acts as a pass-through for the BaseReader class in the models.dataframe module.
"""

@abstractmethod
def execute(self) -> Step.Output:
"""Execute on a Reader should handle self.output.df (output) as a minimum
Read from whichever source -> store result in self.output.df
"""
pass
from koheesio.models.dataframe import BaseReader

def read(self) -> DataFrameType:
"""Read from a Reader without having to call the execute() method directly"""
self.execute()
return self.output.df
__all__ = ["BaseReader"]
9 changes: 9 additions & 0 deletions src/koheesio/models/transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""
Module for the BaseTransformation class

This acts as a pass-through for the BaseTransformation class in the models.dataframe module.
"""

from koheesio.models.dataframe import BaseTransformation

__all__ = ["BaseTransformation"]
8 changes: 7 additions & 1 deletion src/koheesio/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@
from koheesio.spark.utils import import_pandas_based_on_pyspark_version

pandas: ModuleType = import_pandas_based_on_pyspark_version()
"""pandas module"""
pd = pandas


class PandasStep(Step, ABC):
"""Base class for a Pandas step

Extends the Step class with Pandas DataFrame support. The following:
- Pandas steps are expected to return a Pandas DataFrame as output.
- Pandas steps have a pd attribute that is set to the Pandas module. This is to ensure that the applicable pandas
module is used when running the step.
"""

pd: ModuleType = Field(pandas, description="Pandas module", alias="pandas")

class Output(StepOutput):
"""Output class for PandasStep"""

df: Optional[pandas.DataFrame] = Field(default=None, description="The Pandas DataFrame") # type: ignore
df: Optional[pd.DataFrame] = Field(default=None, description="The Pandas DataFrame") # type: ignore
Loading