Skip to content

Commit

Permalink
Merge pull request #17 from elementary-data/incident-management-data-…
Browse files Browse the repository at this point in the history
…in-demo

Incident management data in demo
  • Loading branch information
ofek1weiss authored Oct 1, 2024
2 parents 070999a + 861652d commit 00e63bc
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from datetime import date, datetime, time, timedelta
import random
from datetime import date, datetime, time, timedelta
from typing import Any, Optional

import numpy
from elementary.clients.dbt.dbt_runner import DbtRunner
from pydantic import BaseModel

from data_creation.data_injection.data_generator.specs.tests.test_spec import TestSpec
from data_creation.data_injection.injectors.models.models_injector import ModelsInjector
from data_creation.data_injection.injectors.tests.test_run_results_injector import (
Expand All @@ -16,8 +18,6 @@
TestTypes,
)

from elementary.clients.dbt.dbt_runner import DbtRunner


class PeriodSchema(BaseModel):
count: int
Expand All @@ -27,6 +27,8 @@ class PeriodSchema(BaseModel):
class AnomalyTestSpec(TestSpec):
no_bucket: bool
metric_values: list[float]
historic_success_metric_values: list[float] = []
historic_failure_metric_values: list[float] = []
timestamp_column: Optional[str] = None
sensitivity: Optional[int] = None
min_values_bound: int = 0
Expand Down Expand Up @@ -107,20 +109,20 @@ def get_metric_timestamps(self, metric_values: list):
f"I don't know how to handle bucket size {self.detection_period.period}"
)

def get_metrics(self):
metric_timestamps = self.get_metric_timestamps(self.metric_values)
def get_metrics(self, metric_values: list[float]):
metric_timestamps = self.get_metric_timestamps(metric_values)
metrics = []
sensitivity = self.sensitivity or 3

for i, (value, (start_time, end_time)) in enumerate(
zip(self.metric_values, metric_timestamps)
zip(metric_values, metric_timestamps)
):
if self.day_of_week_seasonality:
relevant_metrics = list(reversed(self.metric_values[i:0:-7]))
relevant_metrics = list(reversed(metric_values[i:0:-7]))
if i % 7 == 0:
relevant_metrics.insert(0, self.metric_values[0])
relevant_metrics.insert(0, metric_values[0])
else:
relevant_metrics = self.metric_values[: (i + 1)]
relevant_metrics = metric_values[: (i + 1)]
average = numpy.average(relevant_metrics)
stddev = numpy.std(relevant_metrics)

Expand All @@ -130,6 +132,7 @@ def get_metrics(self):
max_value=average + sensitivity * stddev,
start_time=start_time.isoformat() if start_time else None,
end_time=end_time.isoformat(),
metric_name=self.test_sub_type.value,
)
if metric.is_anomalous:
if self.day_of_week_seasonality:
Expand Down Expand Up @@ -163,7 +166,7 @@ def generate(self, dbt_runner: DbtRunner):
injector.inject_test(test)

