From 40599d25c79b85d0c73ea9e4a50ae353bb0480a4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 27 Nov 2024 22:55:36 +0000 Subject: [PATCH 01/10] refactor: Define RowJoinNode and defer rewrite --- bigframes/core/blocks.py | 8 +- bigframes/core/compile/compiler.py | 4 +- bigframes/core/compile/polars/compiler.py | 4 +- bigframes/core/nodes.py | 110 ++++++++++++++++++++++ bigframes/core/rewrite/__init__.py | 12 ++- bigframes/core/rewrite/implicit_align.py | 20 +++- bigframes/core/rewrite/slices.py | 5 +- bigframes/core/tree_properties.py | 3 +- bigframes/session/executor.py | 32 +++---- 9 files changed, 166 insertions(+), 32 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 574bed00eb..55aee3b45f 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2341,8 +2341,8 @@ def join( # Handle null index, which only supports row join # This is the canonical way of aligning on null index, so always allow (ignore block_identity_join) if self.index.nlevels == other.index.nlevels == 0: - result = try_legacy_row_join(self, other, how=how) or try_new_row_join( - self, other + result = try_new_row_join(self, other) or try_legacy_row_join( + self, other, how=how ) if result is not None: return result @@ -2356,8 +2356,8 @@ def join( and (self.index.nlevels == other.index.nlevels) and (self.index.dtypes == other.index.dtypes) ): - result = try_legacy_row_join(self, other, how=how) or try_new_row_join( - self, other + result = try_new_row_join(self, other) or try_legacy_row_join( + self, other, how=how ) if result is not None: return result diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 66fde9b874..b6cccda9e8 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -83,7 +83,9 @@ def _preprocess(self, node: nodes.BigFrameNode): if self.enable_pruning: used_fields = frozenset(field.id for field in node.fields) node = node.prune(used_fields) - node = functools.cache(rewrites.replace_slice_ops)(node) + node = bigframes.core.nodes.bottom_up( + node, rewrites.replace_slice_op, memoize=True + ) if self.enable_densify_ids: original_names = [id.name for id in node.ids] node, _ = rewrites.remap_variables( diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index d1ae063b59..75e1099647 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -173,7 +173,9 @@ def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame: # TODO: Create standard way to configure BFET -> BFET rewrites # Polars has incomplete slice support in lazy mode - node = bigframes.core.rewrite.replace_slice_ops(array_value.node) + node = bigframes.core.nodes.bottom_up( + array_value.node, bigframes.core.rewrite.replace_slice_op, memoize=True + ) return self.compile_node(node) @functools.singledispatchmethod diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 420348cca9..336af614af 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -481,6 +481,77 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): return dataclasses.replace(self, conditions=new_conds) # type: ignore +@dataclasses.dataclass(frozen=True, eq=False) +class RowJoinNode(BigFrameNode): + left_child: BigFrameNode + right_child: BigFrameNode + + def _validate(self): + assert not ( + set(self.left_child.ids) & set(self.right_child.ids) + ), "Join ids collide" + + @property + def row_preserving(self) -> bool: + return True + + @property + def child_nodes(self) -> typing.Sequence[BigFrameNode]: + return (self.left_child, self.right_child) + + @property + def order_ambiguous(self) -> bool: + return False + + @property + def explicitly_ordered(self) -> bool: + # Do not consider user pre-join ordering intent - they need to re-order post-join in unordered mode. + return True + + @property + def fields(self) -> Iterable[Field]: + return itertools.chain(self.left_child.fields, self.right_child.fields) + + @functools.cached_property + def variables_introduced(self) -> int: + return 0 + + @property + def row_count(self) -> Optional[int]: + return self.left_child.row_count + + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + + @property + def projection_base(self) -> BigFrameNode: + assert self.left_child.projection_base == self.right_child.projection_base + return self.left_child.projection_base + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + transformed = dataclasses.replace( + self, left_child=t(self.left_child), right_child=t(self.right_child) + ) + if self == transformed: + # reusing existing object speeds up eq, and saves a small amount of memory + return self + return transformed + + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: + return self.transform_children(lambda x: x.prune(used_cols)) + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + + @dataclasses.dataclass(frozen=True, eq=False) class ConcatNode(BigFrameNode): # TODO: Explcitly map column ids from each child @@ -1506,3 +1577,42 @@ def remap_refs( ) -> BigFrameNode: new_ids = tuple(id.remap_column_refs(mappings) for id in self.column_ids) return dataclasses.replace(self, column_ids=new_ids) # type: ignore + + +def top_down( + root: BigFrameNode, + transform: Callable[[BigFrameNode], BigFrameNode], + *, + memoize=False, + validate=False, +): + def top_down_internal(root: BigFrameNode) -> BigFrameNode: + return transform(root).transform_children(transform) + + if memoize: + # MUST reassign to the same name or caching won't work recursively + top_down_internal = functools.cache(top_down_internal) + result = top_down_internal(root) + if validate: + result.validate_tree() + return result + + +def bottom_up( + root: BigFrameNode, + transform: Callable[[BigFrameNode], BigFrameNode], + *, + memoize=False, + validate=False, +): + def bottom_up_internal(root: BigFrameNode) -> BigFrameNode: + return transform(root.transform_children(bottom_up_internal)) + + if memoize: + # MUST reassign to the same name or caching won't work recursively + bottom_up_internal = functools.cache(bottom_up_internal) + + result = bottom_up_internal(root) + if validate: + result.validate_tree() + return result diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index f7ee3c87c2..e62b676335 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -13,14 +13,20 @@ # limitations under the License. from bigframes.core.rewrite.identifiers import remap_variables -from bigframes.core.rewrite.implicit_align import try_join_as_projection +from bigframes.core.rewrite.implicit_align import ( + combine_nodes, + rewrite_row_join, + try_join_as_projection, +) from bigframes.core.rewrite.legacy_align import legacy_join_as_projection -from bigframes.core.rewrite.slices import pullup_limit_from_slice, replace_slice_ops +from bigframes.core.rewrite.slices import pullup_limit_from_slice, replace_slice_op __all__ = [ "legacy_join_as_projection", "try_join_as_projection", - "replace_slice_ops", + "replace_slice_op", "pullup_limit_from_slice", "remap_variables", + "combine_nodes", + "rewrite_row_join", ] diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index 1d7fed09d2..5d89e8325a 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -51,7 +51,14 @@ def get_expression_spec( ) curr_node = node while True: - if isinstance(curr_node, bigframes.core.nodes.SelectionNode): + if isinstance(curr_node, bigframes.core.nodes.RowJoinNode): + if id in curr_node.left_child.ids: + curr_node = curr_node.left_child + continue + else: + curr_node = curr_node.right_child + continue + elif isinstance(curr_node, bigframes.core.nodes.SelectionNode): select_mappings = { col_id: ref for ref, col_id in curr_node.input_output_pairs } @@ -63,6 +70,7 @@ def get_expression_spec( expression = expression.bind_refs( proj_mappings, allow_partial_bindings=True ) + elif isinstance( curr_node, ( @@ -92,6 +100,14 @@ def _linearize_trees( return append_tree.replace_child(_linearize_trees(base_tree, append_tree.child)) +def rewrite_row_join( + node: bigframes.core.nodes.BigFrameNode, +) -> bigframes.core.nodes.BigFrameNode: + if isinstance(node, bigframes.core.nodes.RowJoinNode): + return combine_nodes(node.left_child, node.right_child) + return node + + def combine_nodes( l_node: bigframes.core.nodes.BigFrameNode, r_node: bigframes.core.nodes.BigFrameNode, @@ -124,7 +140,7 @@ def try_join_as_projection( r_node, right_id ): return None - return combine_nodes(l_node, r_node) + return bigframes.core.nodes.RowJoinNode(l_node, r_node) def pull_up_selection( diff --git a/bigframes/core/rewrite/slices.py b/bigframes/core/rewrite/slices.py index 906d635e93..28f8d24bdd 100644 --- a/bigframes/core/rewrite/slices.py +++ b/bigframes/core/rewrite/slices.py @@ -55,13 +55,12 @@ def pullup_limit_from_slice( return root, None -def replace_slice_ops(root: nodes.BigFrameNode) -> nodes.BigFrameNode: +def replace_slice_op(root: nodes.BigFrameNode) -> nodes.BigFrameNode: # TODO: we want to pull up some slices into limit op if near root. if isinstance(root, nodes.SliceNode): - root = root.transform_children(replace_slice_ops) return rewrite_slice(cast(nodes.SliceNode, root)) else: - return root.transform_children(replace_slice_ops) + return root def rewrite_slice(node: nodes.SliceNode): diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 0a4339ee06..6b04f933a5 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -168,11 +168,10 @@ def replace_nodes( root: nodes.BigFrameNode, replacements: dict[nodes.BigFrameNode, nodes.BigFrameNode], ): - @functools.cache def apply_substition(node: nodes.BigFrameNode) -> nodes.BigFrameNode: if node in replacements.keys(): return replacements[node] else: return node.transform_children(apply_substition) - return apply_substition(root) + return nodes.top_down(root, apply_substition, memoize=True) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index d19ec23501..c2abc7aac2 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -45,6 +45,7 @@ import bigframes.core.identifiers import bigframes.core.nodes as nodes import bigframes.core.ordering as order +import bigframes.core.rewrite import bigframes.core.schema import bigframes.core.tree_properties as tree_properties import bigframes.features @@ -228,9 +229,7 @@ def to_sql( col_id_overrides = dict(col_id_overrides) col_id_overrides[internal_offset_col] = offset_column node = ( - self.replace_cached_subtrees(array_value.node) - if enable_cache - else array_value.node + self.preprocess_tree(array_value.node) if enable_cache else array_value.node ) if ordered: return self.compiler.compile_ordered( @@ -371,7 +370,7 @@ def peek( """ A 'peek' efficiently accesses a small number of rows in the dataframe. """ - plan = self.replace_cached_subtrees(array_value.node) + plan = self.preprocess_tree(array_value.node) if not tree_properties.can_fast_peek(plan): warnings.warn("Peeking this value cannot be done efficiently.") @@ -401,7 +400,7 @@ def head( # No user-provided ordering, so just get any N rows, its faster! return self.peek(array_value, n_rows) - plan = self.replace_cached_subtrees(array_value.node) + plan = self.preprocess_tree(array_value.node) if not tree_properties.can_fast_head(plan): # If can't get head fast, we are going to need to execute the whole query # Will want to do this in a way such that the result is reusable, but the first @@ -409,7 +408,7 @@ def head( # This currently requires clustering on offsets. self._cache_with_offsets(array_value) # Get a new optimized plan after caching - plan = self.replace_cached_subtrees(array_value.node) + plan = self.preprocess_tree(array_value.node) assert tree_properties.can_fast_head(plan) head_plan = generate_head_plan(plan, n_rows) @@ -433,7 +432,7 @@ def get_row_count(self, array_value: bigframes.core.ArrayValue) -> int: if count is not None: return count else: - row_count_plan = self.replace_cached_subtrees( + row_count_plan = self.preprocess_tree( generate_row_count_plan(array_value.node) ) sql = self.compiler.compile_unordered(row_count_plan) @@ -462,7 +461,7 @@ def _local_get_row_count( ) -> Optional[int]: # optimized plan has cache materializations which will have row count metadata # that is more likely to be usable than original leaf nodes. - plan = self.replace_cached_subtrees(array_value.node) + plan = self.preprocess_tree(array_value.node) return tree_properties.row_count(plan) # Helpers @@ -528,7 +527,10 @@ def _wait_on_job( self.metrics.count_job_stats(query_job) return results_iterator - def replace_cached_subtrees(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: + def preprocess_tree(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: + node = bigframes.core.nodes.bottom_up( + node, bigframes.core.rewrite.rewrite_row_join, memoize=True + ) return tree_properties.replace_nodes(node, (dict(self._cached_executions))) def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): @@ -539,7 +541,7 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): # Once rewriting is available, will want to rewrite before # evaluating execution cost. return tree_properties.is_trivially_executable( - self.replace_cached_subtrees(array_value.node) + self.preprocess_tree(array_value.node) ) def _cache_with_cluster_cols( @@ -548,7 +550,7 @@ def _cache_with_cluster_cols( """Executes the query and uses the resulting table to rewrite future executions.""" sql, schema, ordering_info = self.compiler.compile_raw( - self.replace_cached_subtrees(array_value.node) + self.preprocess_tree(array_value.node) ) tmp_table = self._sql_as_cached_temp_table( sql, @@ -565,9 +567,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") w_offsets, offset_column = array_value.promote_offsets() - sql = self.compiler.compile_unordered( - self.replace_cached_subtrees(w_offsets.node) - ) + sql = self.compiler.compile_unordered(self.preprocess_tree(w_offsets.node)) tmp_table = self._sql_as_cached_temp_table( sql, @@ -603,7 +603,7 @@ def _simplify_with_caching(self, array_value: bigframes.core.ArrayValue): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first for _ in range(MAX_SUBTREE_FACTORINGS): - node_with_cache = self.replace_cached_subtrees(array_value.node) + node_with_cache = self.preprocess_tree(array_value.node) if node_with_cache.planning_complexity < QUERY_COMPLEXITY_LIMIT: return @@ -660,7 +660,7 @@ def _validate_result_schema( ): actual_schema = tuple(bq_schema) ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema( - self.replace_cached_subtrees(array_value.node) + self.preprocess_tree(array_value.node) ) internal_schema = array_value.schema if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable: From 3d96d1057545b03d2e93342b46241a1aea83a6f1 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 27 Nov 2024 23:02:54 +0000 Subject: [PATCH 02/10] implement op in local engine --- bigframes/core/compile/polars/compiler.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index 75e1099647..364e038804 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -276,6 +276,12 @@ def compile_join(self, node: nodes.JoinNode): def compile_concat(self, node: nodes.ConcatNode): return pl.concat(self.compile_node(child) for child in node.child_nodes) + @compile_node.register + def compile_row_join(self, node: nodes.RowJoinNode): + return pl.concat( + (self.compile_node(child) for child in node.child_nodes), how="horizontal" + ) + @compile_node.register def compile_agg(self, node: nodes.AggregateNode): df = self.compile_node(node.child) From 151e4787a62126755b0bdb54c831aaaafc116be8 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 16 Dec 2024 22:52:06 +0000 Subject: [PATCH 03/10] fix preprocess helpers --- bigframes/core/__init__.py | 4 +- bigframes/core/nodes.py | 11 ++- bigframes/core/rewrite/__init__.py | 3 +- bigframes/core/rewrite/implicit_align.py | 98 +++---------------- bigframes/core/rewrite/legacy_align.py | 20 ++++ bigframes/core/row_join.py | 115 +++++++++++++++++++++++ bigframes/session/executor.py | 5 + tests/system/small/test_series.py | 19 ++++ 8 files changed, 185 insertions(+), 90 deletions(-) create mode 100644 bigframes/core/row_join.py diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 5e3f6df355..6ad3983fcc 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -444,9 +444,9 @@ def try_row_join( lcol.name: lcol.name for lcol in self.node.ids } other_node, r_mapping = self.prepare_join_names(other) - import bigframes.core.rewrite + import bigframes.core.row_join - result_node = bigframes.core.rewrite.try_row_join( + result_node = bigframes.core.row_join.try_row_join( self.node, other_node, conditions ) if result_node is None: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 183a742f71..a41d32601a 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -520,6 +520,10 @@ def row_count(self) -> Optional[int]: def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: return () + @property + def added_fields(self) -> Tuple[Field, ...]: + return tuple(self.right_child.fields) + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -1585,8 +1589,9 @@ def bottom_up( root: BigFrameNode, transform: Callable[[BigFrameNode], BigFrameNode], *, - memoize=False, - validate=False, + stop: Optional[Callable[[BigFrameNode], bool]] = None, + memoize: bool = False, + validate: bool = False, ) -> BigFrameNode: """ Perform a bottom-up transformation of the BigFrameNode tree. @@ -1595,6 +1600,8 @@ def bottom_up( """ def bottom_up_internal(root: BigFrameNode) -> BigFrameNode: + if (stop is not None) and (stop(root)): + return root return transform(root.transform_children(bottom_up_internal)) if memoize: diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index cb62ce0859..c8e91bb1c7 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -13,13 +13,12 @@ # limitations under the License. from bigframes.core.rewrite.identifiers import remap_variables -from bigframes.core.rewrite.implicit_align import try_row_join +from bigframes.core.rewrite.implicit_align import rewrite_row_join from bigframes.core.rewrite.legacy_align import legacy_join_as_projection from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice __all__ = [ "legacy_join_as_projection", - "try_row_join", "rewrite_slice", "pullup_limit_from_slice", "remap_variables", diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index 3a034592c8..d4002bab0b 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -13,7 +13,6 @@ # limitations under the License. from __future__ import annotations -import dataclasses from typing import Iterable, Optional, Tuple import bigframes.core.expression @@ -37,87 +36,18 @@ ) -@dataclasses.dataclass(frozen=True) -class ExpressionSpec: - expression: bigframes.core.expression.Expression - node: bigframes.core.nodes.BigFrameNode - - -def get_expression_spec( - node: bigframes.core.nodes.BigFrameNode, id: bigframes.core.identifiers.ColumnId -) -> ExpressionSpec: - """Normalizes column value by chaining expressions across multiple selection and projection nodes if possible. - This normalization helps identify whether columns are equivalent. - """ - # TODO: While we chain expression fragments from different nodes - # we could further normalize with constant folding and other scalar expression rewrites - expression: bigframes.core.expression.Expression = ( - bigframes.core.expression.DerefOp(id) - ) - curr_node = node - while True: - if isinstance(curr_node, bigframes.core.nodes.RowJoinNode): - if id in curr_node.left_child.ids: - curr_node = curr_node.left_child - continue - else: - curr_node = curr_node.right_child - continue - elif isinstance(curr_node, bigframes.core.nodes.SelectionNode): - select_mappings = { - col_id: ref for ref, col_id in curr_node.input_output_pairs - } - expression = expression.bind_refs( - select_mappings, allow_partial_bindings=True - ) - elif isinstance(curr_node, bigframes.core.nodes.ProjectionNode): - proj_mappings = {col_id: expr for expr, col_id in curr_node.assignments} - expression = expression.bind_refs( - proj_mappings, allow_partial_bindings=True - ) - - elif isinstance( - curr_node, - ( - bigframes.core.nodes.WindowOpNode, - bigframes.core.nodes.PromoteOffsetsNode, - ), - ): - if set(expression.column_references).isdisjoint( - field.id for field in curr_node.added_fields - ): - # we don't yet have a way of normalizing window ops into a ExpressionSpec, which only - # handles normalizing scalar expressions at the moment. - pass - else: - return ExpressionSpec(expression, curr_node) - else: - return ExpressionSpec(expression, curr_node) - curr_node = curr_node.child - +def rewrite_row_join( + node: bigframes.core.nodes.BigFrameNode, +): + if not isinstance(node, bigframes.core.nodes.RowJoinNode): + return node -def try_row_join( - l_node: bigframes.core.nodes.BigFrameNode, - r_node: bigframes.core.nodes.BigFrameNode, - join_keys: Tuple[Tuple[str, str], ...], -) -> Optional[bigframes.core.nodes.BigFrameNode]: - """Joins the two nodes""" + l_node = node.left_child + r_node = node.right_child divergent_node = first_shared_descendent( l_node, r_node, descendable_types=ALIGNABLE_NODES ) - if divergent_node is None: - return None - # check join keys are equivalent by normalizing the expressions as much as posisble - # instead of just comparing ids - for l_key, r_key in join_keys: - # Caller is block, so they still work with raw strings rather than ids - left_id = bigframes.core.identifiers.ColumnId(l_key) - right_id = bigframes.core.identifiers.ColumnId(r_key) - if get_expression_spec(l_node, left_id) != get_expression_spec( - r_node, right_id - ): - return None - + assert divergent_node is not None l_node, l_selection = pull_up_selection(l_node, stop=divergent_node) r_node, r_selection = pull_up_selection( r_node, stop=divergent_node, rename_vars=True @@ -211,10 +141,10 @@ def pull_up_selection( def first_shared_descendent( left: bigframes.core.nodes.BigFrameNode, right: bigframes.core.nodes.BigFrameNode, - descendable_types: Tuple[type[bigframes.core.nodes.UnaryNode], ...], + descendable_types: Tuple[type[bigframes.core.nodes.BigFrameNode], ...], ) -> Optional[bigframes.core.nodes.BigFrameNode]: - l_path = tuple(descend(left, descendable_types)) - r_path = tuple(descend(right, descendable_types)) + l_path = tuple(descend_left(left, descendable_types)) + r_path = tuple(descend_left(right, descendable_types)) if l_path[-1] != r_path[-1]: return None @@ -225,10 +155,10 @@ def first_shared_descendent( raise ValueError() -def descend( +def descend_left( root: bigframes.core.nodes.BigFrameNode, - descendable_types: Tuple[type[bigframes.core.nodes.UnaryNode], ...], + descendable_types: Tuple[type[bigframes.core.nodes.BigFrameNode], ...], ) -> Iterable[bigframes.core.nodes.BigFrameNode]: yield root if isinstance(root, descendable_types): - yield from descend(root.child, descendable_types) + yield from descend_left(root.child_nodes[0], descendable_types) diff --git a/bigframes/core/rewrite/legacy_align.py b/bigframes/core/rewrite/legacy_align.py index a671f34bd4..6063be5dc7 100644 --- a/bigframes/core/rewrite/legacy_align.py +++ b/bigframes/core/rewrite/legacy_align.py @@ -242,6 +242,26 @@ def legacy_join_as_projection( mappings: Tuple[join_defs.JoinColumnMapping, ...], how: join_defs.JoinType, ) -> Optional[nodes.BigFrameNode]: + # New alignment logic creates RowJoinNode, need to remove these to apply legacy join logic + + def stop_condition(node: nodes.BigFrameNode) -> bool: + return not isinstance(node, (*LEGACY_REWRITER_NODES, nodes.RowJoinNode)) + + # Legacy joiner can't handle RowJoinNode, so rewrite them. + # This is temporary measure while both legacy and new alignment logic coexist + l_node = nodes.bottom_up( + l_node, + bigframes.core.rewrite.implicit_align.rewrite_row_join, + stop=stop_condition, + memoize=True, + ) + r_node = nodes.bottom_up( + r_node, + bigframes.core.rewrite.implicit_align.rewrite_row_join, + stop=stop_condition, + memoize=True, + ) + rewrite_common_node = common_selection_root(l_node, r_node) if rewrite_common_node is not None: left_side = SquashedSelect.from_node_span(l_node, rewrite_common_node) diff --git a/bigframes/core/row_join.py b/bigframes/core/row_join.py new file mode 100644 index 0000000000..f156c78d24 --- /dev/null +++ b/bigframes/core/row_join.py @@ -0,0 +1,115 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import dataclasses +from typing import Optional, Tuple + +import bigframes.core.expression +import bigframes.core.guid +import bigframes.core.identifiers +import bigframes.core.join_def +import bigframes.core.nodes +import bigframes.core.window_spec +import bigframes.operations.aggregations + +# Additive nodes leave existing columns completely intact, and only add new columns to the end +ADDITIVE_NODES = ( + bigframes.core.nodes.ProjectionNode, + bigframes.core.nodes.WindowOpNode, + bigframes.core.nodes.PromoteOffsetsNode, +) +# Combination of selects and additive nodes can be merged as an explicit keyless "row join" +ALIGNABLE_NODES = ( + *ADDITIVE_NODES, + bigframes.core.nodes.SelectionNode, +) + + +@dataclasses.dataclass(frozen=True) +class ExpressionSpec: + expression: bigframes.core.expression.Expression + node: bigframes.core.nodes.BigFrameNode + + +def get_expression_spec( + node: bigframes.core.nodes.BigFrameNode, id: bigframes.core.identifiers.ColumnId +) -> ExpressionSpec: + """Normalizes column value by chaining expressions across multiple selection and projection nodes if possible. + This normalization helps identify whether columns are equivalent. + """ + # TODO: While we chain expression fragments from different nodes + # we could further normalize with constant folding and other scalar expression rewrites + expression: bigframes.core.expression.Expression = ( + bigframes.core.expression.DerefOp(id) + ) + curr_node = node + while True: + if isinstance(curr_node, bigframes.core.nodes.RowJoinNode): + if id in curr_node.left_child.ids: + curr_node = curr_node.left_child + continue + else: + curr_node = curr_node.right_child + continue + elif isinstance(curr_node, bigframes.core.nodes.SelectionNode): + select_mappings = { + col_id: ref for ref, col_id in curr_node.input_output_pairs + } + expression = expression.bind_refs( + select_mappings, allow_partial_bindings=True + ) + elif isinstance(curr_node, bigframes.core.nodes.ProjectionNode): + proj_mappings = {col_id: expr for expr, col_id in curr_node.assignments} + expression = expression.bind_refs( + proj_mappings, allow_partial_bindings=True + ) + + elif isinstance( + curr_node, + ( + bigframes.core.nodes.WindowOpNode, + bigframes.core.nodes.PromoteOffsetsNode, + ), + ): + if set(expression.column_references).isdisjoint( + field.id for field in curr_node.added_fields + ): + # we don't yet have a way of normalizing window ops into a ExpressionSpec, which only + # handles normalizing scalar expressions at the moment. + pass + else: + return ExpressionSpec(expression, curr_node) + else: + return ExpressionSpec(expression, curr_node) + curr_node = curr_node.child + + +def try_row_join( + l_node: bigframes.core.nodes.BigFrameNode, + r_node: bigframes.core.nodes.BigFrameNode, + join_keys: Tuple[Tuple[str, str], ...], +) -> Optional[bigframes.core.nodes.BigFrameNode]: + """Joins the two nodes""" + # check join keys are equivalent by normalizing the expressions as much as posisble + # instead of just comparing ids + for l_key, r_key in join_keys: + # Caller is block, so they still work with raw strings rather than ids + left_id = bigframes.core.identifiers.ColumnId(l_key) + right_id = bigframes.core.identifiers.ColumnId(r_key) + if get_expression_spec(l_node, left_id) != get_expression_spec( + r_node, right_id + ): + return None + return bigframes.core.nodes.RowJoinNode(l_node, r_node) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 7e986b5441..35205940f2 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -529,6 +529,11 @@ def _wait_on_job( return results_iterator def preprocess_tree(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: + # Note rewriting row joins invalidates caching here + # TODO: Make row join and caching mutually compatible + node = bigframes.core.nodes.bottom_up( + node, bigframes.core.rewrite.rewrite_row_join, memoize=True + ) return nodes.top_down( node, lambda x: self._cached_executions.get(x, x), memoize=True ) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 670828f616..6ddd24050a 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2391,6 +2391,25 @@ def test_nested_analytic_ops_align(scalars_df_index, scalars_pandas_df_index): ) +@skip_legacy_pandas +def test_unfiltered_align_then_filtered_align( + scalars_df_index, scalars_pandas_df_index +): + col_name = "float64_col" + bf_series = scalars_df_index[col_name].fillna(0.0) + pd_series = scalars_pandas_df_index[col_name].fillna(0.0) + + bf_result = (bf_series + bf_series) + bf_series[bf_series > 0] + + # cumsum does not behave well on nullable ints in pandas, produces object type and never ignores NA + pd_result = (pd_series + pd_series) + pd_series[pd_series > 0] + + pd.testing.assert_series_equal( + bf_result.to_pandas(), + pd_result, + ) + + def test_cumsum_int_filtered(scalars_df_index, scalars_pandas_df_index): col_name = "int64_col" From 72222ce67c7d5200e3a0dd3cbb8b62e3fe03da26 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 17 Dec 2024 20:06:15 +0000 Subject: [PATCH 04/10] reconcile RowJoin and compatibel caches --- bigframes/core/rewrite/implicit_align.py | 21 ++++++++++++++++++++- bigframes/core/row_join.py | 19 +++++++++++++------ bigframes/core/tree_properties.py | 2 +- bigframes/session/executor.py | 17 ++++++++++------- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index d4002bab0b..3419f35d11 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -33,12 +33,13 @@ ALIGNABLE_NODES = ( *ADDITIVE_NODES, bigframes.core.nodes.SelectionNode, + bigframes.core.nodes.RowJoinNode, ) def rewrite_row_join( node: bigframes.core.nodes.BigFrameNode, -): +) -> bigframes.core.nodes.BigFrameNode: if not isinstance(node, bigframes.core.nodes.RowJoinNode): return node @@ -48,6 +49,24 @@ def rewrite_row_join( l_node, r_node, descendable_types=ALIGNABLE_NODES ) assert divergent_node is not None + # Inner handler can't RowJoinNode, so bottom up apply the algorithm locally + return bigframes.core.nodes.bottom_up( + node, + lambda x: _rewrite_row_join_node(x, divergent_node), + stop=lambda x: x == divergent_node, + memoize=True, + ) + + +def _rewrite_row_join_node( + node: bigframes.core.nodes.BigFrameNode, + divergent_node: bigframes.core.nodes.BigFrameNode, +) -> bigframes.core.nodes.BigFrameNode: + if not isinstance(node, bigframes.core.nodes.RowJoinNode): + return node + + l_node = node.left_child + r_node = node.right_child l_node, l_selection = pull_up_selection(l_node, stop=divergent_node) r_node, r_selection = pull_up_selection( r_node, stop=divergent_node, rename_vars=True diff --git a/bigframes/core/row_join.py b/bigframes/core/row_join.py index f156c78d24..78a317d2b0 100644 --- a/bigframes/core/row_join.py +++ b/bigframes/core/row_join.py @@ -24,16 +24,13 @@ import bigframes.core.window_spec import bigframes.operations.aggregations -# Additive nodes leave existing columns completely intact, and only add new columns to the end -ADDITIVE_NODES = ( +# TODO: Push this into the Node definitions +PRESERVES_ROWS = ( bigframes.core.nodes.ProjectionNode, bigframes.core.nodes.WindowOpNode, bigframes.core.nodes.PromoteOffsetsNode, -) -# Combination of selects and additive nodes can be merged as an explicit keyless "row join" -ALIGNABLE_NODES = ( - *ADDITIVE_NODES, bigframes.core.nodes.SelectionNode, + bigframes.core.nodes.RowJoinNode, ) @@ -96,6 +93,14 @@ def get_expression_spec( curr_node = curr_node.child +def row_identity_source( + node: bigframes.core.nodes.BigFrameNode, +) -> bigframes.core.nodes.BigFrameNode: + if isinstance(node, PRESERVES_ROWS): + return row_identity_source(node.child_nodes[0]) + return node + + def try_row_join( l_node: bigframes.core.nodes.BigFrameNode, r_node: bigframes.core.nodes.BigFrameNode, @@ -104,6 +109,8 @@ def try_row_join( """Joins the two nodes""" # check join keys are equivalent by normalizing the expressions as much as posisble # instead of just comparing ids + if row_identity_source(l_node) != row_identity_source(r_node): + return None for l_key, r_key in join_keys: # Caller is block, so they still work with raw strings rather than ids left_id = bigframes.core.identifiers.ColumnId(l_key) diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index d893356207..22bd583767 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -45,7 +45,7 @@ def can_fast_head(node: nodes.BigFrameNode) -> bool: # To do fast head operation: # (1) the underlying data must be arranged/indexed according to the logical ordering # (2) transformations must support pushing down LIMIT or a filter on row numbers - return has_fast_offset_address(node) or has_fast_offset_address(node) + return has_fast_offset_address(node) or has_fast_orderby_limit(node) def has_fast_orderby_limit(node: nodes.BigFrameNode) -> bool: diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 35205940f2..3a9462fb37 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -529,14 +529,17 @@ def _wait_on_job( return results_iterator def preprocess_tree(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: - # Note rewriting row joins invalidates caching here + # At each node top=down we do the smallest possibel rewrite to apply the row join + # After that we apply caching. Sometimes the RowJoin will have invalidated the caching # TODO: Make row join and caching mutually compatible - node = bigframes.core.nodes.bottom_up( - node, bigframes.core.rewrite.rewrite_row_join, memoize=True - ) - return nodes.top_down( - node, lambda x: self._cached_executions.get(x, x), memoize=True - ) + def preprocess_node(node): + with_row_join_rewritten = bigframes.core.rewrite.rewrite_row_join(node) + with_cache = self._cached_executions.get( + with_row_join_rewritten, with_row_join_rewritten + ) + return with_cache + + return nodes.top_down(node, preprocess_node, memoize=True) def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): """ From 790362060a70cbc4666f9312bc32281ae86c2d35 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 17 Dec 2024 21:48:14 +0000 Subject: [PATCH 05/10] rewrite shared descendent logic --- bigframes/core/nodes.py | 6 ++++ bigframes/core/rewrite/implicit_align.py | 37 ++++++++++++------------ bigframes/core/rewrite/legacy_align.py | 2 +- bigframes/session/executor.py | 17 ++++++----- 4 files changed, 35 insertions(+), 27 deletions(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index a41d32601a..e7174ed236 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -209,6 +209,12 @@ def explicitly_ordered(self) -> bool: """ ... + @functools.cached_property + def height(self) -> int: + if len(self.child_nodes) == 0: + return 0 + return max(child.height for child in self.child_nodes) + 1 + @functools.cached_property def total_variables(self) -> int: return self.variables_introduced + sum( diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index 3419f35d11..3768d0fd84 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -13,7 +13,8 @@ # limitations under the License. from __future__ import annotations -from typing import Iterable, Optional, Tuple +import itertools +from typing import Optional, Set, Tuple import bigframes.core.expression import bigframes.core.guid @@ -46,7 +47,7 @@ def rewrite_row_join( l_node = node.left_child r_node = node.right_child divergent_node = first_shared_descendent( - l_node, r_node, descendable_types=ALIGNABLE_NODES + {l_node, r_node}, descendable_types=ALIGNABLE_NODES ) assert divergent_node is not None # Inner handler can't RowJoinNode, so bottom up apply the algorithm locally @@ -158,26 +159,24 @@ def pull_up_selection( ## Traversal helpers def first_shared_descendent( - left: bigframes.core.nodes.BigFrameNode, - right: bigframes.core.nodes.BigFrameNode, + roots: Set[bigframes.core.nodes.BigFrameNode], descendable_types: Tuple[type[bigframes.core.nodes.BigFrameNode], ...], ) -> Optional[bigframes.core.nodes.BigFrameNode]: - l_path = tuple(descend_left(left, descendable_types)) - r_path = tuple(descend_left(right, descendable_types)) - if l_path[-1] != r_path[-1]: + if not roots: return None + if len(roots) == 1: + return next(iter(roots)) - for l_node, r_node in zip(l_path[-len(r_path) :], r_path[-len(l_path) :]): - if l_node == r_node: - return l_node - # should be impossible, as l_path[-1] == r_path[-1] - raise ValueError() + min_height = min(root.height for root in roots) + to_descend = set(root for root in roots if root.height > min_height) + if not to_descend: + to_descend = roots + if any(not isinstance(root, descendable_types) for root in to_descend): + return None + as_is = roots - to_descend + descended = set( + itertools.chain.from_iterable(root.child_nodes for root in to_descend) + ) -def descend_left( - root: bigframes.core.nodes.BigFrameNode, - descendable_types: Tuple[type[bigframes.core.nodes.BigFrameNode], ...], -) -> Iterable[bigframes.core.nodes.BigFrameNode]: - yield root - if isinstance(root, descendable_types): - yield from descend_left(root.child_nodes[0], descendable_types) + return first_shared_descendent(as_is.union(descended), descendable_types) diff --git a/bigframes/core/rewrite/legacy_align.py b/bigframes/core/rewrite/legacy_align.py index 6063be5dc7..d9b4166b42 100644 --- a/bigframes/core/rewrite/legacy_align.py +++ b/bigframes/core/rewrite/legacy_align.py @@ -381,5 +381,5 @@ def common_selection_root( ) -> Optional[nodes.BigFrameNode]: """Find common subtree between join subtrees""" return bigframes.core.rewrite.implicit_align.first_shared_descendent( - l_tree, r_tree, descendable_types=LEGACY_REWRITER_NODES + {l_tree, r_tree}, descendable_types=LEGACY_REWRITER_NODES ) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 3a9462fb37..b68c4cc0a6 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -228,9 +228,7 @@ def to_sql( array_value, internal_offset_col = array_value.promote_offsets() col_id_overrides = dict(col_id_overrides) col_id_overrides[internal_offset_col] = offset_column - node = ( - self.preprocess_tree(array_value.node) if enable_cache else array_value.node - ) + node = self.preprocess_tree(array_value.node, use_cache=enable_cache) if ordered: return self.compiler.compile_ordered( node, col_id_overrides=col_id_overrides @@ -528,16 +526,21 @@ def _wait_on_job( self.metrics.count_job_stats(query_job) return results_iterator - def preprocess_tree(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: + def preprocess_tree( + self, node: nodes.BigFrameNode, *, use_cache: bool = True + ) -> nodes.BigFrameNode: # At each node top=down we do the smallest possibel rewrite to apply the row join # After that we apply caching. Sometimes the RowJoin will have invalidated the caching # TODO: Make row join and caching mutually compatible def preprocess_node(node): with_row_join_rewritten = bigframes.core.rewrite.rewrite_row_join(node) - with_cache = self._cached_executions.get( - with_row_join_rewritten, with_row_join_rewritten + return ( + self._cached_executions.get( + with_row_join_rewritten, with_row_join_rewritten + ) + if use_cache + else with_row_join_rewritten ) - return with_cache return nodes.top_down(node, preprocess_node, memoize=True) From 3d20473484e3fd928444b71dd69cab1bace521ef Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 17 Dec 2024 23:03:11 +0000 Subject: [PATCH 06/10] fold consecutive projections when rewriting --- bigframes/core/rewrite/implicit_align.py | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index 3768d0fd84..7b7795c65b 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -89,9 +89,35 @@ def _linearize_trees( ) merged_node = _linearize_trees(l_node, r_node) + # RowJoin rewrites can otherwise create too deep a tree + merged_node = bigframes.core.nodes.bottom_up( + merged_node, + fold_projections, + stop=lambda x: divergent_node in x.child_nodes, + memoize=True, + ) return bigframes.core.nodes.SelectionNode(merged_node, combined_selection) +def fold_projections( + root: bigframes.core.nodes.BigFrameNode, +) -> bigframes.core.nodes.BigFrameNode: + """If root and child are projection nodes, merge them.""" + if not isinstance(root, bigframes.core.nodes.ProjectionNode): + return root + if not isinstance(root.child, bigframes.core.nodes.ProjectionNode): + return root + mapping = {id: expr for expr, id in root.child.assignments} + new_exprs = ( + *root.child.assignments, + *( + (expr.bind_refs(mapping, allow_partial_bindings=True), id) + for expr, id in root.assignments + ), + ) + return bigframes.core.nodes.ProjectionNode(root.child.child, new_exprs) + + def pull_up_selection( node: bigframes.core.nodes.BigFrameNode, stop: bigframes.core.nodes.BigFrameNode, From b864480e7dc9a7b48527c4161a4db7f4a92b30f6 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Dec 2024 01:47:32 +0000 Subject: [PATCH 07/10] fix folding stop condition --- bigframes/core/nodes.py | 9 +++++++++ bigframes/core/rewrite/implicit_align.py | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index e7174ed236..974f56dccd 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -160,6 +160,15 @@ def roots(self) -> typing.Set[BigFrameNode]: ) return set(roots) + @property + def all_nodes(self) -> Iterable[BigFrameNode]: + yield self + for child in self.child_nodes: + yield from child.all_nodes + + def contains(self, node: BigFrameNode) -> bool: + return node in set(self.all_nodes) + # TODO: Store some local data lazily for select, aggregate nodes. @property @abc.abstractmethod diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index 7b7795c65b..aa248ddbf3 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -68,6 +68,7 @@ def _rewrite_row_join_node( l_node = node.left_child r_node = node.right_child + l_node, l_selection = pull_up_selection(l_node, stop=divergent_node) r_node, r_selection = pull_up_selection( r_node, stop=divergent_node, rename_vars=True @@ -89,11 +90,12 @@ def _linearize_trees( ) merged_node = _linearize_trees(l_node, r_node) + # RowJoin rewrites can otherwise create too deep a tree merged_node = bigframes.core.nodes.bottom_up( merged_node, fold_projections, - stop=lambda x: divergent_node in x.child_nodes, + stop=lambda x: (x == divergent_node) or (divergent_node in x.child_nodes), memoize=True, ) return bigframes.core.nodes.SelectionNode(merged_node, combined_selection) From b4229748307c38d1fe3bf31e0b92001053232a57 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Dec 2024 01:49:35 +0000 Subject: [PATCH 08/10] rename try_new_row_join to try_row_join --- bigframes/core/blocks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 53e9bce338..9231164cf9 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2335,7 +2335,7 @@ def join( # Handle null index, which only supports row join # This is the canonical way of aligning on null index, so always allow (ignore block_identity_join) if self.index.nlevels == other.index.nlevels == 0: - result = try_new_row_join(self, other) or try_legacy_row_join( + result = try_row_join(self, other) or try_legacy_row_join( self, other, how=how ) if result is not None: @@ -2350,7 +2350,7 @@ def join( and (self.index.nlevels == other.index.nlevels) and (self.index.dtypes == other.index.dtypes) ): - result = try_new_row_join(self, other) or try_legacy_row_join( + result = try_row_join(self, other) or try_legacy_row_join( self, other, how=how ) if result is not None: @@ -2691,7 +2691,7 @@ def is_uniquely_named(self: BlockIndexProperties): return len(set(self.names)) == len(self.names) -def try_new_row_join( +def try_row_join( left: Block, right: Block ) -> Optional[Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]]: join_keys = tuple( From 96e37f2628d5eade799330a753d3345167299e62 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Dec 2024 01:50:32 +0000 Subject: [PATCH 09/10] rename to_descend to roots_to_descend --- bigframes/core/rewrite/implicit_align.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index aa248ddbf3..caa3af0203 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -196,15 +196,15 @@ def first_shared_descendent( return next(iter(roots)) min_height = min(root.height for root in roots) - to_descend = set(root for root in roots if root.height > min_height) - if not to_descend: - to_descend = roots + roots_to_descend = set(root for root in roots if root.height > min_height) + if not roots_to_descend: + roots_to_descend = roots - if any(not isinstance(root, descendable_types) for root in to_descend): + if any(not isinstance(root, descendable_types) for root in roots_to_descend): return None - as_is = roots - to_descend + as_is = roots - roots_to_descend descended = set( - itertools.chain.from_iterable(root.child_nodes for root in to_descend) + itertools.chain.from_iterable(root.child_nodes for root in roots_to_descend) ) return first_shared_descendent(as_is.union(descended), descendable_types) From 737cc9ecb69dc5ac67eccdd6ddcdfdb41eceb155 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Dec 2024 01:54:44 +0000 Subject: [PATCH 10/10] extract out comprehension to variable --- bigframes/core/rewrite/implicit_align.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index caa3af0203..ba0cc80150 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -110,12 +110,15 @@ def fold_projections( if not isinstance(root.child, bigframes.core.nodes.ProjectionNode): return root mapping = {id: expr for expr, id in root.child.assignments} - new_exprs = ( - *root.child.assignments, - *( - (expr.bind_refs(mapping, allow_partial_bindings=True), id) - for expr, id in root.assignments - ), + root_assignments = ( + (expr.bind_refs(mapping, allow_partial_bindings=True), id) + for expr, id in root.assignments + ) + new_exprs = tuple( + itertools.chain( + root.child.assignments, + root_assignments, + ) ) return bigframes.core.nodes.ProjectionNode(root.child.child, new_exprs)