Skip to content

Commit

Permalink
completed Ray tests
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Sep 23, 2024
1 parent 26978f1 commit 12e5ef0
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import pyarrow as pa
from data_processing.test_support.transform import NOOPTransform
from data_processing.test_support.transform import (
AbstractTableTransformTest,
import os
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing.test_support.transform.noop_transform import NOOPPythonTransformConfiguration, sleep_cli_param


table = pa.Table.from_pydict({"name": pa.array(["Tom", "Dick", "Harry"]), "age": pa.array([0, 1, 2])})
expected_table = table # We're a noop after all.
expected_metadata_list = [{"nfiles": 1, "nrows": 3}, {}] # transform() result # flush() result


class TestNOOPTransform(AbstractTableTransformTest):
class TestPythonNOOPTransform(AbstractTransformLauncherTest):
"""
Extends the super-class to define the test data for the tests defined there.
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
"""

def get_test_transform_fixtures(self) -> list[tuple]:
fixtures = [
(NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list),
(NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list),
]
src_file_dir = os.path.abspath(os.path.dirname(__file__))
fixtures = []
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),
"../../../../../transforms/universal/noop/python/test-data"))
launcher = PythonTransformLauncher(NOOPPythonTransformConfiguration())
transform_config = {sleep_cli_param: 0}
fixtures.append(
(
launcher,
transform_config,
basedir + "/input",
basedir + "/expected",
[], # optional list of column names to ignore in comparing test-generated with expected.
)
)

return fixtures
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class TestPythonResizeTransform(AbstractTransformLauncherTest):
def get_test_transform_fixtures(self) -> list[tuple]:
# The following based on 3 identical input files of about 39kbytes, and 200 rows
fixtures = []
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../../transforms/universal/resize/python/test-data"))
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),
"../../../../../transforms/universal/resize/python/test-data"))
launcher = PythonTransformLauncher(ResizePythonTransformConfiguration())

# Split into 4 or so files
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from data_processing_ray.test_support.transform.noop_transform import NOOPRayTransformConfiguration
from data_processing_ray.test_support.transform.resize_transform import ResizeRayTransformConfiguration
from data_processing_ray.test_support.transform.pipeline_transform import ResizeNOOPRayTransformConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration
from data_processing_ray.transform.ray import RayPipelineTransform
from data_processing.transform import PipelineTransformConfiguration
from data_processing.utils import get_logger
from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration, ResizeRayTransformConfiguration

# Module-level logger, named after this module; used by the script entry point below.
logger = get_logger(__name__)


class ResizeNOOPRayTransformConfiguration(RayTransformRuntimeConfiguration):
    """
    Ray runtime configuration for a pipeline made of the resize and NOOP transforms.

    The wrapped transform configuration is a PipelineTransformConfiguration whose
    "transforms" entry lists a ResizeRayTransformConfiguration followed by a
    NOOPRayTransformConfiguration, and whose transform class is RayPipelineTransform.
    Instances are meant to be handed to a RayTransformLauncher.
    """

    def __init__(self):
        """
        Build the pipeline configuration and pass it to the Ray runtime base class.
        """
        # Member configurations, listed resize first and NOOP second
        # (presumably the order in which the pipeline applies them - confirm
        # against RayPipelineTransform).
        pipeline_members = [
            ResizeRayTransformConfiguration(),
            NOOPRayTransformConfiguration(),
        ]
        pipeline_config = PipelineTransformConfiguration(
            config={"transforms": pipeline_members},
            transform_class=RayPipelineTransform,
        )
        super().__init__(transform_config=pipeline_config)


if __name__ == "__main__":
    # Script entry point: run the resize/noop pipeline through the Ray launcher.
    launcher = RayTransformLauncher(ResizeNOOPRayTransformConfiguration())
    logger.info("Launching resize/noop transform")
    launcher.launch()
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from argparse import ArgumentParser, Namespace
from typing import Any

