Feature standard datasets - part 2 (#286)
* work in progress

* work in progress

* work in progress

* wip

* wip

* added implementations for Datasets describe and listing

* bumpedBuild

* bumpedBuild

* bumpedBuild

* bumpedBuild

* fixed dataset provider imports

* fixed dataset provider imports

* fixed dataset provider imports

* fixed dataset provider imports

* fixed dataset provider imports

* fixed dataset provider imports

* wip

* wip

* initial working version

* initial working version

* initial working version

* initial working version

* initial working version

* initial working version

* initial working version

* initial working version

* initial working version

* added telephony plans

* added telephony plans

* added telephony plans

* initial working version
added plugin mechanics, initial user table and part of telephony plans

* Added tokei.rs badge (#253)

[![lines of code](https://tokei.rs/b1/github/databrickslabs/dbldatagen)]([https://codecov.io/github/databrickslabs/dbldatagen](https://github.com/databrickslabs/dbldatagen))

* Prep for release 036 (#251)

* prep for version 0.3.6

* added telephony plans

* initial implementation

* added basic/iot dataset

* wip

* work in progress

* wip

* wip

* wip

* wip

* wip

* wip

* work in progress

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* additional coverage tests

* additional coverage

* additional coverage

* additional coverage

* additional coverage

* Add some standard datasets

* Fix a bug with the default number of groups

* Test changes

* Updated datasets

* Added more unit tests and safety

* Added more tests

* Update test_standard_dataset_providers.py

added space to line 147 to pass lint checks

---------

Co-authored-by: ronanstokes-db <[email protected]>
Co-authored-by: Ronan Stokes <[email protected]>
Co-authored-by: Serge Smertin <[email protected]>
4 people authored Jun 7, 2024
1 parent da1df6b commit 4206b5c
Showing 7 changed files with 691 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in
* Updated documentation for generating text data.
* Modified data distributions to use abstract base classes
* Migrated data distribution tests to use `pytest`
* Additional standard datasets

#### Added
* Added classes for constraints on the data generation via new package `dbldatagen.constraints`
8 changes: 8 additions & 0 deletions dbldatagen/datasets/__init__.py
@@ -1,8 +1,16 @@
from .dataset_provider import DatasetProvider, dataset_definition
from .basic_geometries import BasicGeometriesProvider
from .basic_process_historian import BasicProcessHistorianProvider
from .basic_telematics import BasicTelematicsProvider
from .basic_user import BasicUserProvider
from .benchmark_groupby import BenchmarkGroupByProvider
from .multi_table_telephony_provider import MultiTableTelephonyProvider

__all__ = ["dataset_provider",
"basic_geometries",
"basic_process_historian",
"basic_telematics",
"basic_user",
"benchmark_groupby",
"multi_table_telephony_provider"
]
111 changes: 111 additions & 0 deletions dbldatagen/datasets/basic_geometries.py
@@ -0,0 +1,111 @@
from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/geometries",
summary="Geometry WKT dataset",
autoRegister=True,
supportsStreaming=True)
class BasicGeometriesProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
"""
Basic Geometry WKT Dataset
==========================
This is a basic geospatial dataset with WKT strings representing `POINT`, `LINESTRING`,
and `POLYGON` geometries.
It takes the following options when retrieving the table:
- random: if True, generates random data
- rows: number of rows to generate
- partitions: number of partitions to use
- geometryType: one of `point`, `lineString`, or `polygon`; default is `point`
- maxVertices: number of points in each `lineString` or `polygon`
As the data specification is a DataGenerator object, you can add further columns to the data set and
add constraints (when the feature is available).
Note that this dataset does not use any features that would prevent it from being used as a source for a
streaming dataframe, and so the flag `supportsStreaming` is set to True.
"""
MIN_LOCATION_ID = 1000000
MAX_LOCATION_ID = 9223372036854775807
COLUMN_COUNT = 2
ALLOWED_OPTIONS = [
"geometryType",
"maxVertices",
"random"
]

@DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
**options):
import dbldatagen as dg
import warnings as w

generateRandom = options.get("random", False)
geometryType = options.get("geometryType", "point")
maxVertices = options.get("maxVertices", 1 if geometryType == "point" else 3)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
rows = DatasetProvider.DEFAULT_ROWS
if partitions is None or partitions < 0:
partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)

df_spec = (
dg.DataGenerator(sparkSession=sparkSession, name="test_data_set1", rows=rows,
partitions=partitions, randomSeedMethod="hash_fieldname")
.withColumn("location_id", "long", minValue=self.MIN_LOCATION_ID, maxValue=self.MAX_LOCATION_ID,
uniqueValues=rows, random=generateRandom)
)
if geometryType == "point":
if maxVertices > 1:
w.warn('Ignoring property maxVertices for point geometries')
df_spec = (
df_spec.withColumn("lat", "float", minValue=-90.0, maxValue=90.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn("lon", "float", minValue=-180.0, maxValue=180.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')")
)
elif geometryType == "lineString":
if maxVertices < 2:
maxVertices = 2
w.warn("Parameter maxVertices must be >=2 for 'lineString' geometries; Setting to 2")
j = 0
while j < maxVertices:
df_spec = (
df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
step=1e-5, random=generateRandom, omit=True)
)
j = j + 1
concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in range(maxVertices)]
concatPairsExpr = f"replace(concat('LINESTRING(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
df_spec = (
df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
)
elif geometryType == "polygon":
if maxVertices < 3:
maxVertices = 3
w.warn("Parameter maxVertices must be >=3 for 'polygon' geometries; Setting to 3")
j = 0
while j < maxVertices:
df_spec = (
df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
step=1e-5, random=generateRandom, omit=True)
)
j = j + 1
vertexIndices = list(range(maxVertices)) + [0]
concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in vertexIndices]
concatPairsExpr = f"replace(concat('POLYGON(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
df_spec = (
df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
)
else:
raise ValueError("geometryType must be 'point', 'lineString', or 'polygon'")

return df_spec
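
For orientation, a minimal usage sketch of the provider above, assuming an active Spark session bound to `spark`; the option values are illustrative, not defaults.

from dbldatagen.datasets import BasicGeometriesProvider

# Build a specification for random LINESTRING geometries with 4 vertices each
provider = BasicGeometriesProvider()
df_spec = provider.getTableGenerator(spark, rows=1000, random=True,
                                     geometryType="lineString", maxVertices=4)

# DataGenerator.build() materializes the generated DataFrame
df = df_spec.build()
df.select("location_id", "wkt").show(5, truncate=False)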
110 changes: 110 additions & 0 deletions dbldatagen/datasets/basic_process_historian.py
@@ -0,0 +1,110 @@
from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/process_historian",
summary="Basic Historian Data for Process Manufacturing",
autoRegister=True,
supportsStreaming=True)
class BasicProcessHistorianProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
"""
Basic Process Historian Dataset
===============================
This is a basic process historian data set with fields like plant ID, device ID, tag name, timestamp,
value, and units of measure.
It takes the following options when retrieving the table:
- random: if True, generates random data
- rows: number of rows to generate
- partitions: number of partitions to use
- numDevices: number of unique device IDs
- numPlants: number of unique plant IDs
- numTags: number of unique tag names
- startTimestamp: earliest timestamp for IoT time series data
- endTimestamp: latest timestamp for IoT time series data
- dataQualityRatios: dictionary with `pctQuestionable`, `pctSubstituted`, and `pctAnnotated`
values corresponding to the percentage of values to be marked `is_questionable`, `is_substituted`,
or `is_annotated`, respectively
As the data specification is a DataGenerator object, you can add further columns to the data set and
add constraints (when the feature is available).
Note that this dataset does not use any features that would prevent it from being used as a source for a
streaming dataframe, and so the flag `supportsStreaming` is set to True.
"""
MIN_DEVICE_ID = 0x100000000
MAX_DEVICE_ID = 9223372036854775807
MIN_PROPERTY_VALUE = 50.0
MAX_PROPERTY_VALUE = 60.0
DEFAULT_NUM_DEVICES = 10000
DEFAULT_NUM_PLANTS = 100
DEFAULT_NUM_TAGS = 10
DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
COLUMN_COUNT = 10
ALLOWED_OPTIONS = [
"numDevices",
"numPlants",
"numTags",
"startTimestamp",
"endTimestamp",
"dataQualityRatios",
"random"
]

@DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1, **options):
import dbldatagen as dg # import locally to avoid circular imports
import numpy as np

generateRandom = options.get("random", False)
numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
numPlants = options.get("numPlants", self.DEFAULT_NUM_PLANTS)
numTags = options.get("numTags", self.DEFAULT_NUM_TAGS)
startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
dataQualityRatios = options.get("dataQualityRatios", None)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
rows = DatasetProvider.DEFAULT_ROWS
if partitions is None or partitions < 0:
partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)

tag_names = [f"HEX-{str(j).zfill(int(np.ceil(np.log10(numTags))))}_INLET_TMP" for j in range(numTags)]
plant_ids = [f"PLANT-{str(j).zfill(int(np.ceil(np.log10(numPlants))))}" for j in range(numPlants)]
testDataSpec = (
dg.DataGenerator(sparkSession, name="process_historian_data", rows=rows,
partitions=partitions,
randomSeedMethod="hash_fieldname")
.withColumn("internal_device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
uniqueValues=numDevices, omit=True, baseColumnType="hash")
.withColumn("device_id", "string", format="0x%09x", baseColumn="internal_device_id")
.withColumn("plant_id", "string", values=plant_ids, baseColumn="internal_device_id")
.withColumn("tag_name", "string", values=tag_names, baseColumn="internal_device_id")
.withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
interval="1 second", random=generateRandom)
.withColumn("value", "float", minValue=self.MIN_PROPERTY_VALUE, maxValue=self.MAX_PROPERTY_VALUE,
step=1e-3, random=generateRandom)
.withColumn("engineering_units", "string", expr="'Deg.F'")
)
# Add the data quality columns if they were provided
if dataQualityRatios is not None:
if "pctQuestionable" in dataQualityRatios:
testDataSpec = testDataSpec.withColumn(
"is_questionable", "boolean",
expr=f"rand() < {dataQualityRatios['pctQuestionable']}"
)
if "pctSubstituted" in dataQualityRatios:
testDataSpec = testDataSpec.withColumn(
"is_substituted", "boolean",
expr=f"rand() < {dataQualityRatios['pctSubstituted']}"
)
if "pctAnnotated" in dataQualityRatios:
testDataSpec = testDataSpec.withColumn(
"is_annotated", "boolean",
expr=f"rand() < {dataQualityRatios['pctAnnotated']}"
)

return testDataSpec
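
Similarly, a hypothetical sketch for the process historian provider, again assuming a `spark` session; the quality ratios and timestamps shown are arbitrary example values.

from dbldatagen.datasets import BasicProcessHistorianProvider

provider = BasicProcessHistorianProvider()
spec = provider.getTableGenerator(
    spark, rows=100000, random=True,
    numDevices=500, numPlants=20, numTags=8,
    startTimestamp="2024-03-01 00:00:00", endTimestamp="2024-03-02 00:00:00",
    dataQualityRatios={"pctQuestionable": 0.01, "pctSubstituted": 0.005, "pctAnnotated": 0.02})

# Columns: device_id, plant_id, tag_name, ts, value, engineering_units plus the three quality flags
df = spec.build()
df.show(5, truncate=False)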
131 changes: 131 additions & 0 deletions dbldatagen/datasets/basic_telematics.py
@@ -0,0 +1,131 @@
from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/telematics",
summary="Telematics dataset for GPS tracking",
autoRegister=True,
supportsStreaming=True)
class BasicTelematicsProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
"""
Basic Telematics Dataset
========================
This is a basic telematics dataset with time-series `lat`, `lon`, and `heading` values.
It takes the following options when retrieving the table:
- random: if True, generates random data
- rows: number of rows to generate
- partitions: number of partitions to use
- numDevices: number of unique device IDs
- startTimestamp: earliest timestamp for IoT time series data
- endTimestamp: latest timestamp for IoT time series data
- minLat: minimum latitude
- maxLat: maximum latitude
- minLon: minimum longitude
- maxLon: maximum longitude
- generateWkt: if `True`, generates the well-known text representation of the location
As the data specification is a DataGenerator object, you can add further columns to the data set and
add constraints (when the feature is available).
Note that this dataset does not use any features that would prevent it from being used as a source for a
streaming dataframe, and so the flag `supportsStreaming` is set to True.
"""
MIN_DEVICE_ID = 1000000
MAX_DEVICE_ID = 9223372036854775807
DEFAULT_NUM_DEVICES = 1000
DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
DEFAULT_MIN_LAT = -90.0
DEFAULT_MAX_LAT = 90.0
DEFAULT_MIN_LON = -180.0
DEFAULT_MAX_LON = 180.0
COLUMN_COUNT = 6
ALLOWED_OPTIONS = [
"numDevices",
"startTimestamp",
"endTimestamp",
"minLat",
"maxLat",
"minLon",
"maxLon",
"generateWkt",
"random"
]

@DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
**options):
import dbldatagen as dg
import warnings as w

generateRandom = options.get("random", False)
numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
minLat = options.get("minLat", self.DEFAULT_MIN_LAT)
maxLat = options.get("maxLat", self.DEFAULT_MAX_LAT)
minLon = options.get("minLon", self.DEFAULT_MIN_LON)
maxLon = options.get("maxLon", self.DEFAULT_MAX_LON)
generateWkt = options.get("generateWkt", False)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
rows = DatasetProvider.DEFAULT_ROWS
if partitions is None or partitions < 0:
partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
if minLat < -90.0:
minLat = -90.0
w.warn("Received an invalid minLat value; Setting to -90.0")
if minLat > 90.0:
minLat = 89.0
w.warn("Recieved an invalid minLat value; Setting to 89.0")
if maxLat < -90:
maxLat = -89.0
w.warn("Recieved an invalid maxLat value; Setting to -89.0")
if maxLat > 90.0:
maxLat = 90.0
w.warn("Received an invalid maxLat value; Setting to 90.0")
if minLon < -180.0:
minLon = -180.0
w.warn("Received an invalid minLon value; Setting to -180.0")
if minLon > 180.0:
minLon = 179.0
w.warn("Received an invalid minLon value; Setting to 179.0")
if maxLon < -180.0:
maxLon = -179.0
w.warn("Received an invalid maxLon value; Setting to -179.0")
if maxLon > 180.0:
maxLon = 180.0
w.warn("Received an invalid maxLon value; Setting to 180.0")
if minLon > maxLon:
(minLon, maxLon) = (maxLon, minLon)
w.warn("Received minLon > maxLon; Swapping values")
if minLat > maxLat:
(minLat, maxLat) = (maxLat, minLat)
w.warn("Received minLat > maxLat; Swapping values")
df_spec = (
dg.DataGenerator(sparkSession=sparkSession, rows=rows,
partitions=partitions, randomSeedMethod="hash_fieldname")
.withColumn("device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
uniqueValues=numDevices, random=generateRandom)
.withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
interval="1 second", random=generateRandom)
.withColumn("base_lat", "float", minValue=minLat, maxValue=maxLat, step=0.5,
baseColumn='device_id', omit=True)
.withColumn("base_lon", "float", minValue=minLon, maxValue=maxLon, step=0.5,
baseColumn='device_id', omit=True)
.withColumn("unv_lat", "float", expr="base_lat + (0.5-format_number(rand(), 3))*1e-3", omit=True)
.withColumn("unv_lon", "float", expr="base_lon + (0.5-format_number(rand(), 3))*1e-3", omit=True)
.withColumn("lat", "float", expr=f"""CASE WHEN unv_lat > {maxLat} THEN {maxLat}
ELSE CASE WHEN unv_lat < {minLat} THEN {minLat}
ELSE unv_lat END END""")
.withColumn("lon", "float", expr=f"""CASE WHEN unv_lon > {maxLon} THEN {maxLon}
ELSE CASE WHEN unv_lon < {minLon} THEN {minLon}
ELSE unv_lon END END""")
.withColumn("heading", "integer", minValue=0, maxValue=359, step=1, random=generateRandom)
.withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')", omit=not generateWkt)
)

return df_spec
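
Finally, a sketch for the telematics provider under the same assumptions; the bounding box roughly covers the continental US and is only an example.

from dbldatagen.datasets import BasicTelematicsProvider

provider = BasicTelematicsProvider()
spec = provider.getTableGenerator(
    spark, rows=50000, random=True,
    numDevices=250,
    minLat=25.0, maxLat=49.0, minLon=-125.0, maxLon=-67.0,
    generateWkt=True)

# Columns: device_id, ts, lat, lon, heading and (since generateWkt=True) wkt
df = spec.build()
df.show(5, truncate=False)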