Commit 4bd056c

Merge branch 'master' into feat/gpu-featurization

dcolinmorgan committed Jul 10, 2024 · 2 parents a86be5c + 2b10395
Showing 24 changed files with 1,375 additions and 58 deletions.
57 changes: 57 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,63 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)

## [Development]

## [0.33.9 - 2024-07-04]

### Added

* Added `personalized_pagerank` to the list of supported `compute_igraph` algorithms.
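
  A minimal usage sketch (toy data; assumes the igraph plugin is installed):

  ```python
  import pandas as pd
  import graphistry

  # Toy cyclic graph; column names are illustrative
  edges_df = pd.DataFrame({'src': ['a', 'b', 'c'], 'dst': ['b', 'c', 'a']})
  g = graphistry.edges(edges_df, 'src', 'dst')

  # Runs igraph's personalized PageRank; scores are added as a node column
  g2 = g.compute_igraph('personalized_pagerank')
  print(g2._nodes)
  ```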

## [0.33.8 - 2024-04-30]

### Fixed

* Fix `from_json` when the JSON object contains predicates.
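
  A sketch of the round-trip this fixes, assuming `Chain`'s matching `to_json` serializer:

  ```python
  from graphistry import Chain, n, e_forward, is_in

  # Round-trip a chain whose node filter carries a predicate
  c = Chain([n({'type': is_in(['person', 'org'])}), e_forward()])
  c2 = Chain.from_json(c.to_json())
  ```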

## [0.33.7 - 2024-04-06]

### Fixed

* Fix `refresh()` for SSO

## [0.33.6 - 2024-04-05]

### Added

* `featurize()`, on error, coerces `object` dtype cols to `.astype(str)` and retries
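
  The fallback amounts to the following pandas pattern (helper name hypothetical; `featurize()` applies it internally on failure, then retries):

  ```python
  import pandas as pd

  def coerce_object_cols(df: pd.DataFrame) -> pd.DataFrame:
      # Stringify mixed-type object columns so downstream encoders don't choke
      out = df.copy()
      for col in out.select_dtypes(include='object').columns:
          out[col] = out[col].astype(str)
      return out
  ```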

## [0.33.5 - 2024-03-11]

### Fixed

* Fix upload-time validation rejecting graphs without a nodes table
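
  A sketch of the now-accepted case (plotting assumes a prior `graphistry.register(...)`):

  ```python
  import pandas as pd
  import graphistry

  edges_df = pd.DataFrame({'s': ['a', 'b'], 'd': ['b', 'c']})
  g = graphistry.edges(edges_df, 's', 'd')  # no nodes table bound

  # g.plot()  # previously rejected at upload time; now validates cleanly
  ```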

## [0.33.4 - 2024-02-29]

### Fixed

* Fix validations import.

## [0.33.3 - 2024-02-28]

### Added

* Validations for dataset encodings.

## [0.33.2 - 2024-02-24]

### Added

* GFQL: Export shorter alias `e` for `e_undirected` (see the sketch after this list)
* Featurize: More aggressive auto-dropping of non-numeric columns when `dirty_cat` is unavailable
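
  A minimal sketch of the new alias (toy data):

  ```python
  import pandas as pd
  import graphistry
  from graphistry import n, e  # e is the new shorthand for e_undirected

  edges_df = pd.DataFrame({'s': ['a', 'b'], 'd': ['b', 'c']})
  g = graphistry.edges(edges_df, 's', 'd')

  hits = g.chain([n(), e(), n()])  # same behavior as e_undirected()
  ```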

### Fixed

* GFQL: `hop()` defaults to `debugging_hop=False`
* GFQL: Fix edge cases where shortest-path multi-hop queries failed to enrich against target nodes during the backwards pass
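
  A sketch of the kind of multi-hop path query the backwards-pass fix addresses (toy data; `g._node` avoids assuming the materialized node column's name):

  ```python
  import pandas as pd
  import graphistry
  from graphistry import n, e_forward

  edges_df = pd.DataFrame({'s': ['a', 'b'], 'd': ['b', 'c']})
  g = graphistry.edges(edges_df, 's', 'd').materialize_nodes()

  # a -> b -> c: the backwards pass now enriches correctly against the target node
  paths = g.chain([n({g._node: 'a'}), e_forward(hops=2), n({g._node: 'c'})])
  ```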

### Infra

* Pin the test environment to work around test failures: `'test': ['flake8>=5.0', 'mock', 'mypy', 'pytest'] + stubs + test_workarounds` with `test_workarounds = ['scikit-learn<=1.3.2']`
* Skip DBSCAN tests that require UMAP when it is not available

## [0.33.0 - 2023-12-26]

### Added
4 changes: 2 additions & 2 deletions DEVELOP.md
@@ -106,8 +106,6 @@ GitHub Actions: See `.github/workflows`

## Publish: Merge, Tag, & Upload

1. Merge the desired PR to master and switch to master head (`git checkout master && git pull`)

1. Manually update CHANGELOG.md

1. Tag the repository with a new version number. We use semantic version numbers of the form *X.Y.Z*.
@@ -120,3 +118,5 @@ GitHub Actions: See `.github/workflows`
1. Confirm the [publish](https://github.com/graphistry/pygraphistry/actions?query=workflow%3A%22Publish+Python+%F0%9F%90%8D+distributions+%F0%9F%93%A6+to+PyPI+and+TestPyPI%22) Github Action published to [pypi](https://pypi.org/project/graphistry/), or manually run it for the master branch

1. Toggle version as active at [ReadTheDocs](https://readthedocs.org/projects/pygraphistry/versions/)

1. Merge the desired PR to master and switch to master head (`git checkout master && git pull`)
7 changes: 7 additions & 0 deletions docs/source/graphistry.rst
@@ -86,6 +86,13 @@ Arrow File Uploader Module
:undoc-members:
:show-inheritance:

Validation
==================
.. toctree::
:maxdepth: 3

graphistry.validate

Versioneer
==================

8 changes: 8 additions & 0 deletions docs/source/graphistry.validate.rst
@@ -0,0 +1,8 @@
Module contents
---------------

.. automodule:: graphistry.validate
:members:
:undoc-members:
:show-inheritance:
:noindex:
7 changes: 5 additions & 2 deletions graphistry/PlotterBase.py
@@ -1314,7 +1314,7 @@ def privacy(self, mode: Optional[Mode] = None, notify: Optional[bool] = None, in

def plot(
self, graph=None, nodes=None, name=None, description=None, render=None, skip_upload=False, as_files=False, memoize=True,
extra_html="", override_html_style=None
extra_html="", override_html_style=None, validate: bool = True
): # noqa: C901
"""Upload data to the Graphistry server and show as an iframe of it.
@@ -1353,6 +1353,9 @@ def plot(
:param override_html_style: Set fully custom style tag.
:type override_html_style: Optional[str]
:param validate: Controls validations, including those for encodings.
:type validate: bool
**Example: Simple**
::
@@ -1405,7 +1408,7 @@ def plot(
if skip_upload:
return dataset
dataset.token = PyGraphistry.api_token()
dataset.post(as_files=as_files, memoize=memoize)
dataset.post(as_files=as_files, memoize=memoize, validate=validate)
dataset.maybe_post_share_link(self)
info = {
'name': dataset.dataset_id,
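
A usage sketch of the new `validate` flag threaded through `plot()` (server registration assumed):

```python
import pandas as pd
import graphistry

# graphistry.register(api=3, ...)  # assumed already configured
edges_df = pd.DataFrame({'s': ['a'], 'd': ['b']})
g = graphistry.edges(edges_df, 's', 'd')

g.plot(validate=False)  # opt out of the new upload-time encoding validation
```
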
2 changes: 1 addition & 1 deletion graphistry/__init__.py
@@ -50,7 +50,7 @@
)

from graphistry.compute import (
n, e_forward, e_reverse, e_undirected,
n, e, e_forward, e_reverse, e_undirected,
Chain,

is_in, IsIn,
39 changes: 26 additions & 13 deletions graphistry/arrow_uploader.py
@@ -5,6 +5,9 @@
from graphistry.privacy import Mode, Privacy

from .ArrowFileUploader import ArrowFileUploader

from .exceptions import TokenExpireException
from .validate.validate_encodings import validate_encodings
from .util import setup_logger
logger = setup_logger(__name__)

@@ -55,19 +58,19 @@ def view_base_path(self, view_base_path: str):
self.__view_base_path = view_base_path

@property
def edges(self) -> pa.Table:
def edges(self) -> Optional[pa.Table]:
return self.__edges

@edges.setter
def edges(self, edges: pa.Table):
def edges(self, edges: Optional[pa.Table]):
self.__edges = edges

@property
def nodes(self) -> pa.Table:
def nodes(self) -> Optional[pa.Table]:
return self.__nodes

@nodes.setter
def nodes(self, nodes: pa.Table):
def nodes(self, nodes: Optional[pa.Table]):
self.__nodes = nodes

@property
@@ -156,7 +159,7 @@ def __init__(self,
server_base_path='http://nginx', view_base_path='http://localhost',
name = None,
description = None,
edges = None, nodes = None,
edges: Optional[pa.Table] = None, nodes: Optional[pa.Table] = None,
node_encodings = None, edge_encodings = None,
token = None, dataset_id = None,
metadata = None,
@@ -361,10 +364,14 @@ def refresh(self, token=None):
try:
json_response = out.json()
if not ('token' in json_response):
if (
"non_field_errors" in json_response and "Token has expired." in json_response["non_field_errors"]
):
raise TokenExpireException(out.text)
raise Exception(out.text)
except Exception:
except Exception as e:
logger.error('Error: %s', out, exc_info=True)
raise Exception(out.text)
raise e

self.token = out.json()['token']
return self
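
A hedged sketch of consuming the new exception (import path per this commit's `graphistry.exceptions`; token value illustrative):

```python
from graphistry.arrow_uploader import ArrowUploader
from graphistry.exceptions import TokenExpireException

au = ArrowUploader(server_base_path='https://hub.graphistry.com')
au.token = '...'  # an existing, possibly stale JWT
try:
    au.refresh()
except TokenExpireException:
    # e.g. re-run the SSO/login flow before retrying
    raise
```
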
@@ -380,7 +387,13 @@ def verify(self, token=None) -> bool:
json={'token': token})
return out.status_code == requests.codes.ok

def create_dataset(self, json): # noqa: F811
def create_dataset(self, json, validate: bool = True): # noqa: F811
if validate:
validate_encodings(
json.get('node_encodings', {}),
json.get('edge_encodings', {}),
self.nodes.column_names if self.nodes is not None else None,
self.edges.column_names if self.edges is not None else None)
tok = self.token
if self.org_name:
json['org_name'] = self.org_name
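
A standalone sketch of the check `create_dataset` now runs before upload; the positional arguments mirror the call above, while the `{'bindings': {...}}` payload shapes are assumptions modeled on the uploader's binding format:

```python
from graphistry.validate.validate_encodings import validate_encodings

validate_encodings(
    {'bindings': {'node': 'n'}},                        # node_encodings
    {'bindings': {'source': 's', 'destination': 'd'}},  # edge_encodings
    ['n'],                                              # node column names
    ['s', 'd'])                                         # edge column names
```
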
@@ -488,7 +501,7 @@ def g_to_edge_encodings(self, g):
return encodings


def post(self, as_files: bool = True, memoize: bool = True):
def post(self, as_files: bool = True, memoize: bool = True, validate: bool = True):
"""
Note: likely want to pair with self.maybe_post_share_link(g)
"""
@@ -513,7 +526,7 @@ def post(self, as_files: bool = True, memoize: bool = True):
"description": self.description,
"edge_files": [ e_file_id ],
**({"node_files": [ n_file_id ] if not (self.nodes is None) else []})
})
}, validate)

else:

Expand All @@ -523,7 +536,7 @@ def post(self, as_files: bool = True, memoize: bool = True):
"metadata": self.metadata,
"name": self.name,
"description": self.description
})
}, validate)

self.post_edges_arrow()

@@ -636,12 +649,12 @@ def post_share_link(
###########################################


def post_edges_arrow(self, arr=None, opts=''):
def post_edges_arrow(self, arr: Optional[pa.Table] = None, opts=''):
if arr is None:
arr = self.edges
return self.post_arrow(arr, 'edges', opts)

def post_nodes_arrow(self, arr=None, opts=''):
def post_nodes_arrow(self, arr: Optional[pa.Table] = None, opts=''):
if arr is None:
arr = self.nodes
return self.post_arrow(arr, 'nodes', opts)
2 changes: 1 addition & 1 deletion graphistry/compute/__init__.py
@@ -1,6 +1,6 @@
from .ComputeMixin import ComputeMixin
from .ast import (
n, e_forward, e_reverse, e_undirected
n, e, e_forward, e_reverse, e_undirected
)
from .chain import Chain
from .predicates.is_in import (
4 changes: 3 additions & 1 deletion graphistry/compute/ast.py
@@ -10,6 +10,8 @@
from graphistry.util import setup_logger
from graphistry.utils.json import JSONVal, is_json_serializable
from .predicates.ASTPredicate import ASTPredicate
from .predicates.from_json import from_json as predicates_from_json

from .predicates.is_in import (
is_in, IsIn
)
@@ -100,7 +102,7 @@ def maybe_filter_dict_from_json(d: Dict, key: str) -> Optional[Dict]:
return None
if key in d and isinstance(d[key], dict):
return {
k: ASTPredicate.from_json(v) if isinstance(v, dict) else v
k: predicates_from_json(v) if isinstance(v, dict) else v
for k, v in d[key].items()
}
elif key in d and d[key] is not None:
17 changes: 17 additions & 0 deletions graphistry/compute/chain.py
@@ -90,6 +90,9 @@ def combine_steps(g: Plottable, kind: str, steps: List[Tuple[ASTObject,Plottable
getattr(g_step, df_fld)[[id]]
for (_, g_step) in steps
]).drop_duplicates(subset=[id])
for (op, g_step) in steps:
logger.debug('adding nodes to concat: %s', g_step._nodes[[g_step._node]])
logger.debug('adding edges to concat: %s', g_step._edges[[g_step._source, g_step._destination]])

# df[[id, op_name1, ...]]
logger.debug('combine_steps ops: %s', [op for (op, _) in steps])
@@ -293,6 +296,13 @@ def chain(self: Plottable, ops: Union[List[ASTObject], Chain], engine: Union[Eng
)
g_stack.append(g_step)

import logging
if logger.isEnabledFor(logging.DEBUG):
for (i, g_step) in enumerate(g_stack):
logger.debug('~' * 10 + '\nstep %s', i)
logger.debug('nodes: %s', g_step._nodes)
logger.debug('edges: %s', g_step._edges)

logger.debug('======================== BACKWARDS ========================')

# Backwards
@@ -325,6 +335,13 @@ def chain(self: Plottable, ops: Union[List[ASTObject], Chain], engine: Union[Eng
)
g_stack_reverse.append(g_step_reverse)

import logging
if logger.isEnabledFor(logging.DEBUG):
for (i, g_step) in enumerate(g_stack_reverse):
logger.debug('~' * 10 + '\nstep %s', i)
logger.debug('nodes: %s', g_step._nodes)
logger.debug('edges: %s', g_step._edges)

logger.debug('============ COMBINE NODES ============')
final_nodes_df = combine_steps(g, 'nodes', list(zip(ops, reversed(g_stack_reverse))), engine_concrete)

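
The per-step node/edge dumps added here only fire at DEBUG level; a sketch of enabling them, assuming `setup_logger(__name__)` keys the logger off the module path:

```python
import logging

logging.basicConfig()
logging.getLogger('graphistry.compute.chain').setLevel(logging.DEBUG)
```
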
30 changes: 25 additions & 5 deletions graphistry/compute/hop.py
@@ -78,7 +78,7 @@ def hop(self: Plottable,
#TODO target_wave_front code also includes nodes for handling intermediate hops
# ... better to make an explicit param of allowed intermediates? (vs recording each intermediate hop)

debugging_hop = True
debugging_hop = False

if debugging_hop and logger.isEnabledFor(logging.DEBUG):
logger.debug('=======================')
@@ -152,6 +152,7 @@
logger.debug('=====================')

first_iter = True
combined_node_ids = None
while True:

if debugging_hop and logger.isEnabledFor(logging.DEBUG):
@@ -242,7 +243,8 @@
logger.debug('--- direction in [reverse, undirected] ---')
logger.debug('hop_edges_reverse basic:\n%s', hop_edges_reverse)

if target_wave_front is not None:
#FIXME: What test case does this enable? Disabled to pass shortest path backwards pass steps
if False and target_wave_front is not None:
assert nodes is not None, "target_wave_front indicates nodes"
if hops_remaining:
intermediate_target_wave_front = concat([
@@ -298,7 +300,10 @@
if debugging_hop and logger.isEnabledFor(logging.DEBUG):
logger.debug('~~~~~~~~~~ LOOP STEP MERGES 1 ~~~~~~~~~~~')
logger.debug('matches_edges:\n%s', matches_edges)
logger.debug('matches_nodes:\n%s', matches_nodes)
logger.debug('new_node_ids:\n%s', new_node_ids)
logger.debug('hop_edges_forward:\n%s', hop_edges_forward)
logger.debug('hop_edges_reverse:\n%s', hop_edges_reverse)

# Finally include all initial root nodes matched against, now that edge triples satisfy all source/dest/edge predicates
# Only run first iteration b/c root nodes already accounted for in subsequent
@@ -337,6 +342,15 @@
logger.debug('wave_front:\n%s', wave_front)
logger.debug('matches_nodes:\n%s', matches_nodes)

if debugging_hop and logger.isEnabledFor(logging.DEBUG):
logger.debug('~~~~~~~~~~ LOOP END POST ~~~~~~~~~~~')
logger.debug('matches_nodes:\n%s', matches_nodes)
logger.debug('matches_edges:\n%s', matches_edges)
logger.debug('combined_node_ids:\n%s', combined_node_ids)
logger.debug('nodes (self):\n%s', self._nodes)
logger.debug('nodes (init):\n%s', nodes)
logger.debug('target_wave_front:\n%s', target_wave_front)

#hydrate edges
final_edges = edges_indexed.merge(matches_edges, on=EDGE_ID, how='inner')
if EDGE_ID not in self._edges:
@@ -345,10 +359,16 @@

#hydrate nodes
if self._nodes is not None:
logger.debug('~~~~~~~~~~ NODES HYDRATION ~~~~~~~~~~~')
#FIXME what was this for? Removed for shortest-path reverse pass fixes
#if target_wave_front is not None:
# rich_nodes = target_wave_front
#else:
# rich_nodes = self._nodes
rich_nodes = self._nodes
if target_wave_front is not None:
rich_nodes = target_wave_front
else:
rich_nodes = self._nodes
rich_nodes = concat([rich_nodes, target_wave_front], ignore_index=True, sort=False).drop_duplicates(subset=[g2._node])
logger.debug('rich_nodes available for inner merge:\n%s', rich_nodes[[self._node]])
final_nodes = rich_nodes.merge(
matches_nodes if matches_nodes is not None else wave_front[:0],
on=self._node,

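A sketch of the `hop()` entry point these changes touch, which `chain()`'s forward and backward passes build on (toy data; `g._node` avoids assuming the materialized node column's name):

```python
import pandas as pd
import graphistry

edges_df = pd.DataFrame({'s': ['a', 'b'], 'd': ['b', 'c']})
g = graphistry.edges(edges_df, 's', 'd').materialize_nodes()

# Two-hop forward traversal from 'a'
g2 = g.hop(nodes=pd.DataFrame({g._node: ['a']}), hops=2, direction='forward')
print(g2._edges)
```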