From b57ae1ba37b48c7abe2a44a5caacaef4e9d0c8c0 Mon Sep 17 00:00:00 2001 From: ghanse <163584195+ghanse@users.noreply.github.com> Date: Thu, 19 Dec 2024 17:53:44 -0500 Subject: [PATCH] Feature serverless support v2 (#308) * wip * wip * Bumping version, updating changelog --------- Co-authored-by: ronanstokes-db Co-authored-by: Ronan Stokes <42389040+ronanstokes-db@users.noreply.github.com> --- CHANGELOG.md | 13 +++++++ README.md | 2 +- dbldatagen/_version.py | 2 +- dbldatagen/data_generator.py | 15 ++++++-- docs/source/conf.py | 2 +- python/.bumpversion.cfg | 2 +- setup.py | 2 +- tests/test_serverless.py | 75 ++++++++++++++++++++++++++++++++++++ 8 files changed, 105 insertions(+), 8 deletions(-) create mode 100644 tests/test_serverless.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 29675839..f3334e3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,19 @@ ## Change History All notable changes to the Databricks Labs Data Generator will be documented in this file. +### Version 0.4.0 Hotfix 2 + +#### Fixed +* Added basic stock ticker and multi-table sales order standard datasets +* Added min and max latitude and longitude options for the basic geometries dataset provider +* Added default max values for numeric data types + +### Version 0.4.0 Hotfix 1 + +#### Fixed +* Fixed issue with running on serverless environment + + ### Version 0.4.0 #### Changed diff --git a/README.md b/README.md index 4a30ad69..1a6b0738 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ details of use and many examples. 
Release notes and details of the latest changes for this specific release can be found in the GitHub repository -[here](https://github.com/databrickslabs/dbldatagen/blob/release/v0.4.0/CHANGELOG.md) +[here](https://github.com/databrickslabs/dbldatagen/blob/release/v0.4.0post2/CHANGELOG.md) # Installation diff --git a/dbldatagen/_version.py b/dbldatagen/_version.py index 5c1c4c93..da00ddf1 100644 --- a/dbldatagen/_version.py +++ b/dbldatagen/_version.py @@ -34,7 +34,7 @@ def get_version(version): return version_info -__version__ = "0.4.0" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion +__version__ = "0.4.0post2" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion __version_info__ = get_version(__version__) diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index dcfb3e0e..12015438 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -226,12 +226,21 @@ def _setupPandas(self, pandasBatchSize): self.logger.info("Spark version: %s", self.sparkSession.version) if str(self.sparkSession.version).startswith("3"): self.logger.info("Using spark 3.x") - self.sparkSession.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") + try: + self.sparkSession.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") + except Exception: # pylint: disable=broad-exception-caught + pass else: - self.sparkSession.conf.set("spark.sql.execution.arrow.enabled", "true") + try: + self.sparkSession.conf.set("spark.sql.execution.arrow.enabled", "true") + except Exception: # pylint: disable=broad-exception-caught + pass if self._batchSize is not None: - self.sparkSession.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", self._batchSize) + try: + self.sparkSession.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", self._batchSize) + except Exception: # pylint: disable=broad-exception-caught + pass def _setupLogger(self): """Set up logging diff --git a/docs/source/conf.py b/docs/source/conf.py index 
f7a9dc08..a85569e8 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -32,7 +32,7 @@ author = 'Databricks Inc' # The full version, including alpha/beta/rc tags -release = "0.4.0" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion +release = "0.4.0post2" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion # -- General configuration --------------------------------------------------- diff --git a/python/.bumpversion.cfg b/python/.bumpversion.cfg index c6efa034..0ace3463 100644 --- a/python/.bumpversion.cfg +++ b/python/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.4.0 +current_version = 0.4.0post2 commit = False tag = False parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+){0,1}(?P<release>\D*)(?P<build>\d*) diff --git a/setup.py b/setup.py index bd316c3d..8fb35dac 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ setuptools.setup( name="dbldatagen", - version="0.4.0", + version="0.4.0post2", author="Ronan Stokes, Databricks", description="Databricks Labs - PySpark Synthetic Data Generator", long_description=long_description, diff --git a/tests/test_serverless.py b/tests/test_serverless.py new file mode 100644 index 00000000..e4ed2d35 --- /dev/null +++ b/tests/test_serverless.py @@ -0,0 +1,75 @@ +import pytest + +import dbldatagen as dg + + +class TestSimulatedServerless: + """Serverless operation and other forms of shared spark cloud operation often have restrictions on what + features may be used. + + In this set of tests, we'll simulate some of the common restrictions found in Databricks serverless and shared + environments to ensure that common operations still work. 
+ + Serverless operations have some of the following restrictions: + + - Spark config settings cannot be written + + """ + + @pytest.fixture(scope="class") + def serverlessSpark(self): + from unittest.mock import MagicMock + + sparkSession = dg.SparkSingleton.getLocalInstance("unit tests") + + oldSetMethod = sparkSession.conf.set + oldGetMethod = sparkSession.conf.get + sparkSession.conf.set = MagicMock( + side_effect=ValueError("Setting value prohibited in simulated serverless env.")) + sparkSession.conf.get = MagicMock( + side_effect=ValueError("Getting value prohibited in simulated serverless env.")) + + yield sparkSession + + sparkSession.conf.set = oldSetMethod + sparkSession.conf.get = oldGetMethod + + def test_basic_data(self, serverlessSpark): + from pyspark.sql.types import FloatType, IntegerType, StringType + + row_count = 1000 * 100 + column_count = 10 + testDataSpec = ( + dg.DataGenerator(serverlessSpark, name="test_data_set1", rows=row_count, partitions=4) + .withIdOutput() + .withColumn( + "r", + FloatType(), + expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, + ) + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10, random=True) + .withColumn("code3", StringType(), values=["online", "offline", "unknown"]) + .withColumn( + "code4", StringType(), values=["a", "b", "c"], random=True, percentNulls=0.05 + ) + .withColumn( + "code5", "string", values=["a", "b", "c"], random=True, weights=[9, 1, 1] + ) + ) + + dfTestData = testDataSpec.build() + + @pytest.mark.parametrize("providerName, providerOptions", [ + ("basic/user", {"rows": 50, "partitions": 4, "random": False, "dummyValues": 0}), + ("basic/user", {"rows": 100, "partitions": -1, "random": True, "dummyValues": 0}) + ]) + def test_basic_user_table_retrieval(self, providerName, providerOptions, serverlessSpark): + ds = dg.Datasets(serverlessSpark, providerName).get(**providerOptions) + assert ds is not None, 
f"""expected to get dataset specification for provider `{providerName}` + with options: {providerOptions} + """ + df = ds.build() + + assert df.count() >= 0 \ No newline at end of file