diff --git a/posthog/temporal/common/posthog_client.py b/posthog/temporal/common/posthog_client.py deleted file mode 100644 index 2680c3c4b9021..0000000000000 --- a/posthog/temporal/common/posthog_client.py +++ /dev/null @@ -1,54 +0,0 @@ -import asyncio -from typing import Any, Optional -from posthoganalytics.client import Client -from temporalio.worker import ( - ActivityInboundInterceptor, - ExecuteActivityInput, - ExecuteWorkflowInput, - Interceptor, - WorkflowInboundInterceptor, - WorkflowInterceptorClassInput, -) - - -class _PostHogClientActivityInboundInterceptor(ActivityInboundInterceptor): - async def execute_activity(self, input: ExecuteActivityInput) -> Any: - ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True) - - try: - activity_result = await super().execute_activity(input) - except: - raise - finally: - await asyncio.to_thread(ph_client.flush) - - return activity_result - - -class _PostHogClientWorkflowInterceptor(WorkflowInboundInterceptor): - async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: - ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True) - - try: - workflow_result = await super().execute_workflow(input) - except: - raise - finally: - await asyncio.to_thread(ph_client.flush) - - return workflow_result - - -class PostHogClientInterceptor(Interceptor): - """PostHog Interceptor class which will report workflow & activity exceptions to PostHog""" - - def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor: - """Implementation of - :py:meth:`temporalio.worker.Interceptor.intercept_activity`. - """ - return _PostHogClientActivityInboundInterceptor(super().intercept_activity(next)) - - def workflow_interceptor_class( - self, input: WorkflowInterceptorClassInput - ) -> Optional[type[WorkflowInboundInterceptor]]: - return _PostHogClientWorkflowInterceptor diff --git a/posthog/temporal/common/worker.py b/posthog/temporal/common/worker.py index 398fdfa09dc2e..f5db3d6b0417d 100644 --- a/posthog/temporal/common/worker.py +++ b/posthog/temporal/common/worker.py @@ -8,7 +8,6 @@ from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2 from posthog.temporal.common.client import connect -from posthog.temporal.common.posthog_client import PostHogClientInterceptor from posthog.temporal.common.sentry import SentryInterceptor @@ -36,7 +35,6 @@ async def start_worker( client_key, runtime=runtime, ) - if task_queue == DATA_WAREHOUSE_TASK_QUEUE_V2: worker = Worker( client, @@ -45,7 +43,7 @@ async def start_worker( activities=activities, workflow_runner=UnsandboxedWorkflowRunner(), graceful_shutdown_timeout=timedelta(minutes=5), - interceptors=[SentryInterceptor(), PostHogClientInterceptor()], + interceptors=[SentryInterceptor()], activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), # Only run one workflow at a time max_concurrent_activities=1, @@ -61,7 +59,7 @@ async def start_worker( activities=activities, workflow_runner=UnsandboxedWorkflowRunner(), graceful_shutdown_timeout=timedelta(minutes=5), - interceptors=[SentryInterceptor(), PostHogClientInterceptor()], + interceptors=[SentryInterceptor()], activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), max_concurrent_activities=max_concurrent_activities or 50, max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,