import pyarrow as pa
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import (
LOCAL_TO_DISK,
MB,
CLIArgumentProvider,
UnrecoverableException,
get_logger,
)
from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration


# Module-level logger, named after this module.
logger = get_logger(__name__)

# Keys under which the resize settings are looked up in the transform's configuration dictionary.
max_rows_per_table_key = "max_rows_per_table"
max_mbytes_per_table_key = "max_mbytes_per_table"
size_type_key = "size_type"
# Short transform name; also the stem of the prefix shared by all resize CLI arguments.
shortname = "resize"
cli_prefix = f"{shortname}_"
# CLI argument names: the configuration keys above with the "resize_" prefix prepended.
max_rows_per_table_cli_param = f"{cli_prefix}{max_rows_per_table_key}"
max_mbytes_per_table_cli_param = f"{cli_prefix}{max_mbytes_per_table_key}"
size_type_cli_param = f"{cli_prefix}{size_type_key}"
# Allowed values of the size-type setting; "disk" is used when none is given.
size_type_disk = "disk"
size_type_memory = "memory"
size_type_default = size_type_disk


class ResizeTransform(AbstractTableTransform):
    """
    Re-chunks a stream of tables into tables of a bounded size.

    Exactly one of two limits must be configured:
      * a maximum number of rows per table, or
      * a maximum table size in megabytes.

    Rows that do not fill a complete output table are kept in an internal buffer
    and prepended to the next incoming table; whatever remains at the end is
    returned by flush().
    """

    def __init__(self, config: dict[str, Any]):
        """
        Initialize based on the dictionary of configuration information.

        :param config: configuration dictionary. Recognized keys are
            max_rows_per_table_key, max_mbytes_per_table_key and size_type_key.
        :raises ValueError: if neither or both of the two limits are positive.
        """
        super().__init__(config)
        self.max_rows_per_table = config.get(max_rows_per_table_key, 0)
        self.max_bytes_per_table = MB * config.get(max_mbytes_per_table_key, 0)
        size_type = config.get(size_type_key, size_type_default)
        # Scale the limit only when sizes are to be measured on disk. Compare against
        # the "disk" constant itself (not whatever the default happens to be) and use
        # equality rather than a substring test.
        if size_type == size_type_disk:
            self.max_bytes_per_table *= LOCAL_TO_DISK

        self.logger.debug(f"max bytes = {self.max_bytes_per_table}")
        self.logger.debug(f"max rows = {self.max_rows_per_table}")
        # Rows carried over from previous transform() calls, or None when empty.
        self.buffer = None
        if self.max_rows_per_table <= 0 and self.max_bytes_per_table <= 0:
            raise ValueError("Neither max rows per table nor max table size are defined")
        if self.max_rows_per_table > 0 and self.max_bytes_per_table > 0:
            raise ValueError("Both max rows per table and max table size are defined. Only one should be present")

    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        """
        Split the incoming table (preceded by any buffered rows) into smaller tables.

        :param table: table
        :param file_name: name of the file the table came from; used only in log messages
        :return: the list of complete output tables (possibly empty) and an empty metadata dictionary
        :raises UnrecoverableException: if the table cannot be concatenated with the buffered rows
        """
        self.logger.debug(f"got new table with {table.num_rows} rows")
        if self.buffer is not None:
            table = self._merge_with_buffer(table, file_name)

        if self.max_rows_per_table > 0:
            result, start_row = self._split_by_rows(table)
        else:
            result, start_row = self._split_by_size(table)

        if start_row < table.num_rows:
            # buffer remaining chunk for next call
            self.logger.debug(f"Buffering table starting at row {start_row}")
            self.buffer = table.slice(offset=start_row, length=(table.num_rows - start_row))
            self.logger.debug(f"buffered table has {self.buffer.num_rows} rows")
        self.logger.debug(f"returning {len(result)} tables")
        return result, {}

    def _merge_with_buffer(self, table: pa.Table, file_name: str = None) -> pa.Table:
        """Prepend the buffered rows to table, clearing the buffer on success."""
        self.logger.debug(
            f"concatenating buffer with {self.buffer.num_rows} rows to table with {table.num_rows} rows"
        )
        try:
            merged = pa.concat_tables([self.buffer, table])
        except Exception as e:  # Can happen if schemas are different
            # Raise unrecoverable error to stop the execution, keeping the original
            # failure attached as the cause so it is visible in the traceback.
            self.logger.warning(f"table in {file_name} can't be merged with the buffer")
            self.logger.warning(f"incoming table columns {table.schema.names} ")
            self.logger.warning(f"buffer columns {self.buffer.schema.names}")
            raise UnrecoverableException() from e
        self.buffer = None
        self.logger.debug(f"concatenated table has {merged.num_rows} rows")
        return merged

    def _split_by_rows(self, table: pa.Table) -> tuple[list[pa.Table], int]:
        """Cut off as many full max_rows_per_table slices as fit; return them and the first unused row."""
        result = []
        start_row = 0
        n_rows = table.num_rows
        # Only complete slices are emitted; a shorter tail is left for buffering.
        while n_rows - start_row >= self.max_rows_per_table:
            a_slice = table.slice(offset=start_row, length=self.max_rows_per_table)
            self.logger.debug(f"created table slice with {a_slice.num_rows} rows, starting with row {start_row}")
            result.append(a_slice)
            start_row += self.max_rows_per_table
        return result, start_row

    def _split_by_size(self, table: pa.Table) -> tuple[list[pa.Table], int]:
        """Cut slices whenever the accumulated row sizes reach max_bytes_per_table; return them and the first unused row."""
        result = []
        start_row = 0
        if table.nbytes < self.max_bytes_per_table:
            # Too small to produce any output yet; everything gets buffered.
            return result, start_row
        current_size = 0.0
        for n in range(table.num_rows):
            current_size += table.slice(offset=n, length=1).nbytes
            if current_size >= self.max_bytes_per_table:
                self.logger.debug(f"capturing slice, current_size={current_size}")
                # Reached the size. The slice ends just before row n; row n starts the
                # next slice. NOTE(review): the counter restarts at 0, so row n's own
                # size is not counted towards the next slice - kept as is.
                if n > start_row:
                    result.append(table.slice(offset=start_row, length=(n - start_row)))
                # else: the very first row alone reached the limit; emitting the
                # zero-row slice [start_row, n) would only produce an empty table.
                start_row = n
                current_size = 0.0
        return result, start_row

    def flush(self) -> tuple[list[pa.Table], dict[str, Any]]:
        """
        Return the rows still held in the buffer, if any, and clear the buffer.

        :return: a list with the buffered table (or an empty list) and an empty metadata dictionary
        """
        result = []
        if self.buffer is not None:
            self.logger.debug(f"flushing buffered table with {self.buffer.num_rows} rows of size {self.buffer.nbytes}")
            result.append(self.buffer)
            self.buffer = None
        else:
            self.logger.debug("Empty buffer. nothing to flush.")
        return result, {}


