Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature standard datasets - part 2 #286

Merged
Merged
Show file tree
Hide file tree
Changes from 89 commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
ece0726
work in progress
ronanstokes-db Mar 22, 2024
be90276
work in progress
ronanstokes-db Mar 22, 2024
38f66a3
work in progress
ronanstokes-db Mar 22, 2024
00a71da
wip
ronanstokes-db Mar 22, 2024
fdbcef2
wip
ronanstokes-db Mar 24, 2024
81b9680
added implementations for Datasets describe and listing
ronanstokes-db Mar 28, 2024
8cf15d9
bumpedBuild
ronanstokes-db Mar 28, 2024
0b12ad4
bumpedBuild
ronanstokes-db Mar 28, 2024
0d73fcc
bumpedBuild
ronanstokes-db Mar 28, 2024
471ee44
bumpedBuild
ronanstokes-db Mar 28, 2024
b79d611
fixed dataset provider imports
ronanstokes-db Mar 28, 2024
40ff743
fixed dataset provider imports
ronanstokes-db Mar 28, 2024
c262363
fixed dataset provider imports
ronanstokes-db Mar 28, 2024
76a09f0
fixed dataset provider imports
ronanstokes-db Mar 28, 2024
9cca76f
fixed dataset provider imports
ronanstokes-db Mar 28, 2024
26decec
fixed dataset provider imports
ronanstokes-db Mar 28, 2024
19b77df
wip
ronanstokes-db Mar 28, 2024
ac16b50
wip
ronanstokes-db Mar 28, 2024
3a9c5e6
initial working version
ronanstokes-db Mar 28, 2024
4d930ec
initial working version
ronanstokes-db Mar 28, 2024
dbda36e
initial working version
ronanstokes-db Mar 28, 2024
17b1e0d
initial working version
ronanstokes-db Mar 28, 2024
eaa57a3
initial working version
ronanstokes-db Mar 28, 2024
a02d150
initial working version
ronanstokes-db Mar 28, 2024
eba1724
initial working version
ronanstokes-db Mar 28, 2024
b0bc9ad
initial working version
ronanstokes-db Mar 28, 2024
a99f362
initial working version
ronanstokes-db Mar 28, 2024
c3f8ab8
added telephony plans
ronanstokes-db Mar 28, 2024
a6aa7d7
added telephony plans
ronanstokes-db Mar 28, 2024
6a7a6b8
Merge branch 'master' into feature_standard_datasets
ronanstokes-db Mar 28, 2024
32ecd13
added telephony plans
ronanstokes-db Mar 28, 2024
6167b27
initial working version
ronanstokes-db Mar 28, 2024
0c316e9
Added tokei.rs badge (#253)
nfx Feb 28, 2024
f966845
Prep for release 036 (#251)
ronanstokes-db Mar 28, 2024
4410b3d
added telephony plans
ronanstokes-db Mar 28, 2024
f702c8f
initial implementation
ronanstokes-db Mar 28, 2024
0fba14b
added basic/iot dataset
ronanstokes-db Mar 28, 2024
fc28735
wip
ronanstokes-db Mar 29, 2024
17af167
wip
ronanstokes-db Apr 1, 2024
679c473
work in progress
ronanstokes-db Apr 4, 2024
9609cda
wip
ronanstokes-db Apr 4, 2024
ecdd31b
wip
ronanstokes-db Apr 4, 2024
813ce9a
wip
ronanstokes-db Apr 6, 2024
e7abd60
wip
ronanstokes-db Apr 6, 2024
ecb888a
wip
ronanstokes-db Apr 6, 2024
035c29e
wip
ronanstokes-db Apr 8, 2024
afc0788
work in progress
ronanstokes-db Apr 12, 2024
28d9afd
wip
ronanstokes-db Apr 16, 2024
b8601d3
Merge branch 'master' into feature_standard_datasets
ronanstokes-db May 23, 2024
c12ff2f
Merge branch 'master' into feature_standard_datasets
ronanstokes-db May 23, 2024
b1e6ff4
Merge branch 'master' into feature_standard_datasets
ronanstokes-db May 23, 2024
c6352dc
wip
ronanstokes-db May 23, 2024
1bddbd9
wip
ronanstokes-db May 24, 2024
5171eff
wip
ronanstokes-db May 24, 2024
5fd9810
wip
ronanstokes-db May 25, 2024
6073049
wip
ronanstokes-db May 28, 2024
9f50b75
Merge branch 'master' into feature_standard_datasets
ronanstokes-db May 28, 2024
1ffa812
wip
ronanstokes-db May 28, 2024
2441e73
wip
ronanstokes-db May 29, 2024
f3a68ad
wip
ronanstokes-db May 29, 2024
4cfbb35
wip
ronanstokes-db May 29, 2024
a620072
wip
ronanstokes-db May 30, 2024
cdf61bd
wip
ronanstokes-db May 31, 2024
c3be5fc
wip
ronanstokes-db Jun 1, 2024
341f9c3
Merge branch 'master' into feature_standard_datasets
ronanstokes-db Jun 1, 2024
0b9ffa5
wip
ronanstokes-db Jun 1, 2024
809f7d1
wip
ronanstokes-db Jun 1, 2024
005b744
wip
ronanstokes-db Jun 1, 2024
1d0d77a
wip
ronanstokes-db Jun 1, 2024
8fd915d
wip
ronanstokes-db Jun 1, 2024
02d9634
wip
ronanstokes-db Jun 4, 2024
9fea524
wip
ronanstokes-db Jun 5, 2024
6f95d6b
wip
ronanstokes-db Jun 5, 2024
64de31d
wip
ronanstokes-db Jun 5, 2024
8dcf623
wip
ronanstokes-db Jun 5, 2024
d336fdc
additional coverage tests
ronanstokes-db Jun 5, 2024
1d2bc8e
additional coverage
ronanstokes-db Jun 5, 2024
548cb75
additional coverage
ronanstokes-db Jun 5, 2024
17a37bc
additional coverage
ronanstokes-db Jun 5, 2024
acb835c
additional coverage
ronanstokes-db Jun 5, 2024
f1f56c8
Add some standard datasets
ghanse Jun 6, 2024
d277da5
Fix a bug with the default number of groups
ghanse Jun 6, 2024
b42a391
Test changes
ghanse Jun 6, 2024
fe766f9
Updated datasets
ghanse Jun 6, 2024
05ffc0b
Added more unit tests and safety
ghanse Jun 7, 2024
40f3f72
Merge branch 'master' into feature_standard_datasets_001
ronanstokes-db Jun 7, 2024
96736d1
Merge branch 'master' into feature_standard_datasets_001
ghanse Jun 7, 2024
7c77e2a
Merge branch 'feature_standard_datasets_001' of https://github.com/gh…
ghanse Jun 7, 2024
d4e5e5c
Added more tests
ghanse Jun 7, 2024
e7a6216
Update test_standard_dataset_providers.py
ronanstokes-db Jun 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in
* Updated documentation for generating text data.
* Modified data distribiutions to use abstract base classes
* migrated data distribution tests to use `pytest`
* Additional standard datasets

#### Added
* Added classes for constraints on the data generation via new package `dbldatagen.constraints`
Expand Down
8 changes: 8 additions & 0 deletions dbldatagen/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from .dataset_provider import DatasetProvider, dataset_definition
from .basic_geometries import BasicGeometriesProvider
from .basic_process_historian import BasicProcessHistorianProvider
from .basic_telematics import BasicTelematicsProvider
from .basic_user import BasicUserProvider
from .benchmark_groupby import BenchmarkGroupByProvider
from .multi_table_telephony_provider import MultiTableTelephonyProvider

__all__ = ["dataset_provider",
"basic_geometries",
"basic_process_historian",
"basic_telematics",
"basic_user",
"benchmark_groupby",
"multi_table_telephony_provider"
]
111 changes: 111 additions & 0 deletions dbldatagen/datasets/basic_geometries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/geometries",
summary="Geometry WKT dataset",
autoRegister=True,
supportsStreaming=True)
class BasicGeometriesProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
"""
Basic Geometry WKT Dataset
==========================

This is a basic process geospatial dataset with WKT strings representing `POINT`, `LINESTRING`,
and `POLYGON` geometries.

It takes the following options when retrieving the table:
- random: if True, generates random data
- rows : number of rows to generate
- partitions: number of partitions to use
- geometryType: one of `point`, `lineString`, or `polygon`, default is `polygon`
- maxVertices: number of points in each `lineString` or `polygon`

As the data specification is a DataGenerator object, you can add further columns to the data set and
add constraints (when the feature is available)

Note that this datset does not use any features that would prevent it from being used as a source for a
streaming dataframe, and so the flag `supportsStreaming` is set to True.

"""
MIN_LOCATION_ID = 1000000
MAX_LOCATION_ID = 9223372036854775807
COLUMN_COUNT = 2
ALLOWED_OPTIONS = [
"geometryType",
"maxVertices",
"random"
]

@DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
**options):
import dbldatagen as dg
import warnings as w

