This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Merge pull request #143 from xhochy/__or__

Implement __or__ on FletcherBaseArray

xhochy authored Jun 23, 2020
2 parents 2a93fca + bafa443 · commit 9ea2c35
Showing 12 changed files with 886 additions and 425 deletions.
372 changes: 54 additions & 318 deletions fletcher/_algorithms.py

Large diffs are not rendered by default.

415 changes: 415 additions & 0 deletions fletcher/algorithms/bool.py

Large diffs are not rendered by default.
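The new `fletcher/algorithms/bool.py` diff is not rendered here, but the helpers it exports (`all_op`, `all_true`, `any_op`, `or_na`, `or_vectorised`, `all_true_like`; see the imports in `fletcher/base.py` below) are the building blocks of the new `__or__`. A rough sketch of the semantics of two of them, inferred from the comments in `__or__` — the bodies below are assumptions for illustration, not the committed code:

```python
import pyarrow as pa


def or_na_sketch(arr: pa.Array) -> pa.Array:
    """x | NA: fields that are True stay True, all others become NA."""
    return pa.array(
        [True if v else None for v in arr.to_pylist()], type=pa.bool_()
    )


def all_true_sketch(arr: pa.Array) -> pa.Array:
    """x | True: every element, nulls included, becomes True."""
    return pa.array([True] * len(arr), type=pa.bool_())
```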

45 changes: 42 additions & 3 deletions fletcher/algorithms/string.py
@@ -5,11 +5,10 @@
 import pyarrow as pa
 from numba import njit

-from fletcher._algorithms import (
-    _buffer_to_view,
+from fletcher._algorithms import _buffer_to_view, _merge_valid_bitmaps
+from fletcher.algorithms.utils.chunking import (
     _calculate_chunk_offsets,
     _combined_in_chunk_offsets,
-    _merge_valid_bitmaps,
     apply_per_chunk,
 )

@@ -266,3 +265,43 @@ def _text_contains_case_sensitive(data: pa.Array, pat: str) -> pa.Array:
    return pa.Array.from_buffers(
        pa.bool_(), len(data), [valid_buffer, pa.py_buffer(output)], data.null_count
    )


@njit
def _startswith(sa, needle, na, offset, out):
    for i in range(sa.size):
        if sa.isnull(i):
            out[offset + i] = na
            continue

        if sa.byte_length(i) < needle.length:
            out[offset + i] = 0
            continue

        for j in range(needle.length):
            if sa.get_byte(i, j) != needle.get_byte(j):
                out[offset + i] = 0
                break
        else:
            # for/else: only reached when the loop ran without a break,
            # i.e. every byte of the needle matched.
            out[offset + i] = 1


@njit
def _endswith(sa, needle, na, offset, out):
    for i in range(sa.size):
        if sa.isnull(i):
            out[offset + i] = na
            continue

        string_length = sa.byte_length(i)
        needle_length = needle.length
        if string_length < needle.length:
            out[offset + i] = 0
            continue

        # Assume a match, then compare the trailing bytes of the string
        # against the needle and reset on the first mismatch.
        out[offset + i] = 1
        for j in range(needle_length):
            if sa.get_byte(i, string_length - needle_length + j) != needle.get_byte(j):
                out[offset + i] = 0
                break
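For intuition, here is a minimal pure-Python reference of the `_startswith` kernel's semantics, operating on plain bytes instead of a `NumbaStringArray` (the helper name and the list-based output are illustrative only; the real kernel writes 0/1/`na` markers into a preallocated `out` buffer):

```python
def startswith_ref(strings, needle, na):
    """Reference semantics: na for nulls, 1/0 for match/mismatch."""
    out = []
    for s in strings:
        if s is None:
            out.append(na)  # null entries propagate the na marker
        elif len(s) < len(needle):
            out.append(0)   # too short to contain the prefix
        else:
            out.append(1 if s[: len(needle)] == needle else 0)
    return out


assert startswith_ref([b"foo", None, b"f"], b"fo", na=-1) == [1, -1, 0]
```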
Empty file.
183 changes: 183 additions & 0 deletions fletcher/algorithms/utils/chunking.py
@@ -0,0 +1,183 @@
"""Utility functions to deal with chunked arrays."""

from functools import singledispatch, wraps
from typing import Any, Callable, Dict, List, Tuple, Union

import numpy as np
import pyarrow as pa


def _calculate_chunk_offsets(chunked_array: pa.ChunkedArray) -> np.ndarray:
    """Return an array holding the indices pointing to the first element of each chunk."""
    offset = 0
    offsets = []
    for chunk in chunked_array.iterchunks():
        offsets.append(offset)
        offset += len(chunk)
    return np.array(offsets)


def _in_chunk_offsets(
    arr: pa.ChunkedArray, offsets: List[int]
) -> List[Tuple[int, int, int]]:
    """Calculate the access ranges for a given list of offsets.

    All chunk start indices must be included as offsets and the offsets must be
    unique.

    Returns a list of tuples that contain:

    * The index of the given chunk
    * The position inside the chunk
    * The length of the current range
    """
    new_offsets = []
    pos = 0
    chunk = 0
    chunk_pos = 0
    for offset, offset_next in zip(offsets, offsets[1:] + [len(arr)]):
        diff = offset - pos
        chunk_remains = len(arr.chunk(chunk)) - chunk_pos
        step = offset_next - offset
        if diff == 0:  # The first offset
            new_offsets.append((chunk, chunk_pos, step))
        elif diff == chunk_remains:
            chunk += 1
            chunk_pos = 0
            pos += chunk_remains
            new_offsets.append((chunk, chunk_pos, step))
        else:  # diff < chunk_remains
            chunk_pos += diff
            pos += diff
            new_offsets.append((chunk, chunk_pos, step))
    return new_offsets


def _combined_in_chunk_offsets(
    a: pa.ChunkedArray, b: pa.ChunkedArray
) -> Tuple[List[Tuple[int, int, int]], List[Tuple[int, int, int]]]:
    offsets_a = _calculate_chunk_offsets(a)
    offsets_b = _calculate_chunk_offsets(b)
    offsets = sorted(set(list(offsets_a) + list(offsets_b)))
    in_a_offsets = _in_chunk_offsets(a, offsets)
    in_b_offsets = _in_chunk_offsets(b, offsets)
    return in_a_offsets, in_b_offsets


def apply_per_chunk(func):
    """Apply a function to each chunk if the input is chunked."""

    @wraps(func)
    def wrapper(arr: Union[pa.Array, pa.ChunkedArray], *args, **kwargs):
        if isinstance(arr, pa.ChunkedArray):
            return pa.chunked_array(
                [func(chunk, *args, **kwargs) for chunk in arr.chunks]
            )
        else:
            return func(arr, *args, **kwargs)

    return wrapper


def _not_implemented_path(*args, **kwargs):
    raise NotImplementedError("Dispatching path not implemented")


@singledispatch
def dispatch_chunked_binary_map(a: Any, b: Any, ops: Dict[str, Callable]):
    """
    Apply a map-like binary function where at least one of the arguments is an Arrow structure.

    This will yield a pyarrow.Array or pyarrow.ChunkedArray as an output.

    Parameters
    ----------
    a: scalar or np.ndarray or pa.Array or pa.ChunkedArray
    b: scalar or np.ndarray or pa.Array or pa.ChunkedArray
    ops: dict
        Dictionary with the keys ('array_array', 'array_nparray', 'nparray_array',
        'array_scalar', 'scalar_array')
    """
    # a is neither a pa.Array nor a pa.ChunkedArray, so we expect only
    # np.ndarray or scalars here.
    if isinstance(b, pa.ChunkedArray):
        if np.isscalar(a):
            new_chunks = []
            for chunk in b.iterchunks():
                new_chunks.append(dispatch_chunked_binary_map(a, chunk, ops))
            return pa.chunked_array(new_chunks)
        else:
            if len(a) != len(b):
                raise ValueError("Inputs don't have the same length.")
            new_chunks = []
            offsets = _calculate_chunk_offsets(b)
            for chunk, offset in zip(b.iterchunks(), offsets):
                new_chunks.append(
                    dispatch_chunked_binary_map(
                        a[offset : offset + len(chunk)], chunk, ops
                    )
                )
            return pa.chunked_array(new_chunks)
    elif isinstance(b, pa.Array):
        if np.isscalar(a):
            return ops.get("scalar_array", _not_implemented_path)(a, b)
        else:
            return ops.get("nparray_array", _not_implemented_path)(a, b)
    else:
        # Should never be reached; add a safeguard.
        raise NotImplementedError(f"Cannot apply ufunc on {type(a)} and {type(b)}")


@dispatch_chunked_binary_map.register(pa.ChunkedArray)
def _1(a: pa.ChunkedArray, b: Any, ops: Dict[str, Callable]):
    """Apply a map-like binary function where the first argument is a pa.ChunkedArray."""
    if isinstance(b, pa.ChunkedArray):
        if len(a) != len(b):
            raise ValueError("Inputs don't have the same length.")
        in_a_offsets, in_b_offsets = _combined_in_chunk_offsets(a, b)

        new_chunks: List[pa.Array] = []
        for a_offset, b_offset in zip(in_a_offsets, in_b_offsets):
            a_slice = a.chunk(a_offset[0])[a_offset[1] : a_offset[1] + a_offset[2]]
            b_slice = b.chunk(b_offset[0])[b_offset[1] : b_offset[1] + b_offset[2]]
            new_chunks.append(dispatch_chunked_binary_map(a_slice, b_slice, ops))
        return pa.chunked_array(new_chunks)
    elif np.isscalar(b):
        new_chunks = []
        for chunk in a.iterchunks():
            new_chunks.append(dispatch_chunked_binary_map(chunk, b, ops))
        return pa.chunked_array(new_chunks)
    else:
        if len(a) != len(b):
            raise ValueError("Inputs don't have the same length.")
        new_chunks = []
        offsets = _calculate_chunk_offsets(a)
        for chunk, offset in zip(a.iterchunks(), offsets):
            new_chunks.append(
                dispatch_chunked_binary_map(chunk, b[offset : offset + len(chunk)], ops)
            )
        return pa.chunked_array(new_chunks)


@dispatch_chunked_binary_map.register(pa.Array)
def _2(a: pa.Array, b: Any, ops: Dict[str, Callable]):
    """Apply a map-like binary function where the first argument is a pa.Array."""
    if isinstance(b, pa.ChunkedArray):
        if len(a) != len(b):
            raise ValueError("Inputs don't have the same length.")
        new_chunks = []
        offsets = _calculate_chunk_offsets(b)
        for chunk, offset in zip(b.iterchunks(), offsets):
            new_chunks.append(
                dispatch_chunked_binary_map(a[offset : offset + len(chunk)], chunk, ops)
            )
        return pa.chunked_array(new_chunks)
    elif isinstance(b, pa.Array):
        if len(a) != len(b):
            raise ValueError("Inputs don't have the same length.")
        return ops.get("array_array", _not_implemented_path)(a, b)
    else:
        if np.isscalar(b):
            return ops.get("array_scalar", _not_implemented_path)(a, b)
        else:
            if len(a) != len(b):
                raise ValueError("Inputs don't have the same length.")
            return ops.get("array_nparray", _not_implemented_path)(a, b)
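To make the chunk-offset machinery concrete, here is a small usage sketch. The printed offsets follow directly from the algorithm above; the boolean `ops` dict is an illustrative assumption, not part of this commit:

```python
import pyarrow as pa

from fletcher.algorithms.utils.chunking import (
    _calculate_chunk_offsets,
    _combined_in_chunk_offsets,
    dispatch_chunked_binary_map,
)

a = pa.chunked_array([[True, False], [True, True, False]])   # chunk lengths 2, 3
b = pa.chunked_array([[False, False, True, True], [False]])  # chunk lengths 4, 1

print(_calculate_chunk_offsets(a))  # [0 2]
print(_calculate_chunk_offsets(b))  # [0 4]

# The union of chunk start offsets {0, 2, 4} splits both arrays into aligned
# (chunk index, position in chunk, length) ranges:
in_a, in_b = _combined_in_chunk_offsets(a, b)
print(in_a)  # [(0, 0, 2), (1, 0, 2), (1, 2, 1)]
print(in_b)  # [(0, 0, 2), (0, 2, 2), (1, 0, 1)]

# Dispatch an elementwise "or" over the aligned slices:
ops = {
    "array_array": lambda x, y: pa.array(
        x.to_numpy(zero_copy_only=False) | y.to_numpy(zero_copy_only=False)
    )
}
print(dispatch_chunked_binary_map(a, b, ops).to_pylist())
# [True, False, True, True, False]
```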
26 changes: 22 additions & 4 deletions fletcher/base.py
@@ -19,10 +19,7 @@
 from pandas.core.arrays import ExtensionArray
 from pandas.core.dtypes.dtypes import ExtensionDtype, register_extension_dtype

-from ._algorithms import (
-    _calculate_chunk_offsets,
-    all_op,
-    any_op,
+from fletcher._algorithms import (
     extract_isnull_bytemap,
     kurt_op,
     max_op,
@@ -35,6 +32,8 @@
     sum_op,
     var_op,
 )
+from fletcher.algorithms.bool import all_op, all_true, any_op, or_na, or_vectorised
+from fletcher.algorithms.utils.chunking import _calculate_chunk_offsets

 PANDAS_GE_0_26_0 = LooseVersion(pd.__version__) >= "0.26.0"
 if PANDAS_GE_0_26_0:
@@ -514,6 +513,25 @@ def _np_compare_op(self, op: Callable, np_op: Callable, other):
    __mod__ = partialmethod(_np_ufunc_op, np.ndarray.__mod__)
    __rmod__ = partialmethod(_np_ufunc_op, np.ndarray.__rmod__)

    def __or__(self, other):
        """Compute vectorised or."""
        if not pa.types.is_boolean(self.dtype.arrow_dtype):
            raise NotImplementedError("__or__ is only supported for boolean arrays yet")

        if other is pd.NA or (pd.api.types.is_scalar(other) and pd.isna(other)):
            # All fields that are True stay True, all others get set to NA
            return type(self)(or_na(self.data))
        elif isinstance(other, bool):
            if other:
                # or with True yields all-True
                return type(self)(all_true(self.data))
            else:
                return self
        else:
            if isinstance(other, FletcherBaseArray):
                other = other.data
            return type(self)(or_vectorised(self.data, other))

    def __divmod__(self, other):
        """Compute divmod via floordiv and mod."""
        return (self.__floordiv__(other), self.__mod__(other))
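A quick sketch of how the new operator behaves, assuming `FletcherChunkedArray` can be constructed directly from a pyarrow boolean array; the expected values follow from the branches above (the vectorised case presumably uses Kleene null semantics):

```python
import pandas as pd
import pyarrow as pa

from fletcher import FletcherChunkedArray

arr = FletcherChunkedArray(pa.array([True, False, None]))

arr | True    # all_true:      [True, True, True]
arr | False   # returns self:  [True, False, None]
arr | pd.NA   # or_na:         [True, None, None]
arr | FletcherChunkedArray(pa.array([False, True, None]))
              # or_vectorised: [True, True, None] (assuming Kleene nulls)
```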
14 changes: 10 additions & 4 deletions fletcher/string_array.py
@@ -4,15 +4,21 @@
 import pandas as pd
 import pyarrow as pa

-from ._algorithms import _endswith, _startswith, all_true_like
-from ._numba_compat import NumbaString, NumbaStringArray
-from .algorithms.string import (
+from fletcher._numba_compat import NumbaString, NumbaStringArray
+from fletcher.algorithms.bool import all_true_like
+from fletcher.algorithms.string import (
+    _endswith,
+    _startswith,
     _text_cat,
     _text_cat_chunked,
     _text_cat_chunked_mixed,
     _text_contains_case_sensitive,
 )
-from .base import FletcherBaseArray, FletcherChunkedArray, FletcherContinuousArray
+from fletcher.base import (
+    FletcherBaseArray,
+    FletcherChunkedArray,
+    FletcherContinuousArray,
+)


 @pd.api.extensions.register_series_accessor("fr_text")
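The `fr_text` accessor registered above is the user-facing entry point for these string kernels. A usage sketch — the method name `contains` is an assumption based on the imported `_text_contains_case_sensitive` helper, as the accessor body is not shown in this diff:

```python
import pandas as pd
import pyarrow as pa

import fletcher as fr

ser = pd.Series(fr.FletcherChunkedArray(pa.array(["apple", None, "banana"])))
# Hypothetical accessor call backed by _text_contains_case_sensitive:
print(ser.fr_text.contains("an"))  # expected: [False, None, True]
```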
23 changes: 23 additions & 0 deletions tests/conftest.py
@@ -2,6 +2,13 @@

import pytest

from fletcher import (
    FletcherChunkedArray,
    FletcherChunkedDtype,
    FletcherContinuousArray,
    FletcherContinuousDtype,
)

# More information about the pandas extension interface tests can be found here
# https://github.com/pandas-dev/pandas/blob/master/pandas/tests/extension/base/__init__.py

@@ -130,6 +137,22 @@ def fletcher_variant(request):
    return request.param


@pytest.fixture
def fletcher_dtype(fletcher_variant):
    if fletcher_variant == "chunked":
        return FletcherChunkedDtype
    else:
        return FletcherContinuousDtype


@pytest.fixture
def fletcher_array(fletcher_variant):
    if fletcher_variant == "chunked":
        return FletcherChunkedArray
    else:
        return FletcherContinuousArray


@pytest.fixture(params=["chunked", "continuous"], scope="session")
def fletcher_variant_2(request):
    """Whether to test the chunked or continuous implementation.
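A sketch of how a test might consume the new `fletcher_array` fixture (the test body is illustrative, not part of this commit):

```python
import pyarrow as pa


def test_or_with_true(fletcher_array):
    # fletcher_array is the class selected by the fletcher_variant fixture
    arr = fletcher_array(pa.array([True, False, None]))
    result = arr | True
    assert result.data.to_pylist() == [True, True, True]
```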