Skip to content

Commit

Permalink
Merge pull request #577 from graphistry/dev/tolerate-cudf-fail
Browse files Browse the repository at this point in the history
refactor(lazy import): centralize, optimize, CPU fallback when broken…
  • Loading branch information
lmeyerov authored Jul 22, 2024
2 parents 1c98df9 + ac11e56 commit c8eb9c2
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 252 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Development]

### Fixed

* Graceful CPU fallbacks: When lazy GPU dependency imports throw `ImportError`, commonly seen due to broken CUDA environments or having CUDA libraries but no GPU, warn and fall back to CPU.

* Ring layouts now support filtered inputs, giving expected positions

* `encode_axis()` updates are now functional, not inplace

### Changed

* Centralize lazy imports into `graphistry.utils.lazy_import`
* Lazy imports distinguish `ModuleNotFound` (=> `False`) from `ImportError` (warn + `False`)

## [0.34.0 - 2024-07-17]

### Infra
Expand Down
29 changes: 10 additions & 19 deletions graphistry/Engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from inspect import getmodule
import pandas as pd
from typing import Any, Optional, Union
from enum import Enum
from graphistry.utils.lazy_import import lazy_cudf_import


class Engine(Enum):
Expand All @@ -21,18 +23,6 @@ class EngineAbstract(Enum):
DataframeLocalLike = Any # pdf, cudf
GraphistryLke = Any

#TODO use new importer when it lands (this is copied from umap_utils)
def lazy_cudf_import_has_dependancy():
try:
import warnings

warnings.filterwarnings("ignore")
import cudf # type: ignore

return True, "ok", cudf
except ModuleNotFoundError as e:
return False, e, None

