Feature serverless support v2 (#308)
* wip

* wip

* Bumping version, updating changelog

---------

Co-authored-by: ronanstokes-db <[email protected]>
Co-authored-by: Ronan Stokes <[email protected]>
3 people authored Dec 19, 2024
1 parent b4370c0 commit b57ae1b
Showing 8 changed files with 105 additions and 8 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,19 @@
 ## Change History
 All notable changes to the Databricks Labs Data Generator will be documented in this file.

+### Version 0.4.0 Hotfix 2
+
+#### Fixed
+* Added basic stock ticker and multi-table sales order standard datasets
+* Added min and max latitude and longitude options for the basic geometries dataset provider
+* Added default max values for numeric data types
+
+### Version 0.4.0 Hotfix 1
+
+#### Fixed
+* Fixed issue with running on serverless environment
+
+
 ### Version 0.4.0

 #### Changed
2 changes: 1 addition & 1 deletion README.md
@@ -65,7 +65,7 @@ details of use and many examples.

 Release notes and details of the latest changes for this specific release
 can be found in the GitHub repository
-[here](https://github.com/databrickslabs/dbldatagen/blob/release/v0.4.0/CHANGELOG.md)
+[here](https://github.com/databrickslabs/dbldatagen/blob/release/v0.4.0post2/CHANGELOG.md)

 # Installation

2 changes: 1 addition & 1 deletion dbldatagen/_version.py
@@ -34,7 +34,7 @@ def get_version(version):
     return version_info


-__version__ = "0.4.0" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion
+__version__ = "0.4.0post2" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion
 __version_info__ = get_version(__version__)


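The new version string is a PEP 440 post-release (canonical form `0.4.0.post2`), which pip orders after `0.4.0` without implying a new feature release. A quick illustrative check using the third-party `packaging` library (not a dependency of this repository):

```python
from packaging.version import Version

v = Version("0.4.0post2")
print(str(v))            # 0.4.0.post2  (normalized per PEP 440)
print(v.is_postrelease)  # True
print(v.base_version)    # 0.4.0
```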
15 changes: 12 additions & 3 deletions dbldatagen/data_generator.py
@@ -226,12 +226,21 @@ def _setupPandas(self, pandasBatchSize):
         self.logger.info("Spark version: %s", self.sparkSession.version)
         if str(self.sparkSession.version).startswith("3"):
             self.logger.info("Using spark 3.x")
-            self.sparkSession.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
+            try:
+                self.sparkSession.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
+            except Exception: # pylint: disable=broad-exception-caught
+                pass
         else:
-            self.sparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")
+            try:
+                self.sparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")
+            except Exception: # pylint: disable=broad-exception-caught
+                pass

         if self._batchSize is not None:
-            self.sparkSession.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", self._batchSize)
+            try:
+                self.sparkSession.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", self._batchSize)
+            except Exception: # pylint: disable=broad-exception-caught
+                pass

     def _setupLogger(self):
         """Set up logging
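This hunk is the core of the serverless fix: serverless and shared-access clusters refuse writes to Spark config values, so each `conf.set` call is now wrapped to tolerate the refusal instead of failing data generation. A minimal standalone sketch of the same guard pattern, assuming only an active `SparkSession` (the helper name `try_set_conf` is illustrative, not part of the library's API):

```python
def try_set_conf(spark, key, value):
    """Best-effort Spark config write.

    Serverless / shared clusters may prohibit setting config values;
    in that case, silently keep the cluster's existing setting.
    """
    try:
        spark.conf.set(key, value)
    except Exception:  # restricted environments raise varying exception types
        pass


# Example: try_set_conf(spark, "spark.sql.execution.arrow.pyspark.enabled", "true")
```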
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -32,7 +32,7 @@
author = 'Databricks Inc'

# The full version, including alpha/beta/rc tags
-release = "0.4.0" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion
+release = "0.4.0post2" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion

# -- General configuration ---------------------------------------------------

2 changes: 1 addition & 1 deletion python/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.4.0
+current_version = 0.4.0post2
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+){0,1}(?P<release>\D*)(?P<build>\d*)
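The `parse` pattern already present in this config is what lets bumpversion decompose version strings like the new one; checking it against `0.4.0post2` with Python's `re` module (illustrative only):

```python
import re

# Pattern copied verbatim from python/.bumpversion.cfg
parse = re.compile(r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+){0,1}(?P<release>\D*)(?P<build>\d*)")

print(parse.match("0.4.0post2").groupdict())
# {'major': '0', 'minor': '4', 'patch': '0', 'release': 'post', 'build': '2'}
```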
2 changes: 1 addition & 1 deletion setup.py
@@ -31,7 +31,7 @@

 setuptools.setup(
     name="dbldatagen",
-    version="0.4.0",
+    version="0.4.0post2",
     author="Ronan Stokes, Databricks",
     description="Databricks Labs - PySpark Synthetic Data Generator",
     long_description=long_description,
75 changes: 75 additions & 0 deletions tests/test_serverless.py
@@ -0,0 +1,75 @@
import pytest

import dbldatagen as dg


class TestSimulatedServerless:
    """Serverless operation and other forms of shared spark cloud operation often have restrictions on what
    features may be used.
    In this set of tests, we'll simulate some of the common restrictions found in Databricks serverless and shared
    environments to ensure that common operations still work.
    Serverless operations have some of the following restrictions:
    - Spark config settings cannot be written
    """

    @pytest.fixture(scope="class")
    def serverlessSpark(self):
        from unittest.mock import MagicMock

        sparkSession = dg.SparkSingleton.getLocalInstance("unit tests")

        oldSetMethod = sparkSession.conf.set
        oldGetMethod = sparkSession.conf.get
        sparkSession.conf.set = MagicMock(
            side_effect=ValueError("Setting value prohibited in simulated serverless env."))
        sparkSession.conf.get = MagicMock(
            side_effect=ValueError("Getting value prohibited in simulated serverless env."))

        yield sparkSession

        sparkSession.conf.set = oldSetMethod
        sparkSession.conf.get = oldGetMethod

    def test_basic_data(self, serverlessSpark):
        from pyspark.sql.types import FloatType, IntegerType, StringType

        row_count = 1000 * 100
        column_count = 10
        testDataSpec = (
            dg.DataGenerator(serverlessSpark, name="test_data_set1", rows=row_count, partitions=4)
            .withIdOutput()
            .withColumn(
                "r",
                FloatType(),
                expr="floor(rand() * 350) * (86400 + 3600)",
                numColumns=column_count,
            )
            .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
            .withColumn("code2", "integer", minValue=0, maxValue=10, random=True)
            .withColumn("code3", StringType(), values=["online", "offline", "unknown"])
            .withColumn(
                "code4", StringType(), values=["a", "b", "c"], random=True, percentNulls=0.05
            )
            .withColumn(
                "code5", "string", values=["a", "b", "c"], random=True, weights=[9, 1, 1]
            )
        )

        dfTestData = testDataSpec.build()

    @pytest.mark.parametrize("providerName, providerOptions", [
        ("basic/user", {"rows": 50, "partitions": 4, "random": False, "dummyValues": 0}),
        ("basic/user", {"rows": 100, "partitions": -1, "random": True, "dummyValues": 0})
    ])
    def test_basic_user_table_retrieval(self, providerName, providerOptions, serverlessSpark):
        ds = dg.Datasets(serverlessSpark, providerName).get(**providerOptions)
        assert ds is not None, f"""expected to get dataset specification for provider `{providerName}`
            with options: {providerOptions}
            """
        df = ds.build()

        assert df.count() >= 0
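Taken together, the fixture above and the guarded `_setupPandas` close the loop: data generation succeeds even when every config write raises. A condensed standalone reproduction of the same behavior, assuming a local Spark installation (the session name is arbitrary):

```python
from unittest.mock import MagicMock

import dbldatagen as dg

spark = dg.SparkSingleton.getLocalInstance("serverless demo")
spark.conf.set = MagicMock(side_effect=ValueError("Setting value prohibited"))

# With the try/except guards added in this commit, build() completes
# even though every attempted Spark config write raises.
df = dg.DataGenerator(spark, name="demo", rows=10, partitions=2).withIdOutput().build()
assert df.count() == 10
```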