execution_time = self.max_execution_time
metrics = self.get_metrics()
metrics = self.get_metrics(self.metric_values)
test_result = AnomalyTestResult(
test_timestamp=datetime.utcnow(),
test_status="fail" if metrics[-1].is_anomalous else "pass",
Expand All @@ -180,11 +183,16 @@ def generate(self, dbt_runner: DbtRunner):
status = random.choice(["fail"] + ["pass"] * 3)
else:
status = "pass"
metric_values = (
self.historic_success_metric_values
if status == "pass"
else self.historic_failure_metric_values
)
execution_time = execution_time * ((100 - random.uniform(1, 3)) / 100)
prev_test_result = AnomalyTestResult(
test_timestamp=cur_timestamp,
test_status=status,
test_metrics=[],
test_metrics=self.get_metrics(metric_values),
result_description="",
execution_time=execution_time,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import random
from datetime import datetime, timedelta
from enum import Enum
import random
from typing import Any, Optional

from elementary.clients.dbt.dbt_runner import DbtRunner
from pydantic import validator

from data_creation.data_injection.data_generator.specs.tests.test_spec import TestSpec
from data_creation.data_injection.injectors.models.models_injector import ModelsInjector
from data_creation.data_injection.injectors.tests.test_run_results_injector import (
Expand All @@ -15,8 +17,6 @@
TestTypes,
)

from elementary.clients.dbt.dbt_runner import DbtRunner


class TestStatuses(Enum):
PASS = "pass"
Expand Down Expand Up @@ -78,11 +78,12 @@ def generate(self, dbt_runner: DbtRunner):
status = random.choice(
[TestStatuses.FAIL.value] + [TestStatuses.PASS.value] * 3
)
result_rows = self.result_rows if status == TestStatuses.FAIL.value else []
execution_time = execution_time * ((100 - random.uniform(1, 3)) / 100)
prev_test_result = DbtTestResult(
test_timestamp=cur_timestamp,
test_status=status,
test_result_rows=[],
test_result_rows=result_rows,
result_description="",
execution_time=execution_time,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from datetime import datetime, timedelta
import random
from datetime import datetime, timedelta
from typing import Any

import numpy
from elementary.clients.dbt.dbt_runner import DbtRunner

from data_creation.data_injection.data_generator.specs.tests.anomaly_test_spec import (
AnomalyTestSpec,
)
Expand All @@ -17,8 +19,6 @@
TestSubTypes,
)

from elementary.clients.dbt.dbt_runner import DbtRunner


class DimensionAnomalyTestSpec(AnomalyTestSpec):
test_sub_type: TestSubTypes = TestSubTypes.DIMENSION
Expand Down Expand Up @@ -65,6 +65,7 @@ def get_anmalous_metrics(self):
average - self.sensitivity * stddev, self.min_values_bound
),
max_value=average + self.sensitivity * stddev,
metric_name="row_count",
average=average,
start_time=start_time.isoformat() if start_time else None,
end_time=end_time.isoformat(),
Expand Down Expand Up @@ -128,7 +129,7 @@ def generate(self, dbt_runner: DbtRunner):
prev_test_result = DimensionAnomalyTestResult(
test_timestamp=cur_timestamp,
test_status=status,
test_metrics=[],
test_metrics=[] if status == "pass" else anomalous_metrics,
result_description="",
execution_time=execution_time,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from datetime import datetime, timedelta
import random
from datetime import datetime, timedelta
from typing import Any

from elementary.clients.dbt.dbt_runner import DbtRunner

from data_creation.data_injection.data_generator.specs.base_spec import BaseSpec
from data_creation.data_injection.injectors.models.models_injector import ModelsInjector
from data_creation.data_injection.injectors.tests.test_run_results_injector import (
Expand All @@ -14,8 +16,6 @@
TestTypes,
)

from elementary.clients.dbt.dbt_runner import DbtRunner


class SchemaChangeTestSpec(BaseSpec):
model_name: str
Expand Down Expand Up @@ -50,8 +50,11 @@ def generate(self, dbt_runner: DbtRunner):
)
injector.inject_test(test)

from_type = to_type = None
for result in self.results:
injector.inject_failed_schema_change_test_result(test, result)
from_type = result.from_type
to_type = result.to_type

cur_timestamp = datetime.utcnow()
for i in range(10):
Expand All @@ -63,6 +66,8 @@ def generate(self, dbt_runner: DbtRunner):
test_timestamp=cur_timestamp,
column_name=self.results[0].column_name if self.results else "",
test_sub_type=TestSubTypes.TYPE_CHANGED,
from_type=from_type,
to_type=to_type,
)
injector.inject_failed_schema_change_test_result(test, prev_test_result)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'depends_on_macros': '[]',
'depends_on_nodes': '[]',
'generated_at': elementary.datetime_now_utc_as_string(),
'original_path': ''
} %}
{% do elementary.insert_rows(relation, [test_data], true) %}
{% do rows_to_insert['dbt_tests'].append(test_data) %}
Expand Down
34 changes: 31 additions & 3 deletions data_creation/data_injection/inject_jaffle_shop_tests.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from datetime import datetime
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
from uuid import uuid4

from elementary.clients.dbt.dbt_runner import DbtRunner

from data_creation.data_injection.data_generator.specs.tests.anomaly_test_spec import (
Expand All @@ -25,7 +26,6 @@
from data_creation.data_injection.data_generator.specs.tests.schema_change_test_spec import (
SchemaChangeTestSpec,
)

from data_creation.data_injection.data_generator.test_data_generator import (
TestDataGenerator,
)
Expand All @@ -37,9 +37,9 @@
get_values_around_middle,
get_values_around_middle_anomalous,
get_values_around_middle_anomalous_weekly_seasonality,
get_values_around_middle_weekly_seasonality,
)