def resolve_engine(
engine: Union[EngineAbstract, str],
g_or_df: Optional[Any] = None,
Expand All @@ -58,14 +48,15 @@ def resolve_engine(
if isinstance(g_or_df, pd.DataFrame):
return Engine.PANDAS

has_cudf_dependancy_, _, _ = lazy_cudf_import_has_dependancy()
if has_cudf_dependancy_:
import cudf
if isinstance(g_or_df, cudf.DataFrame):
return Engine.CUDF
raise ValueError(f'Expected cudf dataframe, got: {type(g_or_df)}')
if 'cudf.core.dataframe' in str(getmodule(g_or_df)):
has_cudf_dependancy_, _, _ = lazy_cudf_import()
if has_cudf_dependancy_:
import cudf
if isinstance(g_or_df, cudf.DataFrame):
return Engine.CUDF
raise ValueError(f'Expected cudf dataframe, got: {type(g_or_df)}')

has_cudf_dependancy_, _, _ = lazy_cudf_import_has_dependancy()
has_cudf_dependancy_, _, _ = lazy_cudf_import()
if has_cudf_dependancy_:
return Engine.CUDF
return Engine.PANDAS
Expand Down
13 changes: 5 additions & 8 deletions graphistry/PlotterBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,11 @@ def encode_axis(self, rows: List[Dict] = []) -> Plottable:
"""

complex_encodings = self._complex_encodings or {}
if 'node_encodings' not in complex_encodings:
complex_encodings['node_encodings'] = {}
node_encodings = complex_encodings['node_encodings']
if 'current' not in node_encodings:
node_encodings['current'] = {}
if 'default' not in node_encodings:
node_encodings['default'] = {}
complex_encodings = {**self._complex_encodings} if self._complex_encodings else {}
node_encodings = {**complex_encodings['node_encodings']} if 'node_encodings' not in complex_encodings else {}
complex_encodings['node_encodings'] = node_encodings
node_encodings['current'] = {**node_encodings['current']} if 'current' in node_encodings else {}
node_encodings['default'] = {**node_encodings['default']} if 'default' in node_encodings else {}
node_encodings['default']["pointAxisEncoding"] = {
"graphType": "point",
"encodingType": "axis",
Expand Down
38 changes: 4 additions & 34 deletions graphistry/compute/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from graphistry.constants import CUML, UMAP_LEARN, DBSCAN # noqa type: ignore
from graphistry.features import ModelDict
from graphistry.feature_utils import get_matrix_by_column_parts
from graphistry.utils.lazy_import import lazy_cudf_import, lazy_dbscan_import

logger = logging.getLogger("compute.cluster")

Expand All @@ -22,37 +23,6 @@
DBSCANEngine = Literal[DBSCANEngineConcrete, "auto"]


def lazy_dbscan_import_has_dependency():
has_min_dependency = True
DBSCAN = None
try:
from sklearn.cluster import DBSCAN
except ImportError:
has_min_dependency = False
logger.info("Please install sklearn for CPU DBSCAN")

has_cuml_dependency = True
cuDBSCAN = None
try:
from cuml import DBSCAN as cuDBSCAN
except ImportError:
has_cuml_dependency = False
logger.info("Please install cuml for GPU DBSCAN")

return has_min_dependency, DBSCAN, has_cuml_dependency, cuDBSCAN

def lazy_cudf_import_has_dependancy():
try:
import warnings

warnings.filterwarnings("ignore")
import cudf # type: ignore

return True, "ok", cudf
except ModuleNotFoundError as e:
return False, e, None


def resolve_cpu_gpu_engine(
engine: DBSCANEngine,
) -> DBSCANEngineConcrete: # noqa
Expand All @@ -64,7 +34,7 @@ def resolve_cpu_gpu_engine(
_,
has_cuml_dependency,
_,
) = lazy_dbscan_import_has_dependency()
) = lazy_dbscan_import()
if has_cuml_dependency:
return "cuml"
if has_min_dependency:
Expand All @@ -90,7 +60,7 @@ def safe_cudf(X, y):
new_kwargs[key] = value
return new_kwargs['X'], new_kwargs['y']

has_cudf_dependancy_, _, cudf = lazy_cudf_import_has_dependancy()
has_cudf_dependancy_, _, cudf = lazy_cudf_import()
if has_cudf_dependancy_:
# print('DBSCAN CUML Matrices')
return safe_cudf(X, y)
Expand Down Expand Up @@ -209,7 +179,7 @@ def _cluster_dbscan(
):
"""DBSCAN clustering on cpu or gpu infered by .engine flag
"""
_, DBSCAN, _, cuDBSCAN = lazy_dbscan_import_has_dependency()
_, DBSCAN, _, cuDBSCAN = lazy_dbscan_import()

if engine_dbscan in [CUML]:
print('`g.transform_dbscan(..)` not supported for engine=cuml, will return `g.transform_umap(..)` instead')
Expand Down
28 changes: 6 additions & 22 deletions graphistry/dgl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import numpy as np
import pandas as pd

from graphistry.utils.lazy_import import (
lazy_dgl_import,
lazy_torch_import_has_dependency
)
from . import constants as config
from .feature_utils import (
FeatureEngine,
Expand Down Expand Up @@ -34,26 +38,6 @@
MIXIN_BASE = object


def lazy_dgl_import_has_dependency():
try:
import warnings
warnings.filterwarnings('ignore')
import dgl # noqa: F811
return True, 'ok', dgl
except ModuleNotFoundError as e:
return False, e, None


def lazy_torch_import_has_dependency():
try:
import warnings
warnings.filterwarnings('ignore')
import torch # noqa: F811
return True, 'ok', torch
except ModuleNotFoundError as e:
return False, e, None


logger = setup_logger(name=__name__)


Expand Down Expand Up @@ -181,7 +165,7 @@ def pandas_to_dgl_graph(
sp_mat: sparse scipy matrix
ordered_nodes_dict: dict ordered from most common src and dst nodes
"""
_, _, dgl = lazy_dgl_import_has_dependency() # noqa: F811
_, _, dgl = lazy_dgl_import() # noqa: F811
sp_mat, ordered_nodes_dict = pandas_to_sparse_adjacency(df, src, dst, weight_col)
g = dgl.from_scipy(sp_mat, device=device) # there are other ways too
logger.info(f"Graph Type: {type(g)}")
Expand Down Expand Up @@ -225,7 +209,7 @@ def dgl_lazy_init(self, train_split: float = 0.8, device: str = "cpu"):
"""

if not self.dgl_initialized:
lazy_dgl_import_has_dependency()
lazy_dgl_import()
lazy_torch_import_has_dependency()
self.train_split = train_split
self.device = device
Expand Down
31 changes: 9 additions & 22 deletions graphistry/embed_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,11 @@
import pandas as pd
from typing import Optional, Union, Callable, List, TYPE_CHECKING, Any, Tuple

from graphistry.utils.lazy_import import lazy_embed_import
from .PlotterBase import Plottable
from .compute.ComputeMixin import ComputeMixin


def lazy_embed_import_dep():
try:
import torch
import torch.nn as nn
import dgl
from dgl.dataloading import GraphDataLoader
import torch.nn.functional as F
from .networks import HeteroEmbed
from tqdm import trange
return True, torch, nn, dgl, GraphDataLoader, HeteroEmbed, F, trange

except:
return False, None, None, None, None, None, None, None

def check_cudf():
try:
import cudf
Expand All @@ -30,7 +17,7 @@ def check_cudf():


if TYPE_CHECKING:
_, torch, _, _, _, _, _, _ = lazy_embed_import_dep()
_, torch, _, _, _, _, _, _ = lazy_embed_import()
TT = torch.Tensor
MIXIN_BASE = ComputeMixin
else:
Expand Down Expand Up @@ -147,7 +134,7 @@ def _preprocess_embedding_data(self, res, train_split:Union[float, int] = 0.8) -
return res

def _build_graph(self, res) -> Plottable:
_, _, _, dgl, _, _, _, _ = lazy_embed_import_dep()
_, _, _, dgl, _, _, _, _ = lazy_embed_import()
s, r, t = res._triplets.T

if res._train_idx is not None:
Expand All @@ -169,7 +156,7 @@ def _build_graph(self, res) -> Plottable:


def _init_model(self, res, batch_size:int, sample_size:int, num_steps:int, device):
_, _, _, _, GraphDataLoader, HeteroEmbed, _, _ = lazy_embed_import_dep()
_, _, _, _, GraphDataLoader, HeteroEmbed, _, _ = lazy_embed_import()
g_iter = SubgraphIterator(res._kg_dgl, sample_size, num_steps)
g_dataloader = GraphDataLoader(
g_iter, batch_size=batch_size, collate_fn=lambda x: x[0]
Expand All @@ -188,7 +175,7 @@ def _init_model(self, res, batch_size:int, sample_size:int, num_steps:int, devic
return model, g_dataloader

def _train_embedding(self, res, epochs:int, batch_size:int, lr:float, sample_size:int, num_steps:int, device) -> Plottable:
_, torch, nn, _, _, _, _, trange = lazy_embed_import_dep()
_, torch, nn, _, _, _, _, trange = lazy_embed_import()
log('Training embedding')
model, g_dataloader = res._init_model(res, batch_size, sample_size, num_steps, device)
if hasattr(res, "_embed_model") and not res._build_new_embedding_model:
Expand Down Expand Up @@ -232,7 +219,7 @@ def _train_embedding(self, res, epochs:int, batch_size:int, lr:float, sample_siz

@property
def _gcn_node_embeddings(self):
_, torch, _, _, _, _, _, _ = lazy_embed_import_dep()
_, torch, _, _, _, _, _, _ = lazy_embed_import()
g_dgl = self._kg_dgl.to(self._device)
em = self._embed_model(g_dgl).detach()
torch.cuda.empty_cache()
Expand Down Expand Up @@ -540,7 +527,7 @@ def fetch_triplets_for_inference(x_r):


def _score(self, triplets: Union[np.ndarray, TT]) -> TT: # type: ignore
_, torch, _, _, _, _, _, _ = lazy_embed_import_dep()
_, torch, _, _, _, _, _, _ = lazy_embed_import()
emb = self._kg_embeddings.clone().detach()
if not isinstance(triplets, torch.Tensor):
triplets = torch.tensor(triplets)
Expand Down Expand Up @@ -571,7 +558,7 @@ def __len__(self) -> int:
return self.num_steps

def __getitem__(self, i:int):
_, torch, nn, dgl, GraphDataLoader, _, F, _ = lazy_embed_import_dep()
_, torch, nn, dgl, GraphDataLoader, _, F, _ = lazy_embed_import()
eids = torch.from_numpy(np.random.choice(self.eids, self.sample_size))

src, dst = self.g.find_edges(eids)
Expand All @@ -593,7 +580,7 @@ def __getitem__(self, i:int):

@staticmethod
def _sample_neg(triplets:np.ndarray, num_nodes:int) -> Tuple[TT, TT]: # type: ignore
_, torch, _, _, _, _, _, _ = lazy_embed_import_dep()
_, torch, _, _, _, _, _, _ = lazy_embed_import()
triplets = torch.tensor(triplets)
h, r, t = triplets.T
h_o_t = torch.randint(high=2, size=h.size())
Expand Down
Loading

0 comments on commit c8eb9c2

Please sign in to comment.