diff --git a/posthog/temporal/data_imports/workflow_activities/create_job_model.py b/posthog/temporal/data_imports/workflow_activities/create_job_model.py index b404c610c1cad..78e85dfdd182f 100644 --- a/posthog/temporal/data_imports/workflow_activities/create_job_model.py +++ b/posthog/temporal/data_imports/workflow_activities/create_job_model.py @@ -8,6 +8,7 @@ # TODO: remove dependency from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2 +from posthog.warehouse.data_load.service import delete_external_data_schedule from posthog.warehouse.models import ExternalDataJob, ExternalDataSource from posthog.warehouse.models.external_data_schema import ( ExternalDataSchema, @@ -38,6 +39,13 @@ def create_external_data_job_model_activity( close_old_connections() try: + source_exists = ExternalDataSource.objects.filter(id=inputs.source_id).exclude(deleted=True).exists() + schema_exists = ExternalDataSchema.objects.filter(id=inputs.schema_id).exclude(deleted=True).exists() + + if not source_exists or not schema_exists: + delete_external_data_schedule(str(inputs.schema_id)) + raise Exception("Source or schema no longer exists - deleted temporal schedule") + job = ExternalDataJob.objects.create( team_id=inputs.team_id, pipeline_id=inputs.source_id, diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index 7a953f8d679a2..5c20304085832 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -1,7 +1,7 @@ from concurrent.futures import ThreadPoolExecutor import functools import uuid -from typing import Any, Optional +from typing import Any, Optional, cast from unittest import mock import aioboto3 @@ -1145,3 +1145,32 @@ async def test_decimal_down_scales(team, postgres_config, postgres_connection): await postgres_connection.commit() await _execute_run(str(uuid.uuid4()), inputs, []) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_missing_source(team, stripe_balance_transaction): + inputs = ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=uuid.uuid4(), + external_data_schema_id=uuid.uuid4(), + ) + + with ( + pytest.raises(Exception) as e, + mock.patch( + "posthog.temporal.data_imports.workflow_activities.create_job_model.delete_external_data_schedule" + ) as mock_delete_external_data_schedule, + ): + await _execute_run(str(uuid.uuid4()), inputs, []) + + exc = cast(Any, e) + + assert exc.value is not None + assert exc.value.cause is not None + assert exc.value.cause.cause is not None + assert exc.value.cause.cause.message is not None + + assert exc.value.cause.cause.message == "Source or schema no longer exists - deleted temporal schedule" + + mock_delete_external_data_schedule.assert_called()