diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f3c48b1..a8bb4570 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in
 * Updated documentation for generating text data.
 * Modified data distribiutions to use abstract base classes
 * migrated data distribution tests to use `pytest`
+* Additional standard datasets
 
 #### Added
 * Added classes for constraints on the data generation via new package `dbldatagen.constraints`
diff --git a/dbldatagen/datasets/__init__.py b/dbldatagen/datasets/__init__.py
index 50b4e901..17f5b212 100644
--- a/dbldatagen/datasets/__init__.py
+++ b/dbldatagen/datasets/__init__.py
@@ -1,8 +1,16 @@
 from .dataset_provider import DatasetProvider, dataset_definition
+from .basic_geometries import BasicGeometriesProvider
+from .basic_process_historian import BasicProcessHistorianProvider
+from .basic_telematics import BasicTelematicsProvider
 from .basic_user import BasicUserProvider
+from .benchmark_groupby import BenchmarkGroupByProvider
 from .multi_table_telephony_provider import MultiTableTelephonyProvider
 
 __all__ = ["dataset_provider",
+           "basic_geometries",
+           "basic_process_historian",
+           "basic_telematics",
            "basic_user",
+           "benchmark_groupby",
            "multi_table_telephony_provider"
           ]
diff --git a/dbldatagen/datasets/basic_geometries.py b/dbldatagen/datasets/basic_geometries.py
new file mode 100644
index 00000000..1673bfd0
--- /dev/null
+++ b/dbldatagen/datasets/basic_geometries.py
@@ -0,0 +1,111 @@
+from .dataset_provider import DatasetProvider, dataset_definition
+
+
+@dataset_definition(name="basic/geometries",
+                    summary="Geometry WKT dataset",
+                    autoRegister=True,
+                    supportsStreaming=True)
+class BasicGeometriesProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
+    """
+    Basic Geometry WKT Dataset
+    ==========================
+
+    This is a basic geospatial dataset with WKT strings representing `POINT`, `LINESTRING`,
+    and `POLYGON` geometries.
+
+    It takes the following options when retrieving the table:
+        - random: if True, generates random data
+        - rows : number of rows to generate
+        - partitions: number of partitions to use
+        - geometryType: one of `point`, `lineString`, or `polygon`, default is `point`
+        - maxVertices: number of points in each `lineString` or `polygon`
+
+    As the data specification is a DataGenerator object, you can add further columns to the data set and
+    add constraints (when the feature is available).
+
+    Note that this dataset does not use any features that would prevent it from being used as a source for a
+    streaming dataframe, and so the flag `supportsStreaming` is set to True.
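+
+    A minimal usage sketch (illustrative only; it assumes an active Spark session named `spark`
+    and uses only the options documented above)::
+
+        import dbldatagen as dg
+
+        # retrieve the data spec registered under "basic/geometries"
+        ds = dg.Datasets(spark, "basic/geometries").get(rows=1000, random=True,
+                                                        geometryType="polygon", maxVertices=5)
+        df = ds.build()   # build() returns a Spark DataFrame with a `wkt` column
+        df.show(5, truncate=False)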
+
+    """
+    MIN_LOCATION_ID = 1000000
+    MAX_LOCATION_ID = 9223372036854775807
+    COLUMN_COUNT = 2
+    ALLOWED_OPTIONS = [
+        "geometryType",
+        "maxVertices",
+        "random"
+    ]
+
+    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
+    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
+                          **options):
+        import dbldatagen as dg
+        import warnings as w
+
+        generateRandom = options.get("random", False)
+        geometryType = options.get("geometryType", "point")
+        maxVertices = options.get("maxVertices", 1 if geometryType == "point" else 3)
+
+        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
+        if rows is None or rows < 0:
+            rows = DatasetProvider.DEFAULT_ROWS
+        if partitions is None or partitions < 0:
+            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
+
+        df_spec = (
+            dg.DataGenerator(sparkSession=sparkSession, name="test_data_set1", rows=rows,
+                             partitions=partitions, randomSeedMethod="hash_fieldname")
+            .withColumn("location_id", "long", minValue=self.MIN_LOCATION_ID, maxValue=self.MAX_LOCATION_ID,
+                        uniqueValues=rows, random=generateRandom)
+        )
+        if geometryType == "point":
+            if maxVertices > 1:
+                w.warn('Ignoring property maxVertices for point geometries')
+            df_spec = (
+                df_spec.withColumn("lat", "float", minValue=-90.0, maxValue=90.0,
+                                   step=1e-5, random=generateRandom, omit=True)
+                .withColumn("lon", "float", minValue=-180.0, maxValue=180.0,
+                            step=1e-5, random=generateRandom, omit=True)
+                .withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')")
+            )
+        elif geometryType == "lineString":
+            if maxVertices < 2:
+                maxVertices = 2
+                w.warn("Parameter maxVertices must be >=2 for 'lineString' geometries; Setting to 2")
+            j = 0
+            while j < maxVertices:
+                df_spec = (
+                    df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
+                                       step=1e-5, random=generateRandom, omit=True)
+                    .withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
+                                step=1e-5, random=generateRandom, omit=True)
+                )
+                j = j + 1
+            concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in range(maxVertices)]
+            concatPairsExpr = f"replace(concat('LINESTRING(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
+            df_spec = (
+                df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
+            )
+        elif geometryType == "polygon":
+            if maxVertices < 3:
+                maxVertices = 3
+                w.warn("Parameter maxVertices must be >=3 for 'polygon' geometries; Setting to 3")
+            j = 0
+            while j < maxVertices:
+                df_spec = (
+                    df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
+                                       step=1e-5, random=generateRandom, omit=True)
+                    .withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
+                                step=1e-5, random=generateRandom, omit=True)
+                )
+                j = j + 1
+            vertexIndices = list(range(maxVertices)) + [0]
+            concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in vertexIndices]
+            concatPairsExpr = f"replace(concat('POLYGON(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
+            df_spec = (
+                df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
+            )
+        else:
+            raise ValueError("geometryType must be 'point', 'lineString', or 'polygon'")
+
+        return df_spec
diff --git a/dbldatagen/datasets/basic_process_historian.py b/dbldatagen/datasets/basic_process_historian.py
new file mode 100644
index 00000000..870ad3b7
--- /dev/null
+++ b/dbldatagen/datasets/basic_process_historian.py
@@ -0,0 +1,110 @@
+from .dataset_provider import DatasetProvider, dataset_definition
+
+
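+# Illustrative usage sketch only (it assumes an active Spark session named `spark`); it
+# retrieves the provider defined below through the `Datasets` API using only the options
+# documented in its docstring:
+#
+#     import dbldatagen as dg
+#
+#     ds = dg.Datasets(spark, "basic/process_historian").get(
+#         rows=100000, numDevices=100, numPlants=10, numTags=5,
+#         dataQualityRatios={"pctQuestionable": 0.1, "pctAnnotated": 0.05})
+#     df = ds.build()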
+@dataset_definition(name="basic/process_historian",
+                    summary="Basic Historian Data for Process Manufacturing",
+                    autoRegister=True,
+                    supportsStreaming=True)
+class BasicProcessHistorianProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
+    """
+    Basic Process Historian Dataset
+    ===============================
+
+    This is a basic process historian data set with fields like plant ID, device ID, tag name, timestamp,
+    value, and units of measure.
+
+    It takes the following options when retrieving the table:
+        - random: if True, generates random data
+        - rows : number of rows to generate
+        - partitions: number of partitions to use
+        - numDevices: number of unique device IDs
+        - numPlants: number of unique plant IDs
+        - numTags: number of unique tag names
+        - startTimestamp: earliest timestamp for IoT time series data
+        - endTimestamp: latest timestamp for IoT time series data
+        - dataQualityRatios: dictionary with `pctQuestionable`, `pctSubstituted`, and `pctAnnotated`
+          values corresponding to the percentage of values to be marked `is_questionable`, `is_substituted`,
+          or `is_annotated`, respectively
+
+    As the data specification is a DataGenerator object, you can add further columns to the data set and
+    add constraints (when the feature is available).
+
+    Note that this dataset does not use any features that would prevent it from being used as a source for a
+    streaming dataframe, and so the flag `supportsStreaming` is set to True.
+
+    """
+    MIN_DEVICE_ID = 0x100000000
+    MAX_DEVICE_ID = 9223372036854775807
+    MIN_PROPERTY_VALUE = 50.0
+    MAX_PROPERTY_VALUE = 60.0
+    DEFAULT_NUM_DEVICES = 10000
+    DEFAULT_NUM_PLANTS = 100
+    DEFAULT_NUM_TAGS = 10
+    DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
+    DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
+    COLUMN_COUNT = 10
+    ALLOWED_OPTIONS = [
+        "numDevices",
+        "numPlants",
+        "numTags",
+        "startTimestamp",
+        "endTimestamp",
+        "dataQualityRatios",
+        "random"
+    ]
+
+    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
+    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1, **options):
+        import dbldatagen as dg  # import locally to avoid circular imports
+        import numpy as np
+
+        generateRandom = options.get("random", False)
+        numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
+        numPlants = options.get("numPlants", self.DEFAULT_NUM_PLANTS)
+        numTags = options.get("numTags", self.DEFAULT_NUM_TAGS)
+        startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
+        endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
+        dataQualityRatios = options.get("dataQualityRatios", None)
+
+        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
+        if rows is None or rows < 0:
+            rows = DatasetProvider.DEFAULT_ROWS
+        if partitions is None or partitions < 0:
+            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
+
+        tag_names = [f"HEX-{str(j).zfill(int(np.ceil(np.log10(numTags))))}_INLET_TMP" for j in range(numTags)]
+        plant_ids = [f"PLANT-{str(j).zfill(int(np.ceil(np.log10(numPlants))))}" for j in range(numPlants)]
+        testDataSpec = (
+            dg.DataGenerator(sparkSession, name="process_historian_data", rows=rows,
+                             partitions=partitions,
+                             randomSeedMethod="hash_fieldname")
+            .withColumn("internal_device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
+                        uniqueValues=numDevices, omit=True, baseColumnType="hash")
+            .withColumn("device_id", "string", format="0x%09x", baseColumn="internal_device_id")
+            .withColumn("plant_id", "string", values=plant_ids, baseColumn="internal_device_id")
+            .withColumn("tag_name", "string", values=tag_names, baseColumn="internal_device_id")
+            .withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
+                        interval="1 second", random=generateRandom)
+            .withColumn("value", "float", minValue=self.MIN_PROPERTY_VALUE, maxValue=self.MAX_PROPERTY_VALUE,
+                        step=1e-3, random=generateRandom)
+            .withColumn("engineering_units", "string", expr="'Deg.F'")
+        )
+        # Add the data quality columns if they were provided
+        if dataQualityRatios is not None:
+            if "pctQuestionable" in dataQualityRatios:
+                testDataSpec = testDataSpec.withColumn(
+                    "is_questionable", "boolean",
+                    expr=f"rand() < {dataQualityRatios['pctQuestionable']}"
+                )
+            if "pctSubstituted" in dataQualityRatios:
+                testDataSpec = testDataSpec.withColumn(
+                    "is_substituted", "boolean",
+                    expr=f"rand() < {dataQualityRatios['pctSubstituted']}"
+                )
+            if "pctAnnotated" in dataQualityRatios:
+                testDataSpec = testDataSpec.withColumn(
+                    "is_annotated", "boolean",
+                    expr=f"rand() < {dataQualityRatios['pctAnnotated']}"
+                )
+
+        return testDataSpec
diff --git a/dbldatagen/datasets/basic_telematics.py b/dbldatagen/datasets/basic_telematics.py
new file mode 100644
index 00000000..13bb806c
--- /dev/null
+++ b/dbldatagen/datasets/basic_telematics.py
@@ -0,0 +1,131 @@
+from .dataset_provider import DatasetProvider, dataset_definition
+
+
+@dataset_definition(name="basic/telematics",
+                    summary="Telematics dataset for GPS tracking",
+                    autoRegister=True,
+                    supportsStreaming=True)
+class BasicTelematicsProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
+    """
+    Basic Telematics Dataset
+    ========================
+
+    This is a basic telematics dataset with time-series `lat`, `lon`, and `heading` values.
+
+    It takes the following options when retrieving the table:
+        - random: if True, generates random data
+        - rows : number of rows to generate
+        - partitions: number of partitions to use
+        - numDevices: number of unique device IDs
+        - startTimestamp: earliest timestamp for IoT time series data
+        - endTimestamp: latest timestamp for IoT time series data
+        - minLat: minimum latitude
+        - maxLat: maximum latitude
+        - minLon: minimum longitude
+        - maxLon: maximum longitude
+        - generateWkt: if `True`, generates the well-known text representation of the location
+
+    As the data specification is a DataGenerator object, you can add further columns to the data set and
+    add constraints (when the feature is available).
+
+    Note that this dataset does not use any features that would prevent it from being used as a source for a
+    streaming dataframe, and so the flag `supportsStreaming` is set to True.
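+
+    A minimal usage sketch (illustrative only; it assumes an active Spark session named `spark`
+    and uses only the options documented above)::
+
+        import dbldatagen as dg
+
+        ds = dg.Datasets(spark, "basic/telematics").get(rows=10000, numDevices=100,
+                                                        minLat=40.0, maxLat=43.0,
+                                                        minLon=-93.0, maxLon=-89.0,
+                                                        generateWkt=True)
+        df = ds.build()   # Spark DataFrame with device_id, ts, lat, lon, heading (and wkt)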
+
+    """
+    MIN_DEVICE_ID = 1000000
+    MAX_DEVICE_ID = 9223372036854775807
+    DEFAULT_NUM_DEVICES = 1000
+    DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
+    DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
+    DEFAULT_MIN_LAT = -90.0
+    DEFAULT_MAX_LAT = 90.0
+    DEFAULT_MIN_LON = -180.0
+    DEFAULT_MAX_LON = 180.0
+    COLUMN_COUNT = 6
+    ALLOWED_OPTIONS = [
+        "numDevices",
+        "startTimestamp",
+        "endTimestamp",
+        "minLat",
+        "maxLat",
+        "minLon",
+        "maxLon",
+        "generateWkt",
+        "random"
+    ]
+
+    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
+    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
+                          **options):
+        import dbldatagen as dg
+        import warnings as w
+
+        generateRandom = options.get("random", False)
+        numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
+        startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
+        endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
+        minLat = options.get("minLat", self.DEFAULT_MIN_LAT)
+        maxLat = options.get("maxLat", self.DEFAULT_MAX_LAT)
+        minLon = options.get("minLon", self.DEFAULT_MIN_LON)
+        maxLon = options.get("maxLon", self.DEFAULT_MAX_LON)
+        generateWkt = options.get("generateWkt", False)
+
+        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
+        if rows is None or rows < 0:
+            rows = DatasetProvider.DEFAULT_ROWS
+        if partitions is None or partitions < 0:
+            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
+        if minLat < -90.0:
+            minLat = -90.0
+            w.warn("Received an invalid minLat value; Setting to -90.0")
+        if minLat > 90.0:
+            minLat = 89.0
+            w.warn("Received an invalid minLat value; Setting to 89.0")
+        if maxLat < -90.0:
+            maxLat = -89.0
+            w.warn("Received an invalid maxLat value; Setting to -89.0")
+        if maxLat > 90.0:
+            maxLat = 90.0
+            w.warn("Received an invalid maxLat value; Setting to 90.0")
+        if minLon < -180.0:
+            minLon = -180.0
+            w.warn("Received an invalid minLon value; Setting to -180.0")
+        if minLon > 180.0:
+            minLon = 179.0
+            w.warn("Received an invalid minLon value; Setting to 179.0")
+        if maxLon < -180.0:
+            maxLon = -179.0
+            w.warn("Received an invalid maxLon value; Setting to -179.0")
+        if maxLon > 180.0:
+            maxLon = 180.0
+            w.warn("Received an invalid maxLon value; Setting to 180.0")
+        if minLon > maxLon:
+            (minLon, maxLon) = (maxLon, minLon)
+            w.warn("Received minLon > maxLon; Swapping values")
+        if minLat > maxLat:
+            (minLat, maxLat) = (maxLat, minLat)
+            w.warn("Received minLat > maxLat; Swapping values")
+        df_spec = (
+            dg.DataGenerator(sparkSession=sparkSession, rows=rows,
+                             partitions=partitions, randomSeedMethod="hash_fieldname")
+            .withColumn("device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
+                        uniqueValues=numDevices, random=generateRandom)
+            .withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
+                        interval="1 second", random=generateRandom)
+            .withColumn("base_lat", "float", minValue=minLat, maxValue=maxLat, step=0.5,
+                        baseColumn='device_id', omit=True)
+            .withColumn("base_lon", "float", minValue=minLon, maxValue=maxLon, step=0.5,
+                        baseColumn='device_id', omit=True)
+            .withColumn("unv_lat", "float", expr="base_lat + (0.5-format_number(rand(), 3))*1e-3", omit=True)
+            .withColumn("unv_lon", "float", expr="base_lon + (0.5-format_number(rand(), 3))*1e-3", omit=True)
+            .withColumn("lat", "float", expr=f"""CASE WHEN unv_lat > {maxLat} THEN {maxLat}
+                                                 ELSE CASE WHEN unv_lat < {minLat} THEN {minLat}
+                                                 ELSE unv_lat END END""")
+            .withColumn("lon", "float", expr=f"""CASE WHEN unv_lon > {maxLon} THEN {maxLon}
+                                                 ELSE CASE WHEN unv_lon < {minLon} THEN {minLon}
+                                                 ELSE unv_lon END END""")
+            .withColumn("heading", "integer", minValue=0, maxValue=359, step=1, random=generateRandom)
+            .withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')", omit=not generateWkt)
+        )
+
+        return df_spec
diff --git a/dbldatagen/datasets/benchmark_groupby.py b/dbldatagen/datasets/benchmark_groupby.py
new file mode 100644
index 00000000..eec7577c
--- /dev/null
+++ b/dbldatagen/datasets/benchmark_groupby.py
@@ -0,0 +1,92 @@
+from .dataset_provider import DatasetProvider, dataset_definition
+
+
+@dataset_definition(name="benchmark/groupby",
+                    summary="Benchmarking dataset for GROUP BY queries in various database systems",
+                    autoRegister=True,
+                    supportsStreaming=True)
+class BenchmarkGroupByProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
+    """
+    Grouping Benchmark Dataset
+    ==========================
+
+    This is a benchmarking dataset for evaluating groupBy operations on columns of different type and
+    cardinality.
+
+    It takes the following options when retrieving the table:
+        - random: if True, generates random data
+        - rows : number of rows to generate
+        - partitions: number of partitions to use
+        - groups: number of groups within the dataset
+        - percentNulls: percentage of nulls within the non-base columns
+
+    As the data specification is a DataGenerator object, you can add further columns to the data set and
+    add constraints (when the feature is available).
+
+    Note that this dataset does not use any features that would prevent it from being used as a source for a
+    streaming dataframe, and so the flag `supportsStreaming` is set to True.
+
+    """
+    MAX_LONG = 9223372036854775807
+    DEFAULT_NUM_GROUPS = 100
+    DEFAULT_PCT_NULLS = 0.0
+    COLUMN_COUNT = 12
+    ALLOWED_OPTIONS = ["groups", "percentNulls", "rows", "partitions", "tableName", "random"]
+
+    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
+    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
+                          **options):
+        import dbldatagen as dg
+        import warnings as w
+
+        generateRandom = options.get("random", False)
+        groups = options.get("groups", self.DEFAULT_NUM_GROUPS)
+        percentNulls = options.get("percentNulls", self.DEFAULT_PCT_NULLS)
+
+        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
+        if rows is None or rows < 0:
+            rows = DatasetProvider.DEFAULT_ROWS
+        if partitions is None or partitions < 0:
+            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
+        try:
+            groups = int(groups)
+        except Exception as e:
+            raise ValueError("groups must be a value of type 'int'") from e
+        if groups <= 0:
+            groups = 100
+            w.warn(f"Received an invalid groups value; Setting to {groups}")
+        if rows < groups:
+            groups = 1 + int(rows / 1000)
+            w.warn(f"Received more groups than rows; Setting the number of groups to {groups}")
+        if percentNulls > 1.0:
+            percentNulls = 1.0
+            w.warn(f"Received a percentNulls value > 1; Setting to {percentNulls}")
+        if percentNulls < 0.0:
+            percentNulls = 0.0
+            w.warn(f"Received a percentNulls value < 0; Setting to {percentNulls}")
+
+        df_spec = (
+            dg.DataGenerator(sparkSession=sparkSession, rows=rows,
+                             partitions=partitions,
+                             randomSeedMethod="hash_fieldname")
+            .withColumn("base1", "integer", minValue=1, maxValue=groups,
+                        uniqueValues=groups, random=generateRandom, omit=True)
+            .withColumn("base2", "integer", minValue=1, maxValue=groups,
+                        uniqueValues=groups, random=generateRandom, omit=True)
+            .withColumn("base3", "integer", minValue=1, maxValue=(1 + int(rows / groups)),
+                        uniqueValues=(1 + int(rows / groups)), random=generateRandom, omit=True)
+            .withColumn("id1", "string", baseColumn="base1", format="id%03d", percentNulls=percentNulls)
+            .withColumn("id2", "string", baseColumn="base2", format="id%03d", percentNulls=percentNulls)
+            .withColumn("id3", "string", baseColumn="base3", format="id%010d", percentNulls=percentNulls)
+            .withColumn("id4", "integer", minValue=1, maxValue=groups, random=generateRandom, percentNulls=percentNulls)
+            .withColumn("id5", "integer", minValue=1, maxValue=groups, random=generateRandom, percentNulls=percentNulls)
+            .withColumn("id6", "integer", minValue=1, maxValue=(1 + int(rows / groups)), random=generateRandom,
+                        percentNulls=percentNulls)
+            .withColumn("v1", "integer", minValue=1, maxValue=5, random=generateRandom, percentNulls=percentNulls)
+            .withColumn("v2", "integer", minValue=1, maxValue=15, random=generateRandom, percentNulls=percentNulls)
+            .withColumn("v3", "decimal(9,6)", minValue=0.0, maxValue=100.0,
+                        step=1e-6, random=generateRandom, percentNulls=percentNulls)
+        )
+
+        return df_spec
diff --git a/tests/test_standard_dataset_providers.py b/tests/test_standard_dataset_providers.py
index 01bcd9e7..bb952b40 100644
--- a/tests/test_standard_dataset_providers.py
+++ b/tests/test_standard_dataset_providers.py
@@ -6,7 +6,210 @@ class TestStandardDatasetProviders:
+
+    # BASIC GEOMETRIES tests:
+    @pytest.mark.parametrize("providerName, providerOptions", [
+        ("basic/geometries",
+         {"rows": 50, "partitions": 4, "random": False, "geometryType": "point", "maxVertices": 1}),
+        ("basic/geometries",
+         {"rows": 100, "partitions": -1, "random": False, "geometryType": "point", "maxVertices": 2}),
+        ("basic/geometries",
+         {"rows": -1, "partitions": 4, "random": True, "geometryType": "point"}),
+        ("basic/geometries", {}),
+        ("basic/geometries",
+         {"rows": 5000, "partitions": -1, "random": True, "geometryType": "lineString"}),
+        ("basic/geometries",
+         {"rows": -1, "partitions": -1, "random": False, "geometryType": "lineString", "maxVertices": 2}),
+        ("basic/geometries",
+         {"rows": -1, "partitions": 4, "random": True, "geometryType": "lineString", "maxVertices": 1}),
+        ("basic/geometries",
+         {"rows": 5000, "partitions": 4, "geometryType": "lineString", "maxVertices": 2}),
+        ("basic/geometries",
+         {"rows": 5000, "partitions": -1, "random": False, "geometryType": "polygon"}),
+        ("basic/geometries",
+         {"rows": -1, "partitions": -1, "random": True, "geometryType": "polygon", "maxVertices": 3}),
+        ("basic/geometries",
+         {"rows": -1, "partitions": 4, "random": True, "geometryType": "polygon", "maxVertices": 2}),
+        ("basic/geometries",
+         {"rows": 5000, "partitions": 4, "geometryType": "polygon", "maxVertices": 5}),
+    ])
+    def test_basic_geometries_retrieval(self, providerName, providerOptions):
+        ds = dg.Datasets(spark, providerName).get(**providerOptions)
+        assert ds is not None
+
+        df = ds.build()
+        assert df.count() >= 0
+        assert "wkt" in df.columns
+
+        geometryType = providerOptions.get("geometryType", None)
+        row = df.first().asDict()
+        if geometryType == "point" or geometryType is None:
+            assert "POINT" in row["wkt"]
+        if geometryType == "lineString":
+            assert "LINESTRING" in row["wkt"]
+
+        if geometryType == "polygon":
+            assert "POLYGON" in row["wkt"]
+
+        random = providerOptions.get("random", None)
+        if random:
+            print("")
+            leadingRows = df.limit(100).collect()
+            ids = [r.location_id for r in leadingRows]
+            assert ids != sorted(ids)
+
+    # BASIC PROCESS HISTORIAN tests:
+    @pytest.mark.parametrize("providerName, providerOptions", [
+        ("basic/process_historian",
+         {"rows": 50, "partitions": 4, "random": False, "numDevices": 1, "numPlants": 1,
+          "numTags": 1, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00"}),
+        ("basic/process_historian",
+         {"rows": 1000, "partitions": -1, "random": True, "numDevices": 10, "numPlants": 2,
+          "numTags": 2, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00"}),
+        ("basic/process_historian",
+         {"rows": 5000, "partitions": -1, "random": True, "numDevices": 100, "numPlants": 10,
+          "numTags": 5, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00"}),
+        ("basic/process_historian", {}),
+        ("basic/process_historian",
+         {"rows": 5000, "partitions": -1, "random": True, "numDevices": 100, "numPlants": 10,
+          "numTags": 5, "startTimestamp": "2020-04-01 00:00:00", "endTimestamp": "2020-01-01 00:00:00"}),
+        ("basic/process_historian",
+         {"rows": 100, "partitions": -1, "random": True, "numDevices": 100, "numPlants": 10,
+          "numTags": 5, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00"}),
+        ("basic/process_historian",
+         {"rows": 100, "partitions": -1, "random": True, "numDevices": 100, "numPlants": 10,
+          "numTags": 5, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00",
+          "dataQualityRatios": {"pctQuestionable": 0.1, "pctAnnotated": 0.05, "pctSubstituted": 0.12}}),
+        ("basic/process_historian",
+         {"rows": 100, "partitions": -1, "random": True, "numDevices": 100, "numPlants": 10,
+          "numTags": 5, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00",
+          "dataQualityRatios": {"pctQuestionable": 0.1, "pctSubstituted": 0.12}}),
+        ("basic/process_historian",
+         {"rows": 100, "partitions": -1, "random": True, "numDevices": 100, "numPlants": 10,
+          "numTags": 5, "startTimestamp": "2020-01-01 00:00:00", "endTimestamp": "2020-04-01 00:00:00",
+          "dataQualityRatios": {"pctAnnotated": 0.05}}),
+
+    ])
+    def test_basic_process_historian_retrieval(self, providerName, providerOptions):
+        ds = dg.Datasets(spark, providerName).get(**providerOptions)
+        assert ds is not None
+
+        df = ds.build()
+        assert df.count() >= 0
+
+        startTimestamp = providerOptions.get("startTimestamp", "2024-01-01 00:00:00")
+        endTimestamp = providerOptions.get("endTimestamp", "2024-02-01 00:00:00")
+        if startTimestamp > endTimestamp:
+            (startTimestamp, endTimestamp) = (endTimestamp, startTimestamp)
+        assert df.where(f'ts < "{startTimestamp}"').count() == 0
+        assert df.where(f'ts > "{endTimestamp}"').count() == 0
+
+        random = providerOptions.get("random", None)
+        if random:
+            print("")
+            leadingRows = df.limit(100).collect()
+            ids = [r.device_id for r in leadingRows]
+            assert ids != sorted(ids)
+
+    # BASIC TELEMATICS tests:
+    @pytest.mark.parametrize("providerName, providerOptions", [
+        ("basic/telematics",
+         {"rows": 50, "partitions": 4, "random": False, "numDevices": 5000, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "minLat": 40.0, "maxLat": 43.0, "minLon": -93.0, "maxLon": -89.0,
+          "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": 1000, "partitions": 4, "random": True, "numDevices": 1000, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "minLat": 45.0, "maxLat": 35.0, "minLon": -89.0, "maxLon": -93.0,
+          "generateWkt": True}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "numDevices": 1000, "minLat": 98.0, "maxLat": 100.0,
+          "minLon": -181.0, "maxLon": -185.0, "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": 5000, "partitions": -1, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "generateWkt": True}),
+        ("basic/telematics", {}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 50, "startTimestamp": "2020-06-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "minLat": 40.0, "maxLat": 43.0, "minLon": -93.0, "maxLon": -89.0,
+          "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 100, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "maxLat": 45.0, "minLon": -93.0, "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 100, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "minLat": 45.0, "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 100, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "minLat": -120.0, "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 100, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "maxLat": -120.0, "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 100, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "minLon": 190.0, "generateWkt": False}),
+        ("basic/telematics",
+         {"rows": -1, "partitions": -1, "random": False, "numDevices": 100, "startTimestamp": "2020-01-01 00:00:00",
+          "endTimestamp": "2020-04-01 00:00:00", "maxLon": 190.0, "generateWkt": False}),
+    ])
+    def test_basic_telematics_retrieval(self, providerName, providerOptions):
+        ds = dg.Datasets(spark, providerName).get(**providerOptions)
+        assert ds is not None
+
+        df = ds.build()
+        assert df.count() >= 0
+
+        row = df.first().asDict()
+        assert "lat" in df.columns
+        assert "lon" in df.columns
+        assert "heading" in df.columns
+        assert df.where('heading < 0 or heading > 359').count() == 0
+
+        minLat = providerOptions.get("minLat", -90.0)
+        maxLat = providerOptions.get("maxLat", 90.0)
+        minLat = max(minLat, -90.0)
+        maxLat = min(maxLat, 90.0)
+        if minLat > 90.0:
+            minLat = 89.0
+        if maxLat < -90.0:
+            maxLat = -89.0
+        if minLat > maxLat:
+            (minLat, maxLat) = (maxLat, minLat)
+        assert df.where(f'lat < {minLat}').count() == 0
+        assert df.where(f'lat > {maxLat}').count() == 0
+
+        minLon = providerOptions.get("minLon", -180.0)
+        maxLon = providerOptions.get("maxLon", 180.0)
+        minLon = max(minLon, -180.0)
+        maxLon = min(maxLon, 180.0)
+        if minLon > 180.0:
+            minLon = 179.0
+        if maxLon < -180.0:
+            maxLon = -179.0
+        if minLon > maxLon:
+            (minLon, maxLon) = (maxLon, minLon)
+        assert df.where(f'lon < {minLon}').count() == 0
+        assert df.where(f'lon > {maxLon}').count() == 0
+
+        startTimestamp = providerOptions.get("startTimestamp", "2024-01-01 00:00:00")
+        endTimestamp = providerOptions.get("endTimestamp", "2024-02-01 00:00:00")
+        if startTimestamp > endTimestamp:
+            (startTimestamp, endTimestamp) = (endTimestamp, startTimestamp)
+        assert df.where(f'ts < "{startTimestamp}"').count() == 0
+        assert df.where(f'ts > "{endTimestamp}"').count() == 0
+
+        generateWkt = providerOptions.get("generateWkt", False)
+        if generateWkt:
+            assert "wkt" in row.keys()
+
+        random = providerOptions.get("random", None)
+        if random:
+            print("")
+            leadingRows = df.limit(100).collect()
+            ids = [r.device_id for r in leadingRows]
+            assert ids != sorted(ids)
+
+
+    # BASIC USER tests:
     @pytest.mark.parametrize("providerName, providerOptions", [
         ("basic/user", {"rows": 50, "partitions": 4, "random": False, "dummyValues": 0}),
         ("basic/user", {"rows": -1, "partitions": 4, "random": False, "dummyValues": 0}),
@@ -24,12 +227,46 @@ def test_basic_user_table_retrieval(self, providerName, providerOptions):
 
         assert df.count() >= 0
 
-        if 'random' in providerOptions and providerOptions['random']:
+        random = providerOptions.get("random", None)
+        if random:
             print("")
             leadingRows = df.limit(100).collect()
             customer_ids = [r.customer_id for r in leadingRows]
             assert customer_ids != sorted(customer_ids)
+
+    # BENCHMARK GROUPBY tests:
+    @pytest.mark.parametrize("providerName, providerOptions", [
+        ("benchmark/groupby", {"rows": 50, "partitions": 4, "random": False, "groups": 10, "percentNulls": 0.1}),
+        ("benchmark/groupby", {"rows": -1, "partitions": 4, "random": True, "groups": 100}),
+        ("benchmark/groupby", {}),
+        ("benchmark/groupby", {"rows": 1000, "partitions": -1, "random": False}),
+        ("benchmark/groupby", {"rows": -1, "groups": 1000, "percentNulls": 0.2}),
+        ("benchmark/groupby", {"rows": 1000, "partitions": -1, "random": True, "groups": 5000, "percentNulls": 0.5}),
+        ("benchmark/groupby", {"rows": -1, "partitions": -1, "random": True, "groups": 0}),
+        ("benchmark/groupby", {"rows": 10, "partitions": -1, "random": True, "groups": 100, "percentNulls": 0.1}),
+        ("benchmark/groupby", {"rows": -1, "partitions": -1, "random": False, "groups": -50}),
+        ("benchmark/groupby", {"rows": -1, "partitions": -1, "random": False, "groups": -50, "percentNulls": -12.1}),
+        ("benchmark/groupby", {"rows": -1, "partitions": -1, "random": True, "groups": -50, "percentNulls": 1.1}),
+    ])
+    def test_benchmark_groupby_retrieval(self, providerName, providerOptions):
+        ds = dg.Datasets(spark, providerName).get(**providerOptions)
+        assert ds is not None
+
+        df = ds.build()
+        assert df.count() >= 0
+
+        percentNulls = providerOptions.get("percentNulls", 0.0)
+        if percentNulls >= 1.0:
+            return
+
+        random = providerOptions.get("random", None)
+        if random:
+            print("")
+            leadingRows = df.limit(100).collect()
+            vals = [r.v3 for r in leadingRows if r.v3 is not None]
+            assert vals != sorted(vals)
+
+
+    # MULTI-TABLE TELEPHONY tests:
     @pytest.mark.parametrize("providerName, providerOptions", [
         ("multi_table/telephony", {"rows": 50, "partitions": 4, "random": False}),
         ("multi_table/telephony", {"rows": -1, "partitions": 4, "random": False}),