class ResizeTransformConfiguration(TransformConfiguration):
    """
    Configuration for ResizeTransform: declares the resize CLI arguments and
    validates the values parsed for them.
    """

    def __init__(self):
        super().__init__(name=shortname, transform_class=ResizeTransform)

    def add_input_params(self, parser: ArgumentParser) -> None:
        """
        Register the three resize-specific arguments on the given parser.
        All of them share the "resize_" prefix so they can be captured as a
        group in apply_input_params().

        :param parser: parser to add the arguments to
        """
        mbytes_help = f"Max table size (MB). Size is measured according to the --{size_type_cli_param} parameter"
        size_type_help = (
            f"Determines how memory is measured when using the --{max_mbytes_per_table_cli_param} option."
            "\n'memory' measures the in-process memory footprint and \n'disk' makes an estimate of the resulting parquet file size."
        )
        # Both limits default to -1, i.e. "not set".
        parser.add_argument(
            f"--{max_rows_per_table_cli_param}", type=int, default=-1, help="Max number of rows per table"
        )
        parser.add_argument(f"--{max_mbytes_per_table_cli_param}", type=float, default=-1, help=mbytes_help)
        parser.add_argument(
            f"--{size_type_cli_param}",
            type=str,
            required=False,
            default=size_type_default,
            choices=[size_type_disk, size_type_memory],
            help=size_type_help,
        )

    def apply_input_params(self, args: Namespace) -> bool:
        """
        Capture the resize arguments into self.params and check that exactly one
        of the two limits is positive.

        :param args: user defined arguments.
        :return: True if validation passes, False otherwise
        """
        # Merge the arguments carrying the resize prefix into the parameters.
        # NOTE(review): the lookups below use the un-prefixed keys, so
        # capture_parameters is presumably stripping the prefix - confirm.
        self.params = self.params | CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
        max_rows = self.params.get(max_rows_per_table_key)
        max_mbytes = self.params.get(max_mbytes_per_table_key)
        if max_rows <= 0 and max_mbytes <= 0:
            logger.info("Neither max documents per table nor max table size are defined")
            return False
        if max_rows > 0 and max_mbytes > 0:
            logger.info("Both max documents per table and max table size are defined. Only one should be present")
            return False
        logger.info(f"Split file parameters are : {self.params}")
        return True