REPO_DIR = Path(os.path.dirname(__file__)).parent.parent.absolute()
INJECTION_DBT_PROJECT_DIR = os.path.join(
REPO_DIR, "data_creation/data_injection/dbt_project"
Expand Down Expand Up @@ -266,6 +266,10 @@ def inject_jaffle_shop_tests(
test_name="column_anomalies",
no_bucket=False,
metric_values=get_values_around_middle_anomalous(20, 5, is_spike=True),
historic_success_metric_values=get_values_around_middle(20, 5),
historic_failure_metric_values=get_values_around_middle_anomalous(
20, 5, is_spike=True
),
timestamp_column=None,
test_column_name="email",
test_sub_type="missing_count",
Expand All @@ -278,6 +282,12 @@ def inject_jaffle_shop_tests(
test_name="column_anomalies",
no_bucket=False,
metric_values=get_values_around_middle(40, 3, num_entries=72),
historic_success_metric_values=get_values_around_middle(
40, 3, num_entries=72
),
historic_failure_metric_values=get_values_around_middle_anomalous(
40, 3, num_entries=72, is_spike=True
),
timestamp_column=None,
test_column_name="order_category",
test_sub_type="null_count",
Expand All @@ -290,6 +300,12 @@ def inject_jaffle_shop_tests(
test_name="column_anomalies",
no_bucket=False,
metric_values=get_values_around_middle(40, 3, num_entries=72),
historic_success_metric_values=get_values_around_middle(
40, 3, num_entries=72
),
historic_failure_metric_values=get_values_around_middle_anomalous(
40, 3, num_entries=72, is_spike=True
),
timestamp_column=None,
test_column_name="campaign_name",
test_sub_type="null_count",
Expand All @@ -302,6 +318,12 @@ def inject_jaffle_shop_tests(
test_name="column_anomalies",
no_bucket=False,
metric_values=get_values_around_middle(40, 3, num_entries=72),
historic_success_metric_values=get_values_around_middle(
40, 3, num_entries=72
),
historic_failure_metric_values=get_values_around_middle_anomalous(
40, 3, num_entries=72, is_spike=True
),
timestamp_column=None,
test_column_name="impressions",
test_sub_type="zero_count",
Expand All @@ -316,6 +338,12 @@ def inject_jaffle_shop_tests(
metric_values=get_values_around_middle_anomalous_weekly_seasonality(
700, 30, 1100, is_spike=True, num_entries=72
),
historic_success_metric_values=get_values_around_middle_weekly_seasonality(
700, 30, 1100, num_entries=72
),
historic_failure_metric_values=get_values_around_middle_anomalous_weekly_seasonality(
700, 30, 1100, is_spike=True, num_entries=72
),
timestamp_column=None,
test_column_name="revenue",
test_sub_type="zero_count",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from datetime import datetime
import random
from datetime import datetime
from typing import Optional

from elementary.clients.dbt.dbt_runner import DbtRunner
from pydantic import BaseModel, root_validator

from data_creation.data_injection.injectors.tests.tests_injector import (
TestSchema,
TestsInjector,
TestSubTypes,
TestTypes,
TestsInjector,
)


Expand All @@ -28,6 +28,7 @@ class AnomalyTestMetric(BaseModel):
value: float
min_value: float
max_value: float
metric_name: str
start_time: Optional[str] = None
end_time: str
is_anomalous: Optional[bool] = None
Expand Down
11 changes: 11 additions & 0 deletions data_creation/data_injection/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ def get_values_around_middle_anomalous(middle, space, is_spike=False, num_entrie
return values


def get_values_around_middle_weekly_seasonality(
    middle, space, weekly_middle, num_entries=14 * 7 + 3
):
    """Return random values that follow a repeating 7-entry pattern.

    Entries whose index ``i`` satisfies ``i % 7 <= 1`` (the first two
    positions of every 7-entry cycle) are drawn around ``weekly_middle``;
    every other entry is drawn around ``middle``.

    Args:
        middle: Centre of the range used for the regular entries.
        space: Maximum distance from the centre; both bounds are inclusive.
        weekly_middle: Centre of the range used for the first two entries
            of each 7-entry cycle.
        num_entries: Number of values to generate. Defaults to
            ``14 * 7 + 3`` (101).

    Returns:
        A list of ``num_entries`` integers, each within ``space`` of the
        centre that applies to its position.
    """
    # Exactly one randint call per entry, in index order, so the sequence
    # produced for a given random seed is stable.
    centers = (
        weekly_middle if i % 7 <= 1 else middle for i in range(num_entries)
    )
    return [random.randint(center - space, center + space) for center in centers]


def get_values_around_middle_anomalous_weekly_seasonality(
middle, space, weekly_middle, is_spike=False, num_entries=14 * 7 + 3
):
Expand Down
2 changes: 1 addition & 1 deletion data_creation/initial_demo.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import argparse
import logging
from typing import Optional

from data_creation.data_injection.inject_jaffle_shop_exposures import (
inject_jaffle_shop_exposures,
)

from data_creation.data_injection.inject_jaffle_shop_tests import (
inject_jaffle_shop_tests,
)
Expand Down

0 comments on commit 00e63bc

Please sign in to comment.