generateRandom = options.get("random", False)
geometryType = options.get("geometryType", "point")
maxVertices = options.get("maxVertices", 1 if geometryType == "point" else 3)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
rows = DatasetProvider.DEFAULT_ROWS
if partitions is None or partitions < 0:
partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)

df_spec = (
dg.DataGenerator(sparkSession=sparkSession, name="test_data_set1", rows=rows,
partitions=partitions, randomSeedMethod="hash_fieldname")
.withColumn("location_id", "long", minValue=self.MIN_LOCATION_ID, maxValue=self.MAX_LOCATION_ID,
uniqueValues=rows, random=generateRandom)
)
if geometryType == "point":
if maxVertices > 1:
w.warn('Ignoring property maxVertices for point geometries')
df_spec = (
df_spec.withColumn("lat", "float", minValue=-90.0, maxValue=90.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn("lon", "float", minValue=-180.0, maxValue=180.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')")
)
elif geometryType == "lineString":
if maxVertices < 2:
maxVertices = 2
w.warn("Parameter maxVertices must be >=2 for 'lineString' geometries; Setting to 2")
j = 0
while j < maxVertices:
df_spec = (
df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
step=1e-5, random=generateRandom, omit=True)
)
j = j + 1
concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in range(maxVertices)]
concatPairsExpr = f"replace(concat('LINESTRING(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
df_spec = (
df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
)
elif geometryType == "polygon":
if maxVertices < 3:
maxVertices = 3
w.warn("Parameter maxVertices must be >=3 for 'polygon' geometries; Setting to 3")
j = 0
while j < maxVertices:
df_spec = (
df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
step=1e-5, random=generateRandom, omit=True)
.withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
step=1e-5, random=generateRandom, omit=True)
)
j = j + 1
vertexIndices = list(range(maxVertices)) + [0]
concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in vertexIndices]
concatPairsExpr = f"replace(concat('POLYGON(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
df_spec = (
df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
)
else:
raise ValueError("geometryType must be 'point', 'lineString', or 'polygon'")