class ResizeRayTransformConfiguration(RayTransformRuntimeConfiguration):
    """
    Ray runtime configuration for the resize transform, suitable for passing
    to a RayTransformLauncher.
    """

    def __init__(self):
        """
        Wrap a fresh ResizeTransformConfiguration in the Ray runtime configuration.
        """
        resize_config = ResizeTransformConfiguration()
        super().__init__(transform_config=resize_config)


if __name__ == "__main__":
    # Script entry point: run the resize transform through the Ray launcher.
    launcher = RayTransformLauncher(ResizeRayTransformConfiguration())
    # This module launches resize, not noop; the message was a copy-paste leftover.
    logger.info("Launching resize transform")
    launcher.launch()
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

from data_processing_ray.test_support.transform import ResizeRayTransformConfiguration
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)


class TestRayResizeTransform(AbstractTransformLauncherTest):
    """
    Supplies the fixtures for the launcher tests inherited from the super-class.
    The class name MUST start with "Test" so that pytest collects it.
    """

    def get_test_transform_fixtures(self) -> list[tuple]:
        """
        Build one fixture per resize setting. Each fixture is a tuple of
        (launcher, configuration, input directory, expected-output directory).
        """
        # The following based on 3 identical input files of about 39kbytes, and 200 rows
        runtime_settings = {"runtime_num_workers": 1, "run_locally": True}
        test_data = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "../../../../../../transforms/universal/resize/ray/test-data")
        )
        launcher = RayTransformLauncher(ResizeRayTransformConfiguration())

        # (resize parameter, value, expected-output sub-directory)
        cases = [
            ("resize_max_rows_per_table", 125, "expected-rows-125"),  # Split into 4 or so files
            ("resize_max_rows_per_table", 300, "expected-rows-300"),  # Merge into 2 or so files
            ("resize_max_mbytes_per_table", 1, "expected-mbytes-1"),  # Merge all into a single table
            # Merge the 1st 2 and some of the 2nd with the 3rd
            ("resize_max_mbytes_per_table", 0.05, "expected-mbytes-0.05"),
            ("resize_max_mbytes_per_table", 0.02, "expected-mbytes-0.02"),  # Split into 4 or so files
        ]
        return [
            (launcher, {param: value} | runtime_settings, test_data + "/input", test_data + "/" + expected_dir)
            for param, value, expected_dir in cases
        ]
Loading

0 comments on commit 12e5ef0

Please sign in to comment.