Skip to content

Commit

Permalink
fix(data-warehouse): Handle missing sources or schemas (#27243)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Gilbert09 authored Jan 3, 2025
1 parent 3ac3697 commit 7741829
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# TODO: remove dependency

from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2
from posthog.warehouse.data_load.service import delete_external_data_schedule
from posthog.warehouse.models import ExternalDataJob, ExternalDataSource
from posthog.warehouse.models.external_data_schema import (
ExternalDataSchema,
Expand Down Expand Up @@ -38,6 +39,13 @@ def create_external_data_job_model_activity(
close_old_connections()

try:
source_exists = ExternalDataSource.objects.filter(id=inputs.source_id).exclude(deleted=True).exists()
schema_exists = ExternalDataSchema.objects.filter(id=inputs.schema_id).exclude(deleted=True).exists()

if not source_exists or not schema_exists:
delete_external_data_schedule(str(inputs.schema_id))
raise Exception("Source or schema no longer exists - deleted temporal schedule")

job = ExternalDataJob.objects.create(
team_id=inputs.team_id,
pipeline_id=inputs.source_id,
Expand Down
31 changes: 30 additions & 1 deletion posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from concurrent.futures import ThreadPoolExecutor
import functools
import uuid
from typing import Any, Optional
from typing import Any, Optional, cast
from unittest import mock

import aioboto3
Expand Down Expand Up @@ -1145,3 +1145,32 @@ async def test_decimal_down_scales(team, postgres_config, postgres_connection):
await postgres_connection.commit()

await _execute_run(str(uuid.uuid4()), inputs, [])


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_missing_source(team, stripe_balance_transaction):
    """Check that a run for an unknown source/schema fails and removes its schedule.

    The workflow inputs use freshly generated random UUIDs for both the source
    and the schema (presumably matching no stored rows - the fixtures are not
    visible here). The run is expected to raise, with the message
    "Source or schema no longer exists - deleted temporal schedule" found two
    ``cause`` levels below the raised exception, and the patched
    ``delete_external_data_schedule`` must have been called.
    """
    workflow_inputs = ExternalDataWorkflowInputs(
        team_id=team.id,
        external_data_source_id=uuid.uuid4(),
        external_data_schema_id=uuid.uuid4(),
    )

    # Patch the name where the activity module looks it up, so no real
    # schedule deletion is attempted during the test.
    delete_schedule_target = (
        "posthog.temporal.data_imports.workflow_activities.create_job_model.delete_external_data_schedule"
    )

    with (
        pytest.raises(Exception) as raised,
        mock.patch(delete_schedule_target) as mock_delete_external_data_schedule,
    ):
        await _execute_run(str(uuid.uuid4()), workflow_inputs, [])

    # ExceptionInfo is cast to Any so the type checker allows the attribute
    # walk through the nested ``cause`` chain below.
    outer_error = cast(Any, raised).value
    assert outer_error is not None

    middle_error = outer_error.cause
    assert middle_error is not None

    inner_error = middle_error.cause
    assert inner_error is not None
    assert inner_error.message is not None

    assert inner_error.message == "Source or schema no longer exists - deleted temporal schedule"

    mock_delete_external_data_schedule.assert_called()

0 comments on commit 7741829

Please sign in to comment.