return df_spec
110 changes: 110 additions & 0 deletions dbldatagen/datasets/basic_process_historian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/process_historian",
summary="Basic Historian Data for Process Manufacturing",
autoRegister=True,
supportsStreaming=True)
class BasicProcessHistorianProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
"""
Basic Process Historian Dataset
===============================

This is a basic process historian data set with fields like plant ID, device ID, tag name, timestamp,
value, and units of measure.

It takes the following options when retrieving the table:
- random: if True, generates random data
- rows : number of rows to generate
- partitions: number of partitions to use
- numDevices: number of unique device IDs
- numPlants: number of unique plant IDs
- numTags: number of unique tag names
- startTimestamp: earliest timestamp for IOT time series data
- endTimestamp: latest timestamp for IOT time series data
- dataQualityRatios: dictionary with `pctQuestionable`, `pctSubstituted`, and `pctAnnotated`
values corresponding to the percentage of values to be marked `is_questionable`, `is_substituted`,
or `is_annotated`, respectively

As the data specification is a DataGenerator object, you can add further columns to the data set and
add constraints (when the feature is available)

Note that this datset does not use any features that would prevent it from being used as a source for a
streaming dataframe, and so the flag `supportsStreaming` is set to True.

"""
MIN_DEVICE_ID = 0x100000000
MAX_DEVICE_ID = 9223372036854775807
MIN_PROPERTY_VALUE = 50.0
MAX_PROPERTY_VALUE = 60.0
DEFAULT_NUM_DEVICES = 10000
DEFAULT_NUM_PLANTS = 100
DEFAULT_NUM_TAGS = 10
DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
COLUMN_COUNT = 10
ALLOWED_OPTIONS = [
"numDevices",
"numPlants",
"numTags",
"startTimestamp",
"endTimestamp",
"dataQualityRatios",
"random"
]

@DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1, **options):
import dbldatagen as dg # import locally to avoid circular imports
import numpy as np

generateRandom = options.get("random", False)
numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
numPlants = options.get("numPlants", self.DEFAULT_NUM_PLANTS)
numTags = options.get("numTags", self.DEFAULT_NUM_TAGS)
startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
dataQualityRatios = options.get("dataQualityRatios", None)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
rows = DatasetProvider.DEFAULT_ROWS
if partitions is None or partitions < 0:
partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)

tag_names = [f"HEX-{str(j).zfill(int(np.ceil(np.log10(numTags))))}_INLET_TMP" for j in range(numTags)]
plant_ids = [f"PLANT-{str(j).zfill(int(np.ceil(np.log10(numPlants))))}" for j in range(numPlants)]
testDataSpec = (
dg.DataGenerator(sparkSession, name="process_historian_data", rows=rows,
partitions=partitions,
randomSeedMethod="hash_fieldname")
.withColumn("internal_device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
uniqueValues=numDevices, omit=True, baseColumnType="hash")
.withColumn("device_id", "string", format="0x%09x", baseColumn="internal_device_id")
.withColumn("plant_id", "string", values=plant_ids, baseColumn="internal_device_id")
.withColumn("tag_name", "string", values=tag_names, baseColumn="internal_device_id")
.withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
interval="1 second", random=generateRandom)
.withColumn("value", "float", minValue=self.MIN_PROPERTY_VALUE, maxValue=self.MAX_PROPERTY_VALUE,
step=1e-3, random=generateRandom)
.withColumn("engineering_units", "string", expr="'Deg.F'")
)
# Add the data quality columns if they were provided
if dataQualityRatios is not None:
if "pctQuestionable" in dataQualityRatios:
testDataSpec = testDataSpec.withColumn(
"is_questionable", "boolean",
expr=f"rand() < {dataQualityRatios['pctQuestionable']}"
)
if "pctSubstituted" in dataQualityRatios:
testDataSpec = testDataSpec.withColumn(
"is_substituted", "boolean",
expr=f"rand() < {dataQualityRatios['pctSubstituted']}"
)
if "pctAnnotated" in dataQualityRatios:
testDataSpec = testDataSpec.withColumn(
"is_annotated", "boolean",
expr=f"rand() < {dataQualityRatios['pctAnnotated']}"
)

return testDataSpec
131 changes: 131 additions & 0 deletions dbldatagen/datasets/basic_telematics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/telematics",
summary="Telematics dataset for GPS tracking",
autoRegister=True,
supportsStreaming=True)
class BasicTelematicsProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
"""
Basic Telematics Dataset
========================

This is a basic telematics dataset with time-series `lat`, `lon`, and `heading` values.

It takes the following options when retrieving the table:
- random: if True, generates random data
- rows : number of rows to generate
- partitions: number of partitions to use
- numDevices: number of unique device IDs
- startTimestamp: earliest timestamp for IOT time series data
- endTimestamp: latest timestamp for IOT time series data
- minLat: minimum latitude
- maxLat: maximum latitude
- minLon: minimum longitude
- maxLon: maximum longitude
- generateWKT: if `True`, generates the well-known text representation of the location

As the data specification is a DataGenerator object, you can add further columns to the data set and
add constraints (when the feature is available)

Note that this datset does not use any features that would prevent it from being used as a source for a
streaming dataframe, and so the flag `supportsStreaming` is set to True.

"""
MIN_DEVICE_ID = 1000000
MAX_DEVICE_ID = 9223372036854775807
DEFAULT_NUM_DEVICES = 1000
DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
DEFAULT_MIN_LAT = -90.0
DEFAULT_MAX_LAT = 90.0
DEFAULT_MIN_LON = -180.0
DEFAULT_MAX_LON = 180.0
COLUMN_COUNT = 6
ALLOWED_OPTIONS = [
"numDevices",
"startTimestamp",
"endTimestamp",
"minLat",
"maxLat",
"minLon",
"maxLon",
"generateWkt",
"random"
]

@DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
**options):
import dbldatagen as dg
import warnings as w

generateRandom = options.get("random", False)
numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
minLat = options.get("minLat", self.DEFAULT_MIN_LAT)
maxLat = options.get("maxLat", self.DEFAULT_MAX_LAT)
minLon = options.get("minLon", self.DEFAULT_MIN_LON)
maxLon = options.get("maxLon", self.DEFAULT_MAX_LON)
generateWkt = options.get("generateWkt", False)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
rows = DatasetProvider.DEFAULT_ROWS
if partitions is None or partitions < 0:
partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
if minLat < -90.0:
minLat = -90.0
w.warn("Received an invalid minLat value; Setting to -90.0")
if minLat > 90.0:
minLat = 89.0
w.warn("Recieved an invalid minLat value; Setting to 89.0")
if maxLat < -90:
maxLat = -89.0
w.warn("Recieved an invalid maxLat value; Setting to -89.0")
if maxLat > 90.0:
maxLat = 90.0
w.warn("Received an invalid maxLat value; Setting to 90.0")
if minLon < -180.0:
minLon = -180.0
w.warn("Received an invalid minLon value; Setting to -180.0")
if minLon > 180.0:
minLon = 179.0
w.warn("Received an invalid minLon value; Setting to 179.0")
if maxLon < -180.0:
maxLon = -179.0
w.warn("Received an invalid maxLon value; Setting to -179.0")
if maxLon > 180.0:
maxLon = 180.0
w.warn("Received an invalid maxLon value; Setting to 180.0")
if minLon > maxLon:
(minLon, maxLon) = (maxLon, minLon)
w.warn("Received minLon > maxLon; Swapping values")
if minLat > maxLat:
(minLat, maxLat) = (maxLat, minLat)
w.warn("Received minLat > maxLat; Swapping values")
df_spec = (
dg.DataGenerator(sparkSession=sparkSession, rows=rows,
partitions=partitions, randomSeedMethod="hash_fieldname")
.withColumn("device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
uniqueValues=numDevices, random=generateRandom)
.withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
interval="1 second", random=generateRandom)
.withColumn("base_lat", "float", minValue=minLat, maxValue=maxLat, step=0.5,
baseColumn='device_id', omit=True)
.withColumn("base_lon", "float", minValue=minLon, maxValue=maxLon, step=0.5,
baseColumn='device_id', omit=True)
.withColumn("unv_lat", "float", expr="base_lat + (0.5-format_number(rand(), 3))*1e-3", omit=True)
.withColumn("unv_lon", "float", expr="base_lon + (0.5-format_number(rand(), 3))*1e-3", omit=True)
.withColumn("lat", "float", expr=f"""CASE WHEN unv_lat > {maxLat} THEN {maxLat}
ELSE CASE WHEN unv_lat < {minLat} THEN {minLat}
ELSE unv_lat END END""")
.withColumn("lon", "float", expr=f"""CASE WHEN unv_lon > {maxLon} THEN {maxLon}
ELSE CASE WHEN unv_lon < {minLon} THEN {minLon}
ELSE unv_lon END END""")
.withColumn("heading", "integer", minValue=0, maxValue=359, step=1, random=generateRandom)
.withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')", omit=not generateWkt)
)

return df_spec
Loading
Loading