+
'/cohorts',
experiment: (id: string | number): string => `/experiments/${id}`,
experiments: (): string => '/experiments',
+ experimentsSavedMetrics: (): string => '/experiments/saved-metrics',
+ experimentsSavedMetric: (id: string | number): string => `/experiments/saved-metrics/${id}`,
featureFlags: (tab?: string): string => `/feature_flags${tab ? `?tab=${tab}` : ''}`,
featureFlag: (id: string | number): string => `/feature_flags/${id}`,
featureManagement: (id?: string | number): string => `/features${id ? `/${id}` : ''}`,
@@ -213,6 +215,7 @@ export const urls = {
// Self-hosted only
instanceStatus: (): string => '/instance/status',
instanceStaffUsers: (): string => '/instance/staff_users',
+ instanceKafkaInspector: (): string => '/instance/kafka_inspector',
instanceSettings: (): string => '/instance/settings',
instanceMetrics: (): string => `/instance/metrics`,
asyncMigrations: (): string => '/instance/async_migrations',
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index 4b218bd90f0f9..be7dea47a8302 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -702,6 +702,7 @@ export enum ExperimentsTabs {
Yours = 'yours',
Archived = 'archived',
Holdouts = 'holdouts',
+ SavedMetrics = 'saved-metrics',
}
export enum ActivityTab {
@@ -3310,6 +3311,8 @@ export interface Experiment {
filters: TrendsFilterType | FunnelsFilterType
metrics: (ExperimentTrendsQuery | ExperimentFunnelsQuery)[]
metrics_secondary: (ExperimentTrendsQuery | ExperimentFunnelsQuery)[]
+ saved_metrics_ids: { id: number; metadata: { type: 'primary' | 'secondary' } }[]
+ saved_metrics: any[]
parameters: {
minimum_detectable_effect?: number
recommended_running_time?: number
@@ -4669,6 +4672,7 @@ export interface HogFunctionMappingTemplateType extends HogFunctionMappingType {
export type HogFunctionTypeType =
| 'destination'
+ | 'internal_destination'
| 'site_destination'
| 'site_app'
| 'transformation'
@@ -4701,7 +4705,7 @@ export type HogFunctionType = {
}
export type HogFunctionTemplateStatus = 'alpha' | 'beta' | 'stable' | 'free' | 'deprecated' | 'client-side'
-export type HogFunctionSubTemplateIdType = 'early_access_feature_enrollment' | 'survey_response'
+export type HogFunctionSubTemplateIdType = 'early-access-feature-enrollment' | 'survey-response' | 'activity-log'
export type HogFunctionConfigurationType = Omit<
HogFunctionType,
diff --git a/plugin-server/package.json b/plugin-server/package.json
index 9014d19be548b..5f2a9dfdac165 100644
--- a/plugin-server/package.json
+++ b/plugin-server/package.json
@@ -93,7 +93,8 @@
"tail": "^2.2.6",
"uuid": "^9.0.1",
"v8-profiler-next": "^1.9.0",
- "vm2": "3.9.18"
+ "vm2": "3.9.18",
+ "zod": "^3.24.1"
},
"devDependencies": {
"0x": "^5.5.0",
diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml
index f187191553102..e23910979edfc 100644
--- a/plugin-server/pnpm-lock.yaml
+++ b/plugin-server/pnpm-lock.yaml
@@ -166,6 +166,9 @@ dependencies:
vm2:
specifier: 3.9.18
version: 3.9.18
+ zod:
+ specifier: ^3.24.1
+ version: 3.24.1
devDependencies:
0x:
@@ -10915,6 +10918,10 @@ packages:
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
engines: {node: '>=10'}
+ /zod@3.24.1:
+ resolution: {integrity: sha512-muH7gBL9sI1nciMZV67X5fTKKBLtwpZ5VBp1vsOQzj1MhrBZ4wlVCm3gedKZWLp0Oyel8sIGfeiz54Su+OVT+A==}
+ dev: false
+
file:../rust/cyclotron-node:
resolution: {directory: ../rust/cyclotron-node, type: directory}
name: '@posthog/cyclotron'
diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts
index 6a9d30af15ff4..9cefda83bb90d 100644
--- a/plugin-server/src/capabilities.ts
+++ b/plugin-server/src/capabilities.ts
@@ -24,6 +24,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
+ cdpInternalEvents: true,
cdpFunctionCallbacks: true,
cdpCyclotronWorker: true,
syncInlinePlugins: true,
@@ -98,6 +99,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
...sharedCapabilities,
}
+ case PluginServerMode.cdp_internal_events:
+ return {
+ cdpInternalEvents: true,
+ ...sharedCapabilities,
+ }
case PluginServerMode.cdp_function_callbacks:
return {
cdpFunctionCallbacks: true,
diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts
index f738b559a0523..a219cd6242864 100644
--- a/plugin-server/src/cdp/cdp-consumers.ts
+++ b/plugin-server/src/cdp/cdp-consumers.ts
@@ -7,6 +7,7 @@ import { buildIntegerMatcher } from '../config/config'
import {
KAFKA_APP_METRICS_2,
KAFKA_CDP_FUNCTION_CALLBACKS,
+ KAFKA_CDP_INTERNAL_EVENTS,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_LOG_ENTRIES,
@@ -37,6 +38,7 @@ import { HogFunctionManager } from './hog-function-manager'
import { HogMasker } from './hog-masker'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { CdpRedis, createCdpRedisPool } from './redis'
+import { CdpInternalEventSchema } from './schema'
import {
HogFunctionInvocation,
HogFunctionInvocationGlobals,
@@ -46,9 +48,11 @@ import {
HogFunctionLogEntrySerialized,
HogFunctionMessageToProduce,
HogFunctionType,
+ HogFunctionTypeType,
HogHooksFetchResponse,
} from './types'
import {
+ convertInternalEventToHogFunctionInvocationGlobals,
convertToCaptureEvent,
convertToHogFunctionInvocationGlobals,
createInvocation,
@@ -81,6 +85,12 @@ const counterFunctionInvocation = new Counter({
labelNames: ['outcome'], // One of 'failed', 'succeeded', 'overflowed', 'disabled', 'filtered'
})
+const counterParseError = new Counter({
+ name: 'cdp_function_parse_error',
+ help: 'A function invocation was parsed with an error',
+ labelNames: ['error'],
+})
+
const gaugeBatchUtilization = new Gauge({
name: 'cdp_cyclotron_batch_utilization',
help: 'Indicates how big batches are we are processing compared to the max batch size. Useful as a scaling metric',
@@ -110,6 +120,7 @@ abstract class CdpConsumerBase {
messagesToProduce: HogFunctionMessageToProduce[] = []
redis: CdpRedis
+ protected hogTypes: HogFunctionTypeType[] = []
protected kafkaProducer?: KafkaProducerWrapper
protected abstract name: string
@@ -363,7 +374,7 @@ abstract class CdpConsumerBase {
public async start(): Promise {
// NOTE: This is only for starting shared services
await Promise.all([
- this.hogFunctionManager.start(),
+ this.hogFunctionManager.start(this.hogTypes),
createKafkaProducerWrapper(this.hub).then((producer) => {
this.kafkaProducer = producer
this.kafkaProducer.producer.connect()
@@ -397,6 +408,10 @@ abstract class CdpConsumerBase {
*/
export class CdpProcessedEventsConsumer extends CdpConsumerBase {
protected name = 'CdpProcessedEventsConsumer'
+ protected topic = KAFKA_EVENTS_JSON
+ protected groupId = 'cdp-processed-events-consumer'
+ protected hogTypes: HogFunctionTypeType[] = ['destination']
+
private cyclotronMatcher: ValueMatcher
private cyclotronManager?: CyclotronManager
@@ -559,8 +574,8 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
}
// This consumer always parses from kafka
- public async _handleKafkaBatch(messages: Message[]): Promise {
- const invocationGlobals = await this.runWithHeartbeat(() =>
+ public async _parseKafkaBatch(messages: Message[]): Promise {
+ return await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: async () => {
@@ -596,16 +611,17 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
},
})
)
-
- await this.processBatch(invocationGlobals)
}
public async start(): Promise {
await super.start()
await this.startKafkaConsumer({
- topic: KAFKA_EVENTS_JSON,
- groupId: 'cdp-processed-events-consumer',
- handleBatch: (messages) => this._handleKafkaBatch(messages),
+ topic: this.topic,
+ groupId: this.groupId,
+ handleBatch: async (messages) => {
+ const invocationGlobals = await this._parseKafkaBatch(messages)
+ await this.processBatch(invocationGlobals)
+ },
})
const shardDepthLimit = this.hub.CYCLOTRON_SHARD_DEPTH_LIMIT ?? 1000000
@@ -618,11 +634,66 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
}
}
+/**
+ * This consumer handles incoming events from the main clickhouse topic
+ * Currently it produces to both kafka and Cyclotron based on the team
+ */
+export class CdpInternalEventsConsumer extends CdpProcessedEventsConsumer {
+ protected name = 'CdpInternalEventsConsumer'
+ protected topic = KAFKA_CDP_INTERNAL_EVENTS
+ protected groupId = 'cdp-internal-events-consumer'
+ protected hogTypes: HogFunctionTypeType[] = ['internal_destination']
+
+ // This consumer always parses from kafka
+ public async _parseKafkaBatch(messages: Message[]): Promise {
+ return await this.runWithHeartbeat(() =>
+ runInstrumentedFunction({
+ statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
+ func: async () => {
+ const events: HogFunctionInvocationGlobals[] = []
+ await Promise.all(
+ messages.map(async (message) => {
+ try {
+ const kafkaEvent = JSON.parse(message.value!.toString()) as unknown
+ // This is the input stream from elsewhere so we want to do some proper validation
+ const event = CdpInternalEventSchema.parse(kafkaEvent)
+
+ if (!this.hogFunctionManager.teamHasHogDestinations(event.team_id)) {
+ // No need to continue if the team doesn't have any functions
+ return
+ }
+
+ const team = await this.hub.teamManager.fetchTeam(event.team_id)
+ if (!team) {
+ return
+ }
+ events.push(
+ convertInternalEventToHogFunctionInvocationGlobals(
+ event,
+ team,
+ this.hub.SITE_URL ?? 'http://localhost:8000'
+ )
+ )
+ } catch (e) {
+ status.error('Error parsing message', e)
+ counterParseError.labels({ error: e.message }).inc()
+ }
+ })
+ )
+
+ return events
+ },
+ })
+ )
+ }
+}
+
/**
* This consumer only deals with kafka messages and will eventually be replaced by the Cyclotron worker
*/
export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
protected name = 'CdpFunctionCallbackConsumer'
+ protected hogTypes: HogFunctionTypeType[] = ['destination', 'internal_destination']
public async processBatch(invocations: HogFunctionInvocation[]): Promise {
if (!invocations.length) {
@@ -658,8 +729,8 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
await this.produceQueuedMessages()
}
- public async _handleKafkaBatch(messages: Message[]): Promise {
- const events = await this.runWithHeartbeat(() =>
+ public async _parseKafkaBatch(messages: Message[]): Promise {
+ return await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: async () => {
@@ -727,8 +798,6 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
},
})
)
-
- await this.processBatch(events)
}
public async start(): Promise {
@@ -736,7 +805,10 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
await this.startKafkaConsumer({
topic: KAFKA_CDP_FUNCTION_CALLBACKS,
groupId: 'cdp-function-callback-consumer',
- handleBatch: (messages) => this._handleKafkaBatch(messages),
+ handleBatch: async (messages) => {
+ const invocations = await this._parseKafkaBatch(messages)
+ await this.processBatch(invocations)
+ },
})
}
}
@@ -749,6 +821,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
private cyclotronWorker?: CyclotronWorker
private runningWorker: Promise | undefined
protected queue: 'hog' | 'fetch' = 'hog'
+ protected hogTypes: HogFunctionTypeType[] = ['destination', 'internal_destination']
public async processBatch(invocations: HogFunctionInvocation[]): Promise {
if (!invocations.length) {
diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts
index 1b843ca04c513..e45536dc947da 100644
--- a/plugin-server/src/cdp/hog-executor.ts
+++ b/plugin-server/src/cdp/hog-executor.ts
@@ -126,7 +126,7 @@ export class HogExecutor {
nonMatchingFunctions: HogFunctionType[]
erroredFunctions: [HogFunctionType, string][]
} {
- const allFunctionsForTeam = this.hogFunctionManager.getTeamHogDestinations(globals.project.id)
+ const allFunctionsForTeam = this.hogFunctionManager.getTeamHogFunctions(globals.project.id)
const filtersGlobals = convertToHogFunctionFilterGlobal(globals)
const nonMatchingFunctions: HogFunctionType[] = []
@@ -333,39 +333,39 @@ export class HogExecutor {
// We need to pass these in but they don't actually do anything as it is a sync exec
fetch: async () => Promise.resolve(),
},
- importBytecode: (module) => {
- // TODO: more than one hardcoded module
- if (module === 'provider/email') {
- const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
- if (!provider) {
- throw new Error('No email provider configured')
- }
- try {
- const providerGlobals = this.buildHogFunctionGlobals({
- id: '',
- teamId: invocation.teamId,
- hogFunction: provider,
- globals: {} as any,
- queue: 'hog',
- timings: [],
- priority: 0,
- } satisfies HogFunctionInvocation)
-
- return {
- bytecode: provider.bytecode,
- globals: providerGlobals,
- }
- } catch (e) {
- result.logs.push({
- level: 'error',
- timestamp: DateTime.now(),
- message: `Error building inputs: ${e}`,
- })
- throw e
- }
- }
- throw new Error(`Can't import unknown module: ${module}`)
- },
+ // importBytecode: (module) => {
+ // // TODO: more than one hardcoded module
+ // if (module === 'provider/email') {
+ // const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
+ // if (!provider) {
+ // throw new Error('No email provider configured')
+ // }
+ // try {
+ // const providerGlobals = this.buildHogFunctionGlobals({
+ // id: '',
+ // teamId: invocation.teamId,
+ // hogFunction: provider,
+ // globals: {} as any,
+ // queue: 'hog',
+ // timings: [],
+ // priority: 0,
+ // } satisfies HogFunctionInvocation)
+
+ // return {
+ // bytecode: provider.bytecode,
+ // globals: providerGlobals,
+ // }
+ // } catch (e) {
+ // result.logs.push({
+ // level: 'error',
+ // timestamp: DateTime.now(),
+ // message: `Error building inputs: ${e}`,
+ // })
+ // throw e
+ // }
+ // }
+ // throw new Error(`Can't import unknown module: ${module}`)
+ // },
functions: {
print: (...args) => {
hogLogs++
diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts
index c53ff71952ec2..aea3ffb9b10e5 100644
--- a/plugin-server/src/cdp/hog-function-manager.ts
+++ b/plugin-server/src/cdp/hog-function-manager.ts
@@ -5,7 +5,7 @@ import { Hub, Team } from '../types'
import { PostgresUse } from '../utils/db/postgres'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
-import { HogFunctionType, IntegrationType } from './types'
+import { HogFunctionType, HogFunctionTypeType, IntegrationType } from './types'
type HogFunctionCache = {
functions: Record
@@ -26,14 +26,13 @@ const HOG_FUNCTION_FIELDS = [
'type',
]
-const RELOAD_HOG_FUNCTION_TYPES = ['destination', 'email']
-
export class HogFunctionManager {
private started: boolean
private ready: boolean
private cache: HogFunctionCache
private pubSub: PubSub
private refreshJob?: schedule.Job
+ private hogTypes: HogFunctionTypeType[] = []
constructor(private hub: Hub) {
this.started = false
@@ -60,7 +59,8 @@ export class HogFunctionManager {
})
}
- public async start(): Promise {
+ public async start(hogTypes: HogFunctionTypeType[]): Promise {
+ this.hogTypes = hogTypes
// TRICKY - when running with individual capabilities, this won't run twice but locally or as a complete service it will...
if (this.started) {
return
@@ -96,14 +96,6 @@ export class HogFunctionManager {
.filter((x) => !!x) as HogFunctionType[]
}
- public getTeamHogDestinations(teamId: Team['id']): HogFunctionType[] {
- return this.getTeamHogFunctions(teamId).filter((x) => x.type === 'destination' || !x.type)
- }
-
- public getTeamHogEmailProvider(teamId: Team['id']): HogFunctionType | undefined {
- return this.getTeamHogFunctions(teamId).find((x) => x.type === 'email')
- }
-
public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
@@ -124,7 +116,7 @@ export class HogFunctionManager {
}
public teamHasHogDestinations(teamId: Team['id']): boolean {
- return !!Object.keys(this.getTeamHogDestinations(teamId)).length
+ return !!Object.keys(this.getTeamHogFunctions(teamId)).length
}
public async reloadAllHogFunctions(): Promise {
@@ -134,9 +126,9 @@ export class HogFunctionManager {
`
SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
FROM posthog_hogfunction
- WHERE deleted = FALSE AND enabled = TRUE AND (type is NULL or type = ANY($1))
+ WHERE deleted = FALSE AND enabled = TRUE AND type = ANY($1)
`,
- [RELOAD_HOG_FUNCTION_TYPES],
+ [this.hogTypes],
'fetchAllHogFunctions'
)
).rows
@@ -167,8 +159,8 @@ export class HogFunctionManager {
PostgresUse.COMMON_READ,
`SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
FROM posthog_hogfunction
- WHERE id = ANY($1) AND deleted = FALSE AND enabled = TRUE`,
- [ids],
+ WHERE id = ANY($1) AND deleted = FALSE AND enabled = TRUE AND type = ANY($2)`,
+ [ids, this.hogTypes],
'fetchEnabledHogFunctions'
)
).rows
@@ -218,6 +210,11 @@ export class HogFunctionManager {
items.forEach((item) => {
const encryptedInputsString = item.encrypted_inputs as string | undefined
+ if (!Array.isArray(item.inputs_schema)) {
+ // NOTE: The sql lib can sometimes return an empty object instead of an empty array
+ item.inputs_schema = []
+ }
+
if (encryptedInputsString) {
try {
const decrypted = this.hub.encryptedFields.decrypt(encryptedInputsString || '')
diff --git a/plugin-server/src/cdp/schema.ts b/plugin-server/src/cdp/schema.ts
new file mode 100644
index 0000000000000..35dbf01e5e3f3
--- /dev/null
+++ b/plugin-server/src/cdp/schema.ts
@@ -0,0 +1,27 @@
+import { z } from 'zod'
+
+export const CdpInternalEventSchema = z.object({
+ team_id: z.number(),
+ event: z.object({
+ uuid: z.string(),
+ event: z.string(),
+ // In this context distinct_id should be whatever we want to use if doing follow up things (like tracking a standard event)
+ distinct_id: z.string(),
+ properties: z.record(z.any()),
+ timestamp: z.string(),
+ url: z.string().optional().nullable(),
+ }),
+ // Person may be a event-style person or an org member
+ person: z
+ .object({
+ id: z.string(),
+ properties: z.record(z.any()),
+ name: z.string().optional().nullable(),
+ url: z.string().optional().nullable(),
+ })
+ .optional()
+ .nullable(),
+})
+
+// Infer the TypeScript type
+export type CdpInternalEvent = z.infer
diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts
index dfe0464a1f9ec..9f7b8bba7433f 100644
--- a/plugin-server/src/cdp/types.ts
+++ b/plugin-server/src/cdp/types.ts
@@ -275,7 +275,16 @@ export type HogFunctionInputSchemaType = {
requiredScopes?: string
}
-export type HogFunctionTypeType = 'destination' | 'email' | 'sms' | 'push' | 'activity' | 'alert' | 'broadcast'
+export type HogFunctionTypeType =
+ | 'destination'
+ | 'transformation'
+ | 'internal_destination'
+ | 'email'
+ | 'sms'
+ | 'push'
+ | 'activity'
+ | 'alert'
+ | 'broadcast'
export type HogFunctionType = {
id: string
diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts
index f4c09b602a514..73909ec9e5a7b 100644
--- a/plugin-server/src/cdp/utils.ts
+++ b/plugin-server/src/cdp/utils.ts
@@ -11,6 +11,7 @@ import { RawClickHouseEvent, Team, TimestampFormat } from '../types'
import { safeClickhouseString } from '../utils/db/utils'
import { status } from '../utils/status'
import { castTimestampOrNow, clickHouseTimestampToISO, UUIDT } from '../utils/utils'
+import { CdpInternalEvent } from './schema'
import {
HogFunctionCapturedEvent,
HogFunctionFilterGlobals,
@@ -90,6 +91,47 @@ export function convertToHogFunctionInvocationGlobals(
return context
}
+export function convertInternalEventToHogFunctionInvocationGlobals(
+ data: CdpInternalEvent,
+ team: Team,
+ siteUrl: string
+): HogFunctionInvocationGlobals {
+ const projectUrl = `${siteUrl}/project/${team.id}`
+
+ let person: HogFunctionInvocationGlobals['person']
+
+ if (data.person) {
+ const personDisplayName = getPersonDisplayName(team, data.event.distinct_id, data.person.properties)
+
+ person = {
+ id: data.person.id,
+ properties: data.person.properties,
+ name: personDisplayName,
+ url: data.person.url ?? '',
+ }
+ }
+
+ const context: HogFunctionInvocationGlobals = {
+ project: {
+ id: team.id,
+ name: team.name,
+ url: projectUrl,
+ },
+ event: {
+ uuid: data.event.uuid,
+ event: data.event.event,
+ elements_chain: '', // Not applicable but left here for compatibility
+ distinct_id: data.event.distinct_id,
+ properties: data.event.properties,
+ timestamp: data.event.timestamp,
+ url: data.event.url ?? '',
+ },
+ person,
+ }
+
+ return context
+}
+
function getElementsChainHref(elementsChain: string): string {
// Adapted from SQL: extract(elements_chain, '(?::|\")href="(.*?)"'),
const hrefRegex = new RE2(/(?::|")href="(.*?)"/)
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index 8610bf8f0b819..79959a951e9a7 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -45,6 +45,7 @@ export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
// CDP topics
export const KAFKA_CDP_FUNCTION_CALLBACKS = `${prefix}cdp_function_callbacks${suffix}`
export const KAFKA_CDP_FUNCTION_OVERFLOW = `${prefix}cdp_function_overflow${suffix}`
+export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}`
// Error tracking topics
export const KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = `${prefix}exception_symbolification_events${suffix}`
diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts
index 2f1082f2aa5b4..66f4eac0ea0f7 100644
--- a/plugin-server/src/kafka/batch-consumer.ts
+++ b/plugin-server/src/kafka/batch-consumer.ts
@@ -249,6 +249,8 @@ export const startBatchConsumer = async ({
let batchesProcessed = 0
const statusLogInterval = setInterval(() => {
status.info('🔁', 'main_loop', {
+ groupId,
+ topic,
messagesPerSecond: messagesProcessed / (STATUS_LOG_INTERVAL_MS / 1000),
batchesProcessed: batchesProcessed,
lastHeartbeatTime: new Date(lastHeartbeatTime).toISOString(),
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index d61f3bb5e0510..ac482ca21a6fa 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -15,6 +15,7 @@ import {
CdpCyclotronWorker,
CdpCyclotronWorkerFetch,
CdpFunctionCallbackConsumer,
+ CdpInternalEventsConsumer,
CdpProcessedEventsConsumer,
} from '../cdp/cdp-consumers'
import { defaultConfig } from '../config/config'
@@ -451,6 +452,13 @@ export async function startPluginsServer(
services.push(consumer.service)
}
+ if (capabilities.cdpInternalEvents) {
+ const hub = await setupHub()
+ const consumer = new CdpInternalEventsConsumer(hub)
+ await consumer.start()
+ services.push(consumer.service)
+ }
+
if (capabilities.cdpFunctionCallbacks) {
const hub = await setupHub()
const consumer = new CdpFunctionCallbackConsumer(hub)
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 1263a896d04a3..390f7d8d3a5a5 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -84,6 +84,7 @@ export enum PluginServerMode {
recordings_blob_ingestion = 'recordings-blob-ingestion',
recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
cdp_processed_events = 'cdp-processed-events',
+ cdp_internal_events = 'cdp-internal-events',
cdp_function_callbacks = 'cdp-function-callbacks',
cdp_cyclotron_worker = 'cdp-cyclotron-worker',
functional_tests = 'functional-tests',
@@ -358,6 +359,7 @@ export interface PluginServerCapabilities {
sessionRecordingBlobIngestion?: boolean
sessionRecordingBlobOverflowIngestion?: boolean
cdpProcessedEvents?: boolean
+ cdpInternalEvents?: boolean
cdpFunctionCallbacks?: boolean
cdpCyclotronWorker?: boolean
appManagementSingleton?: boolean
diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-events-consumer.test.ts
similarity index 95%
rename from plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
rename to plugin-server/tests/cdp/cdp-events-consumer.test.ts
index c559a4240fca4..db400a56672b3 100644
--- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
+++ b/plugin-server/tests/cdp/cdp-events-consumer.test.ts
@@ -1,4 +1,4 @@
-import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers'
+import { CdpInternalEventsConsumer, CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers'
import { HogWatcherState } from '../../src/cdp/hog-watcher'
import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types'
import { Hub, Team } from '../../src/types'
@@ -74,13 +74,22 @@ const decodeAllKafkaMessages = (): any[] => {
return mockProducer.produce.mock.calls.map((x) => decodeKafkaMessage(x[0]))
}
-describe('CDP Processed Events Consumer', () => {
- let processor: CdpProcessedEventsConsumer
+/**
+ * NOTE: The internal and normal events consumers are very similar so we can test them together
+ */
+describe.each([
+ [CdpProcessedEventsConsumer.name, CdpProcessedEventsConsumer, 'destination' as const],
+ [CdpInternalEventsConsumer.name, CdpInternalEventsConsumer, 'internal_destination' as const],
+])('%s', (_name, Consumer, hogType) => {
+ let processor: CdpProcessedEventsConsumer | CdpInternalEventsConsumer
let hub: Hub
let team: Team
const insertHogFunction = async (hogFunction: Partial) => {
- const item = await _insertHogFunction(hub.postgres, team.id, hogFunction)
+ const item = await _insertHogFunction(hub.postgres, team.id, {
+ ...hogFunction,
+ type: hogType,
+ })
// Trigger the reload that django would do
await processor.hogFunctionManager.reloadAllHogFunctions()
return item
@@ -91,7 +100,7 @@ describe('CDP Processed Events Consumer', () => {
hub = await createHub()
team = await getFirstTeam(hub)
- processor = new CdpProcessedEventsConsumer(hub)
+ processor = new Consumer(hub)
await processor.start()
mockFetch.mockClear()
diff --git a/plugin-server/tests/cdp/cdp-internal-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-internal-events-consumer.test.ts
new file mode 100644
index 0000000000000..995b2eeae2667
--- /dev/null
+++ b/plugin-server/tests/cdp/cdp-internal-events-consumer.test.ts
@@ -0,0 +1,99 @@
+import { CdpInternalEventsConsumer } from '../../src/cdp/cdp-consumers'
+import { HogFunctionType } from '../../src/cdp/types'
+import { Hub, Team } from '../../src/types'
+import { closeHub, createHub } from '../../src/utils/db/hub'
+import { getFirstTeam, resetTestDatabase } from '../helpers/sql'
+import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
+import { createInternalEvent, createKafkaMessage, insertHogFunction as _insertHogFunction } from './fixtures'
+
+describe('CDP Internal Events Consumer', () => {
+ let processor: CdpInternalEventsConsumer
+ let hub: Hub
+ let team: Team
+
+ const insertHogFunction = async (hogFunction: Partial) => {
+ const item = await _insertHogFunction(hub.postgres, team.id, hogFunction)
+ // Trigger the reload that django would do
+ await processor.hogFunctionManager.reloadAllHogFunctions()
+ return item
+ }
+
+ beforeEach(async () => {
+ await resetTestDatabase()
+ hub = await createHub()
+ team = await getFirstTeam(hub)
+
+ processor = new CdpInternalEventsConsumer(hub)
+ // Speed hack as we don't need all of kafka to be started for this test
+ await processor.hogFunctionManager.start(processor['hogTypes'])
+ })
+
+ afterEach(async () => {
+ jest.setTimeout(1000)
+ await closeHub(hub)
+ })
+
+ afterAll(() => {
+ jest.useRealTimers()
+ })
+
+ describe('_handleKafkaBatch', () => {
+ it('should ignore invalid message', async () => {
+ const events = await processor._parseKafkaBatch([createKafkaMessage({})])
+ expect(events).toHaveLength(0)
+ })
+
+ it('should ignore message with no team', async () => {
+ const events = await processor._parseKafkaBatch([createKafkaMessage(createInternalEvent(999999, {}))])
+ expect(events).toHaveLength(0)
+ })
+
+ describe('with an existing team and hog function', () => {
+ beforeEach(async () => {
+ await insertHogFunction({
+ ...HOG_EXAMPLES.simple_fetch,
+ ...HOG_INPUTS_EXAMPLES.simple_fetch,
+ ...HOG_FILTERS_EXAMPLES.no_filters,
+ type: 'internal_destination',
+ })
+ })
+
+ it('should ignore invalid payloads', async () => {
+ const events = await processor._parseKafkaBatch([
+ createKafkaMessage(
+ createInternalEvent(team.id, {
+ event: 'WRONG' as any,
+ })
+ ),
+ ])
+ expect(events).toHaveLength(0)
+ })
+
+ it('should parse a valid message with an existing team and hog function ', async () => {
+ const event = createInternalEvent(team.id, {})
+ event.event.timestamp = '2024-12-18T15:06:23.545Z'
+ event.event.uuid = 'b6da2f33-ba54-4550-9773-50d3278ad61f'
+
+ const events = await processor._parseKafkaBatch([createKafkaMessage(event)])
+ expect(events).toHaveLength(1)
+ expect(events[0]).toEqual({
+ event: {
+ distinct_id: 'distinct_id',
+ elements_chain: '',
+ event: '$pageview',
+ properties: {},
+ timestamp: '2024-12-18T15:06:23.545Z',
+ url: '',
+ uuid: 'b6da2f33-ba54-4550-9773-50d3278ad61f',
+ },
+ person: undefined,
+ project: {
+ id: 2,
+ name: 'TEST PROJECT',
+ url: 'http://localhost:8000/project/2',
+ },
+ })
+ })
+ })
+ })
+})
diff --git a/plugin-server/tests/cdp/fixtures.ts b/plugin-server/tests/cdp/fixtures.ts
index e34920fdd981e..79c56798866db 100644
--- a/plugin-server/tests/cdp/fixtures.ts
+++ b/plugin-server/tests/cdp/fixtures.ts
@@ -1,6 +1,7 @@
import { randomUUID } from 'crypto'
import { Message } from 'node-rdkafka'
+import { CdpInternalEvent } from '../../src/cdp/schema'
import {
HogFunctionInvocation,
HogFunctionInvocationGlobals,
@@ -60,7 +61,7 @@ export const createIncomingEvent = (teamId: number, data: Partial = {}): Message => {
+export const createKafkaMessage = (event: any, overrides: Partial = {}): Message => {
return {
partition: 1,
topic: 'test',
@@ -72,6 +73,20 @@ export const createMessage = (event: RawClickHouseEvent, overrides: Partial): CdpInternalEvent => {
+ return {
+ team_id: teamId,
+ event: {
+ timestamp: new Date().toISOString(),
+ properties: {},
+ uuid: randomUUID(),
+ event: '$pageview',
+ distinct_id: 'distinct_id',
+ },
+ ...data,
+ }
+}
+
export const insertHogFunction = async (
postgres: PostgresRouter,
team_id: Team['id'],
diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts
index aeacc1067d0f4..99feb53d62207 100644
--- a/plugin-server/tests/cdp/hog-executor.test.ts
+++ b/plugin-server/tests/cdp/hog-executor.test.ts
@@ -48,9 +48,8 @@ describe('Hog Executor', () => {
const mockFunctionManager = {
reloadAllHogFunctions: jest.fn(),
- getTeamHogDestinations: jest.fn(),
+ getTeamHogFunctions: jest.fn(),
getTeamHogFunction: jest.fn(),
- getTeamHogEmailProvider: jest.fn(),
}
beforeEach(async () => {
@@ -70,7 +69,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.no_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([hogFunction])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([hogFunction])
mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction)
})
@@ -254,7 +253,7 @@ describe('Hog Executor', () => {
})
})
- describe('email provider functions', () => {
+ describe.skip('email provider functions', () => {
let hogFunction: HogFunctionType
let providerFunction: HogFunctionType
beforeEach(() => {
@@ -270,9 +269,9 @@ describe('Hog Executor', () => {
...HOG_INPUTS_EXAMPLES.email,
...HOG_FILTERS_EXAMPLES.no_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([hogFunction, providerFunction])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([hogFunction, providerFunction])
mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction)
- mockFunctionManager.getTeamHogEmailProvider.mockReturnValue(providerFunction)
+ // mockFunctionManager.getTeamHogEmailProvider.mockReturnValue(providerFunction)
})
it('can execute an invocation', () => {
@@ -326,7 +325,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const resultsShouldntMatch = executor.findMatchingFunctions(createHogExecutionGlobals({ groups: {} }))
expect(resultsShouldntMatch.matchingFunctions).toHaveLength(0)
@@ -356,7 +355,7 @@ describe('Hog Executor', () => {
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.broken_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const resultsShouldMatch = executor.findMatchingFunctions(
createHogExecutionGlobals({
groups: {},
@@ -388,7 +387,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.elements_text_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const elementsChain = (buttonText: string) =>
`span.LemonButton__content:attr__class="LemonButton__content"nth-child="2"nth-of-type="2"text="${buttonText}";span.LemonButton__chrome:attr__class="LemonButton__chrome"nth-child="1"nth-of-type="1";button.LemonButton.LemonButton--has-icon.LemonButton--secondary.LemonButton--status-default:attr__class="LemonButton LemonButton--secondary LemonButton--status-default LemonButton--has-icon"attr__type="button"nth-child="1"nth-of-type="1"text="${buttonText}";div.flex.gap-4.items-center:attr__class="flex gap-4 items-center"nth-child="1"nth-of-type="1";div.flex.flex-wrap.gap-4.justify-between:attr__class="flex gap-4 justify-between flex-wrap"nth-child="3"nth-of-type="3";div.flex.flex-1.flex-col.gap-4.h-full.relative.w-full:attr__class="relative w-full flex flex-col gap-4 flex-1 h-full"nth-child="1"nth-of-type="1";div.LemonTabs__content:attr__class="LemonTabs__content"nth-child="2"nth-of-type="1";div.LemonTabs.LemonTabs--medium:attr__class="LemonTabs LemonTabs--medium"attr__style="--lemon-tabs-slider-width: 48px; --lemon-tabs-slider-offset: 0px;"nth-child="1"nth-of-type="1";div.Navigation3000__scene:attr__class="Navigation3000__scene"nth-child="2"nth-of-type="2";main:nth-child="2"nth-of-type="1";div.Navigation3000:attr__class="Navigation3000"nth-child="1"nth-of-type="1";div:attr__id="root"attr_id="root"nth-child="3"nth-of-type="1";body.overflow-hidden:attr__class="overflow-hidden"attr__theme="light"nth-child="2"nth-of-type="1"`
@@ -438,7 +437,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.elements_href_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const elementsChain = (link: string) =>
`span.LemonButton__content:attr__class="LemonButton__content"attr__href="${link}"href="${link}"nth-child="2"nth-of-type="2"text="Activity";span.LemonButton__chrome:attr__class="LemonButton__chrome"nth-child="1"nth-of-type="1";a.LemonButton.LemonButton--full-width.LemonButton--has-icon.LemonButton--secondary.LemonButton--status-alt.Link.NavbarButton:attr__class="Link LemonButton LemonButton--secondary LemonButton--status-alt LemonButton--full-width LemonButton--has-icon NavbarButton"attr__data-attr="menu-item-activity"attr__href="${link}"href="${link}"nth-child="1"nth-of-type="1"text="Activity";li.w-full:attr__class="w-full"nth-child="6"nth-of-type="6";ul:nth-child="1"nth-of-type="1";div.Navbar3000__top.ScrollableShadows__inner:attr__class="ScrollableShadows__inner Navbar3000__top"nth-child="1"nth-of-type="1";div.ScrollableShadows.ScrollableShadows--vertical:attr__class="ScrollableShadows ScrollableShadows--vertical"nth-child="1"nth-of-type="1";div.Navbar3000__content:attr__class="Navbar3000__content"nth-child="1"nth-of-type="1";nav.Navbar3000:attr__class="Navbar3000"nth-child="1"nth-of-type="1";div.Navigation3000:attr__class="Navigation3000"nth-child="1"nth-of-type="1";div:attr__id="root"attr_id="root"nth-child="3"nth-of-type="1";body.overflow-hidden:attr__class="overflow-hidden"attr__theme="light"nth-child="2"nth-of-type="1"`
@@ -488,7 +487,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.elements_tag_and_id_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const elementsChain = (id: string) =>
`a.Link.font-semibold.text-text-3000.text-xl:attr__class="Link font-semibold text-xl text-text-3000"attr__href="/project/1/dashboard/1"attr__id="${id}"attr_id="${id}"href="/project/1/dashboard/1"nth-child="1"nth-of-type="1"text="My App Dashboard";div.ProjectHomepage__dashboardheader__title:attr__class="ProjectHomepage__dashboardheader__title"nth-child="1"nth-of-type="1";div.ProjectHomepage__dashboardheader:attr__class="ProjectHomepage__dashboardheader"nth-child="2"nth-of-type="2";div.ProjectHomepage:attr__class="ProjectHomepage"nth-child="1"nth-of-type="1";div.Navigation3000__scene:attr__class="Navigation3000__scene"nth-child="2"nth-of-type="2";main:nth-child="2"nth-of-type="1";div.Navigation3000:attr__class="Navigation3000"nth-child="1"nth-of-type="1";div:attr__id="root"attr_id="root"nth-child="3"nth-of-type="1";body.overflow-hidden:attr__class="overflow-hidden"attr__theme="light"nth-child="2"nth-of-type="1"`
@@ -579,7 +578,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.no_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const result = executor.execute(createInvocation(fn))
expect(result.error).toContain('Execution timed out after 0.1 seconds. Performed ')
diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts
index d5d5b575dd3ec..752927c3d53dd 100644
--- a/plugin-server/tests/cdp/hog-function-manager.test.ts
+++ b/plugin-server/tests/cdp/hog-function-manager.test.ts
@@ -62,22 +62,31 @@ describe('HogFunctionManager', () => {
hogFunctions.push(
await insertHogFunction(hub.postgres, teamId1, {
- name: 'Email Provider team 1',
- type: 'email',
- inputs_schema: [
- {
- type: 'email',
- key: 'message',
- },
- ],
- inputs: {
- email: {
- value: { from: 'me@a.com', to: 'you@b.com', subject: 'subject', html: 'text' },
- },
- },
+ name: 'Test Hog Function team 1 - transformation',
+ type: 'transformation',
+ inputs_schema: [],
+ inputs: {},
})
)
+ // hogFunctions.push(
+ // await insertHogFunction(hub.postgres, teamId1, {
+ // name: 'Email Provider team 1',
+ // type: 'email',
+ // inputs_schema: [
+ // {
+ // type: 'email',
+ // key: 'message',
+ // },
+ // ],
+ // inputs: {
+ // email: {
+ // value: { from: 'me@a.com', to: 'you@b.com', subject: 'subject', html: 'text' },
+ // },
+ // },
+ // })
+ // )
+
hogFunctions.push(
await insertHogFunction(hub.postgres, teamId2, {
name: 'Test Hog Function team 2',
@@ -98,7 +107,7 @@ describe('HogFunctionManager', () => {
})
)
- await manager.start()
+ await manager.start(['destination'])
})
afterEach(async () => {
@@ -107,7 +116,7 @@ describe('HogFunctionManager', () => {
})
it('returns the hog functions', async () => {
- let items = manager.getTeamHogDestinations(teamId1)
+ let items = manager.getTeamHogFunctions(teamId1)
expect(items).toEqual([
{
@@ -142,13 +151,6 @@ describe('HogFunctionManager', () => {
},
])
- const allFunctions = manager.getTeamHogFunctions(teamId1)
- expect(allFunctions.length).toEqual(2)
- expect(allFunctions.map((f) => f.type).sort()).toEqual(['destination', 'email'])
-
- const emailProvider = manager.getTeamHogEmailProvider(teamId1)
- expect(emailProvider.type).toEqual('email')
-
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET name='Test Hog Function team 1 updated' WHERE id = $1`,
@@ -159,7 +161,7 @@ describe('HogFunctionManager', () => {
// This is normally dispatched by django
await manager.reloadHogFunctions(teamId1, [hogFunctions[0].id])
- items = manager.getTeamHogDestinations(teamId1)
+ items = manager.getTeamHogFunctions(teamId1)
expect(items).toMatchObject([
{
@@ -169,8 +171,21 @@ describe('HogFunctionManager', () => {
])
})
+ it('filters hog functions by type', async () => {
+ manager['hogTypes'] = ['transformation']
+ await manager.reloadAllHogFunctions()
+ expect(manager.getTeamHogFunctions(teamId1).length).toEqual(1)
+ expect(manager.getTeamHogFunctions(teamId1)[0].type).toEqual('transformation')
+
+ manager['hogTypes'] = ['transformation', 'destination']
+ await manager.reloadAllHogFunctions()
+ expect(manager.getTeamHogFunctions(teamId1).length).toEqual(2)
+ expect(manager.getTeamHogFunctions(teamId1)[0].type).toEqual('destination')
+ expect(manager.getTeamHogFunctions(teamId1)[1].type).toEqual('transformation')
+ })
+
it('removes disabled functions', async () => {
- let items = manager.getTeamHogDestinations(teamId1)
+ let items = manager.getTeamHogFunctions(teamId1)
expect(items).toMatchObject([
{
@@ -188,14 +203,14 @@ describe('HogFunctionManager', () => {
// This is normally dispatched by django
await manager.reloadHogFunctions(teamId1, [hogFunctions[0].id])
- items = manager.getTeamHogDestinations(teamId1)
+ items = manager.getTeamHogFunctions(teamId1)
expect(items).toEqual([])
})
it('enriches integration inputs if found and belonging to the team', () => {
- const function1Inputs = manager.getTeamHogDestinations(teamId1)[0].inputs
- const function2Inputs = manager.getTeamHogDestinations(teamId2)[0].inputs
+ const function1Inputs = manager.getTeamHogFunctions(teamId1)[0].inputs
+ const function2Inputs = manager.getTeamHogFunctions(teamId2)[0].inputs
// Only the right team gets the integration inputs enriched
expect(function1Inputs).toEqual({
diff --git a/posthog/api/hog_function_template.py b/posthog/api/hog_function_template.py
index 38641031167ad..1f8151e161bec 100644
--- a/posthog/api/hog_function_template.py
+++ b/posthog/api/hog_function_template.py
@@ -5,7 +5,7 @@
from rest_framework.response import Response
from rest_framework.exceptions import NotFound
-from posthog.cdp.templates import HOG_FUNCTION_TEMPLATES
+from posthog.cdp.templates import HOG_FUNCTION_SUB_TEMPLATES, HOG_FUNCTION_TEMPLATES, ALL_HOG_FUNCTION_TEMPLATES_BY_ID
from posthog.cdp.templates.hog_function_template import (
HogFunctionMapping,
HogFunctionMappingTemplate,
@@ -51,17 +51,33 @@ class PublicHogFunctionTemplateViewSet(viewsets.GenericViewSet):
def list(self, request: Request, *args, **kwargs):
types = ["destination"]
+
+ sub_template_id = request.GET.get("sub_template_id")
+
if "type" in request.GET:
types = [self.request.GET.get("type", "destination")]
elif "types" in request.GET:
types = self.request.GET.get("types", "destination").split(",")
- templates = [item for item in HOG_FUNCTION_TEMPLATES if item.type in types]
- page = self.paginate_queryset(templates)
+
+ templates_list = HOG_FUNCTION_SUB_TEMPLATES if sub_template_id else HOG_FUNCTION_TEMPLATES
+
+ matching_templates = []
+
+ for template in templates_list:
+ if template.type not in types:
+ continue
+
+ if sub_template_id and sub_template_id not in template.id:
+ continue
+
+ matching_templates.append(template)
+
+ page = self.paginate_queryset(matching_templates)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
def retrieve(self, request: Request, *args, **kwargs):
- item = next((item for item in HOG_FUNCTION_TEMPLATES if item.id == kwargs["pk"]), None)
+ item = ALL_HOG_FUNCTION_TEMPLATES_BY_ID.get(kwargs["pk"], None)
if not item:
raise NotFound(f"Template with id {kwargs['pk']} not found.")
diff --git a/posthog/api/test/batch_exports/fixtures.py b/posthog/api/test/batch_exports/fixtures.py
new file mode 100644
index 0000000000000..1c13b43d22db7
--- /dev/null
+++ b/posthog/api/test/batch_exports/fixtures.py
@@ -0,0 +1,13 @@
+from posthog.api.test.test_organization import create_organization as create_organization_base
+from posthog.constants import AvailableFeature
+from posthog.models import Organization
+
+
+def create_organization(name: str, has_data_pipelines_feature: bool = True) -> Organization:
+ organization = create_organization_base(name)
+ if has_data_pipelines_feature:
+ organization.available_product_features = [
+ {"key": AvailableFeature.DATA_PIPELINES, "name": AvailableFeature.DATA_PIPELINES}
+ ]
+ organization.save()
+ return organization
diff --git a/posthog/api/test/batch_exports/test_backfill.py b/posthog/api/test/batch_exports/test_backfill.py
index e8dd4a29e81db..797db7ea93fd1 100644
--- a/posthog/api/test/batch_exports/test_backfill.py
+++ b/posthog/api/test/batch_exports/test_backfill.py
@@ -10,7 +10,7 @@
backfill_batch_export,
create_batch_export_ok,
)
-from posthog.api.test.test_organization import create_organization
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.common.client import sync_connect
diff --git a/posthog/api/test/batch_exports/test_create.py b/posthog/api/test/batch_exports/test_create.py
index 28524c7d99513..676935e56b597 100644
--- a/posthog/api/test/batch_exports/test_create.py
+++ b/posthog/api/test/batch_exports/test_create.py
@@ -9,8 +9,8 @@
from rest_framework import status
from posthog.api.test.batch_exports.conftest import describe_schedule, start_test_worker
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.batch_exports.operations import create_batch_export
-from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.models import BatchExport
diff --git a/posthog/api/test/batch_exports/test_delete.py b/posthog/api/test/batch_exports/test_delete.py
index 697415a4525cb..f5b303538707a 100644
--- a/posthog/api/test/batch_exports/test_delete.py
+++ b/posthog/api/test/batch_exports/test_delete.py
@@ -8,6 +8,7 @@
from temporalio.service import RPCError
from posthog.api.test.batch_exports.conftest import start_test_worker
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.batch_exports.operations import (
backfill_batch_export_ok,
create_batch_export_ok,
@@ -15,7 +16,6 @@
delete_batch_export_ok,
get_batch_export,
)
-from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.common.client import sync_connect
diff --git a/posthog/api/test/batch_exports/test_get.py b/posthog/api/test/batch_exports/test_get.py
index f5e0060bc67b5..4e5adfd5a5d63 100644
--- a/posthog/api/test/batch_exports/test_get.py
+++ b/posthog/api/test/batch_exports/test_get.py
@@ -7,7 +7,7 @@
create_batch_export_ok,
get_batch_export,
)
-from posthog.api.test.test_organization import create_organization
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.common.client import sync_connect
diff --git a/posthog/api/test/batch_exports/test_list.py b/posthog/api/test/batch_exports/test_list.py
index 7796964228fe5..579e3469b5b16 100644
--- a/posthog/api/test/batch_exports/test_list.py
+++ b/posthog/api/test/batch_exports/test_list.py
@@ -6,7 +6,7 @@
delete_batch_export_ok,
list_batch_exports_ok,
)
-from posthog.api.test.test_organization import create_organization
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
diff --git a/posthog/api/test/batch_exports/test_pause.py b/posthog/api/test/batch_exports/test_pause.py
index 33c32f1a200bc..97eb8f90a1809 100644
--- a/posthog/api/test/batch_exports/test_pause.py
+++ b/posthog/api/test/batch_exports/test_pause.py
@@ -14,7 +14,7 @@
unpause_batch_export,
unpause_batch_export_ok,
)
-from posthog.api.test.test_organization import create_organization
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.service import batch_export_delete_schedule
diff --git a/posthog/api/test/batch_exports/test_runs.py b/posthog/api/test/batch_exports/test_runs.py
index 0c58b717be6f2..75be430e4b07c 100644
--- a/posthog/api/test/batch_exports/test_runs.py
+++ b/posthog/api/test/batch_exports/test_runs.py
@@ -15,7 +15,7 @@
get_batch_export_runs,
get_batch_export_runs_ok,
)
-from posthog.api.test.test_organization import create_organization
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.common.client import sync_connect
diff --git a/posthog/api/test/batch_exports/test_update.py b/posthog/api/test/batch_exports/test_update.py
index ec794d85484ee..80860ab153057 100644
--- a/posthog/api/test/batch_exports/test_update.py
+++ b/posthog/api/test/batch_exports/test_update.py
@@ -14,7 +14,7 @@
patch_batch_export,
put_batch_export,
)
-from posthog.api.test.test_organization import create_organization
+from posthog.api.test.batch_exports.fixtures import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.service import sync_batch_export
diff --git a/posthog/api/test/test_app_metrics.py b/posthog/api/test/test_app_metrics.py
index 67b9a0a42eaa5..d11a27a394b76 100644
--- a/posthog/api/test/test_app_metrics.py
+++ b/posthog/api/test/test_app_metrics.py
@@ -8,6 +8,7 @@
from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import create_batch_export_ok
from posthog.batch_exports.models import BatchExportRun
+from posthog.constants import AvailableFeature
from posthog.models.activity_logging.activity_log import Detail, Trigger, log_activity
from posthog.models.plugin import Plugin, PluginConfig
from posthog.models.utils import UUIDT
@@ -27,6 +28,11 @@ def setUp(self):
self.plugin = Plugin.objects.create(organization=self.organization)
self.plugin_config = PluginConfig.objects.create(plugin=self.plugin, team=self.team, enabled=True, order=1)
+ self.organization.available_product_features = [
+ {"key": AvailableFeature.DATA_PIPELINES, "name": AvailableFeature.DATA_PIPELINES}
+ ]
+ self.organization.save()
+
def test_retrieve(self):
create_app_metric(
team_id=self.team.pk,
diff --git a/posthog/api/test/test_hog_function_templates.py b/posthog/api/test/test_hog_function_templates.py
index 7a9b5150f5acd..4a34e36f88235 100644
--- a/posthog/api/test/test_hog_function_templates.py
+++ b/posthog/api/test/test_hog_function_templates.py
@@ -1,6 +1,8 @@
from unittest.mock import ANY
+from inline_snapshot import snapshot
from rest_framework import status
+from posthog.cdp.templates.hog_function_template import derive_sub_templates
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, QueryMatchingTest
from posthog.cdp.templates.slack.template_slack import template
@@ -23,6 +25,22 @@
}
+class TestHogFunctionTemplatesMixin(APIBaseTest):
+ def test_derive_sub_templates(self):
+ # One sanity check test (rather than all of them)
+ sub_templates = derive_sub_templates([template])
+
+ # check overridden params
+ assert sub_templates[0].inputs_schema[-1]["key"] == "text"
+ assert sub_templates[0].inputs_schema[-1]["default"] == snapshot(
+ "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'"
+ )
+ assert sub_templates[0].filters == snapshot(
+ {"events": [{"id": "$feature_enrollment_update", "type": "events"}]}
+ )
+ assert sub_templates[0].type == "destination"
+
+
class TestHogFunctionTemplates(ClickhouseTestMixin, APIBaseTest, QueryMatchingTest):
def test_list_function_templates(self):
response = self.client.get("/api/projects/@current/hog_function_templates/")
@@ -48,6 +66,29 @@ def test_filter_function_templates(self):
response5 = self.client.get("/api/projects/@current/hog_function_templates/?types=site_destination,destination")
assert len(response5.json()["results"]) > 0
+ def test_filter_sub_templates(self):
+ response1 = self.client.get(
+ "/api/projects/@current/hog_function_templates/?type=internal_destination&sub_template_id=activity-log"
+ )
+ assert response1.status_code == status.HTTP_200_OK, response1.json()
+ assert len(response1.json()["results"]) > 0
+
+ template = response1.json()["results"][0]
+
+ assert template["sub_templates"] is None
+ assert template["type"] == "internal_destination"
+ assert template["id"] == "template-slack-activity-log"
+
+ def test_retrieve_function_template(self):
+ response = self.client.get("/api/projects/@current/hog_function_templates/template-slack")
+ assert response.status_code == status.HTTP_200_OK, response.json()
+ assert response.json()["id"] == "template-slack"
+
+ def test_retrieve_function_sub_template(self):
+ response = self.client.get("/api/projects/@current/hog_function_templates/template-slack-activity-log")
+ assert response.status_code == status.HTTP_200_OK, response.json()
+ assert response.json()["id"] == "template-slack-activity-log"
+
def test_public_list_function_templates(self):
self.client.logout()
response = self.client.get("/api/public_hog_function_templates/")
diff --git a/posthog/api/test/test_team.py b/posthog/api/test/test_team.py
index 1da488504f57d..8d4509f930584 100644
--- a/posthog/api/test/test_team.py
+++ b/posthog/api/test/test_team.py
@@ -458,7 +458,10 @@ def test_delete_bulky_postgres_data(self):
def test_delete_batch_exports(self):
self.organization_membership.level = OrganizationMembership.Level.ADMIN
self.organization_membership.save()
-
+ self.organization.available_product_features = [
+ {"key": AvailableFeature.DATA_PIPELINES, "name": AvailableFeature.DATA_PIPELINES}
+ ]
+ self.organization.save()
team: Team = Team.objects.create_with_data(initiating_user=self.user, organization=self.organization)
destination_data = {
@@ -486,16 +489,16 @@ def test_delete_batch_exports(self):
json.dumps(batch_export_data),
content_type="application/json",
)
- self.assertEqual(response.status_code, 201)
+ assert response.status_code == 201, response.json()
batch_export = response.json()
batch_export_id = batch_export["id"]
response = self.client.delete(f"/api/environments/{team.id}")
- self.assertEqual(response.status_code, 204)
+ assert response.status_code == 204, response.json()
response = self.client.get(f"/api/environments/{team.id}/batch_exports/{batch_export_id}")
- self.assertEqual(response.status_code, 404)
+ assert response.status_code == 404, response.json()
with self.assertRaises(RPCError):
describe_schedule(temporal, batch_export_id)
diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py
index 72bc94adf03bd..c3929503635b2 100644
--- a/posthog/batch_exports/http.py
+++ b/posthog/batch_exports/http.py
@@ -1,5 +1,6 @@
import datetime as dt
from typing import Any, TypedDict, cast
+from loginas.utils import is_impersonated_session
import posthoganalytics
import structlog
@@ -31,6 +32,7 @@
sync_batch_export,
unpause_batch_export,
)
+from posthog.constants import AvailableFeature
from posthog.hogql import ast, errors
from posthog.hogql.hogql import HogQLContext
from posthog.hogql.parser import parse_select
@@ -245,6 +247,20 @@ class Meta:
]
read_only_fields = ["id", "team_id", "created_at", "last_updated_at", "latest_runs", "schema"]
+ def validate(self, attrs: dict) -> dict:
+ team = self.context["get_team"]()
+ attrs["team"] = team
+
+ has_addon = team.organization.is_feature_available(AvailableFeature.DATA_PIPELINES)
+
+ if not has_addon:
+ # Check if the user is impersonated - if so we allow changes as it could be an admin user fixing things
+
+ if not is_impersonated_session(self.context["request"]):
+ raise serializers.ValidationError("The Data Pipelines addon is required for batch exports.")
+
+ return attrs
+
def create(self, validated_data: dict) -> BatchExport:
"""Create a BatchExport."""
destination_data = validated_data.pop("destination")
diff --git a/posthog/cdp/internal_events.py b/posthog/cdp/internal_events.py
new file mode 100644
index 0000000000000..ba945ede3f2ff
--- /dev/null
+++ b/posthog/cdp/internal_events.py
@@ -0,0 +1,72 @@
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional
+import uuid
+
+import structlog
+from posthog.kafka_client.client import KafkaProducer
+from posthog.kafka_client.topics import KAFKA_CDP_INTERNAL_EVENTS
+from rest_framework_dataclasses.serializers import DataclassSerializer
+
+logger = structlog.get_logger(__name__)
+
+
+@dataclass
+class InternalEventEvent:
+ event: str
+ distinct_id: str
+ properties: dict
+ timestamp: Optional[str] = None
+ url: Optional[str] = None
+ uuid: Optional[str] = None
+
+
+@dataclass
+class InternalEventPerson:
+ id: str
+ properties: dict
+ name: Optional[str] = None
+ url: Optional[str] = None
+
+
+@dataclass
+class InternalEvent:
+ team_id: int
+ event: InternalEventEvent
+ person: Optional[InternalEventPerson] = None
+
+
+class InternalEventSerializer(DataclassSerializer):
+ class Meta:
+ dataclass = InternalEvent
+
+
+def internal_event_to_dict(data: InternalEvent) -> dict:
+ return InternalEventSerializer(data).data
+
+
+def create_internal_event(
+ team_id: int, event: InternalEventEvent, person: Optional[InternalEventPerson] = None
+) -> InternalEvent:
+ data = InternalEvent(team_id=team_id, event=event, person=person)
+
+ if data.event.uuid is None:
+ data.event.uuid = str(uuid.uuid4())
+ if data.event.timestamp is None:
+ data.event.timestamp = datetime.now().isoformat()
+
+ return data
+
+
+def produce_internal_event(team_id: int, event: InternalEventEvent, person: Optional[InternalEventPerson] = None):
+ data = create_internal_event(team_id, event, person)
+ serialized_data = internal_event_to_dict(data)
+ kafka_topic = KAFKA_CDP_INTERNAL_EVENTS
+
+ try:
+ producer = KafkaProducer()
+ future = producer.produce(topic=kafka_topic, data=serialized_data, key=data.event.uuid)
+ future.get()
+ except Exception as e:
+ logger.exception("Failed to produce internal event", data=serialized_data, error=e)
+ raise
diff --git a/posthog/cdp/templates/__init__.py b/posthog/cdp/templates/__init__.py
index fd2f988a8df72..6d4a24e6d1a27 100644
--- a/posthog/cdp/templates/__init__.py
+++ b/posthog/cdp/templates/__init__.py
@@ -1,3 +1,4 @@
+from posthog.cdp.templates.hog_function_template import derive_sub_templates
from .webhook.template_webhook import template as webhook
from .slack.template_slack import template as slack
from .hubspot.template_hubspot import template_event as hubspot_event, template as hubspot, TemplateHubspotMigrator
@@ -51,6 +52,7 @@
from .snapchat_ads.template_pixel import template_snapchat_pixel as snapchat_pixel
from ._transformations.template_pass_through import template as pass_through_transformation
+
HOG_FUNCTION_TEMPLATES = [
_broadcast,
blank_site_destination,
@@ -107,7 +109,12 @@
]
+# This is a list of sub templates that are generated by merging the subtemplate with it's template
+HOG_FUNCTION_SUB_TEMPLATES = derive_sub_templates(HOG_FUNCTION_TEMPLATES)
+
HOG_FUNCTION_TEMPLATES_BY_ID = {template.id: template for template in HOG_FUNCTION_TEMPLATES}
+HOG_FUNCTION_SUB_TEMPLATES_BY_ID = {template.id: template for template in HOG_FUNCTION_SUB_TEMPLATES}
+ALL_HOG_FUNCTION_TEMPLATES_BY_ID = {**HOG_FUNCTION_TEMPLATES_BY_ID, **HOG_FUNCTION_SUB_TEMPLATES_BY_ID}
HOG_FUNCTION_MIGRATORS = {
TemplateCustomerioMigrator.plugin_url: TemplateCustomerioMigrator,
@@ -123,4 +130,4 @@
TemplateAvoMigrator.plugin_url: TemplateAvoMigrator,
}
-__all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID"]
+__all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID", "ALL_HOG_FUNCTION_TEMPLATES_BY_ID"]
diff --git a/posthog/cdp/templates/discord/template_discord.py b/posthog/cdp/templates/discord/template_discord.py
index fb8cb2bf50c64..9e3111ec88817 100644
--- a/posthog/cdp/templates/discord/template_discord.py
+++ b/posthog/cdp/templates/discord/template_discord.py
@@ -1,5 +1,16 @@
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionSubTemplate, SUB_TEMPLATE_COMMON
+COMMON_INPUTS_SCHEMA = [
+ {
+ "key": "webhookUrl",
+ "type": "string",
+ "label": "Webhook URL",
+ "description": "See this page on how to generate a Webhook URL: https://support.discord.com/hc/en-us/articles/228383668-Intro-to-Webhooks",
+ "secret": False,
+ "required": True,
+ },
+]
+
template: HogFunctionTemplate = HogFunctionTemplate(
status="free",
type="destination",
@@ -48,20 +59,37 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="Post to Discord on feature enrollment",
description="Posts a message to Discord when a user enrolls or un-enrolls in an early access feature",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
- inputs={
- "content": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'"
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
+ input_schema_overrides={
+ "content": {
+ "default": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ }
},
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="Post to Discord on survey response",
description="Posts a message to Discord when a user responds to a survey",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
- inputs={"content": "**{person.name}** responded to survey **{event.properties.$survey_name}**"},
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ input_schema_overrides={
+ "content": {
+ "default": "**{person.name}** responded to survey **{event.properties.$survey_name}**",
+ }
+ },
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ type="internal_destination",
+ name="Post to Discord on team activity",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ input_schema_overrides={
+ "content": {
+ "default": "**{person.name}** {event.properties.activity} {event.properties.scope} {event.properties.item_id}",
+ }
+ },
),
],
)
diff --git a/posthog/cdp/templates/hog_function_template.py b/posthog/cdp/templates/hog_function_template.py
index 0ebfc1f1c37dc..f76deacc3d4e4 100644
--- a/posthog/cdp/templates/hog_function_template.py
+++ b/posthog/cdp/templates/hog_function_template.py
@@ -8,10 +8,25 @@
PluginConfig = None
-SubTemplateId = Literal["early_access_feature_enrollment", "survey_response"]
+SubTemplateId = Literal["early-access-feature-enrollment", "survey-response", "activity-log"]
SUB_TEMPLATE_ID: tuple[SubTemplateId, ...] = get_args(SubTemplateId)
+HogFunctionTemplateType = Literal[
+ "destination",
+ "internal_destination",
+ "site_destination",
+ "site_app",
+ "transformation",
+ "shared",
+ "email",
+ "sms",
+ "push",
+ "broadcast",
+ "activity",
+ "alert",
+]
+
@dataclasses.dataclass(frozen=True)
class HogFunctionSubTemplate:
@@ -20,7 +35,8 @@ class HogFunctionSubTemplate:
description: Optional[str] = None
filters: Optional[dict] = None
masking: Optional[dict] = None
- inputs: Optional[dict] = None
+ input_schema_overrides: Optional[dict[str, dict]] = None
+ type: Optional[HogFunctionTemplateType] = None
@dataclasses.dataclass(frozen=True)
@@ -42,19 +58,7 @@ class HogFunctionMappingTemplate:
@dataclasses.dataclass(frozen=True)
class HogFunctionTemplate:
status: Literal["alpha", "beta", "stable", "free", "client-side"]
- type: Literal[
- "destination",
- "site_destination",
- "site_app",
- "transformation",
- "shared",
- "email",
- "sms",
- "push",
- "broadcast",
- "activity",
- "alert",
- ]
+ type: HogFunctionTemplateType
id: str
name: str
description: str
@@ -78,9 +82,41 @@ def migrate(cls, obj: PluginConfig) -> dict:
raise NotImplementedError()
+def derive_sub_templates(templates: list[HogFunctionTemplate]) -> list[HogFunctionTemplate]:
+ sub_templates = []
+ for template in templates:
+ for sub_template in template.sub_templates or []:
+ merged_id = f"{template.id}-{sub_template.id}"
+ template_params = dataclasses.asdict(template)
+ sub_template_params = dataclasses.asdict(sub_template)
+
+ # Override inputs_schema if set
+ input_schema_overrides = sub_template_params.pop("input_schema_overrides")
+ if input_schema_overrides:
+ new_input_schema = []
+ for schema in template_params["inputs_schema"]:
+ if schema["key"] in input_schema_overrides:
+ schema.update(input_schema_overrides[schema["key"]])
+ new_input_schema.append(schema)
+ template_params["inputs_schema"] = new_input_schema
+
+ # Get rid of the sub_templates from the template
+ template_params.pop("sub_templates")
+ # Update with the sub template params if not none
+ for key, value in sub_template_params.items():
+ if value is not None:
+ template_params[key] = value
+
+ template_params["id"] = merged_id
+ merged_template = HogFunctionTemplate(**template_params)
+ sub_templates.append(merged_template)
+
+ return sub_templates
+
+
SUB_TEMPLATE_COMMON: dict[SubTemplateId, HogFunctionSubTemplate] = {
- "survey_response": HogFunctionSubTemplate(
- id="survey_response",
+ "survey-response": HogFunctionSubTemplate(
+ id="survey-response",
name="Survey Response",
filters={
"events": [
@@ -99,9 +135,15 @@ def migrate(cls, obj: PluginConfig) -> dict:
]
},
),
- "early_access_feature_enrollment": HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ "early-access-feature-enrollment": HogFunctionSubTemplate(
+ id="early-access-feature-enrollment",
name="Early Access Feature Enrollment",
filters={"events": [{"id": "$feature_enrollment_update", "type": "events"}]},
),
+ "activity-log": HogFunctionSubTemplate(
+ id="activity-log",
+ name="Team Activity",
+ type="internal_destination",
+ filters={"events": [{"id": "$activity_log_entry_created", "type": "events"}]},
+ ),
}
diff --git a/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py b/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py
index e647dde19f411..a6eb7063a52e6 100644
--- a/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py
+++ b/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py
@@ -1,5 +1,6 @@
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionSubTemplate, SUB_TEMPLATE_COMMON
+
template: HogFunctionTemplate = HogFunctionTemplate(
status="free",
type="destination",
@@ -66,20 +67,37 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="Post to Microsoft Teams on feature enrollment",
description="Posts a message to Microsoft Teams when a user enrolls or un-enrolls in an early access feature",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
- inputs={
- "text": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'"
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
+ input_schema_overrides={
+ "text": {
+ "default": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ }
},
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="Post to Microsoft Teams on survey response",
description="Posts a message to Microsoft Teams when a user responds to a survey",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
- inputs={"text": "**{person.name}** responded to survey **{event.properties.$survey_name}**"},
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ input_schema_overrides={
+ "text": {
+ "default": "**{person.name}** responded to survey **{event.properties.$survey_name}**",
+ }
+ },
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ type="internal_destination",
+ name="Post to Microsoft Teams on team activity",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ input_schema_overrides={
+ "text": {
+ "default": "**{person.name}** {event.properties.activity} {event.properties.scope} {event.properties.item_id}",
+ }
+ },
),
],
)
diff --git a/posthog/cdp/templates/slack/template_slack.py b/posthog/cdp/templates/slack/template_slack.py
index 8cfb5a84101de..3454c18381797 100644
--- a/posthog/cdp/templates/slack/template_slack.py
+++ b/posthog/cdp/templates/slack/template_slack.py
@@ -1,5 +1,6 @@
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionSubTemplate, SUB_TEMPLATE_COMMON
+
template: HogFunctionTemplate = HogFunctionTemplate(
status="free",
type="destination",
@@ -108,65 +109,95 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="Post to Slack on feature enrollment",
- description="Posts a message to Slack when a user enrolls or un-enrolls in an early access feature",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
- inputs={
- "text": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
- "blocks": [
- {
- "text": {
- "text": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
- "type": "mrkdwn",
- },
- "type": "section",
- },
- {
- "type": "actions",
- "elements": [
- {
- "url": "{person.url}",
- "text": {"text": "View Person in PostHog", "type": "plain_text"},
- "type": "button",
+ # description="Posts a message to Slack when a user enrolls or un-enrolls in an early access feature",
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
+ input_schema_overrides={
+ "blocks": {
+ "default": [
+ {
+ "text": {
+ "text": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ "type": "mrkdwn",
},
- # NOTE: It would be nice to have a link to the EAF but the event needs more info
- ],
- },
- ],
+ "type": "section",
+ },
+ {
+ "type": "actions",
+ "elements": [
+ {
+ "url": "{person.url}",
+ "text": {"text": "View Person in PostHog", "type": "plain_text"},
+ "type": "button",
+ },
+ ],
+ },
+ ],
+ },
+ "text": {
+ "default": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ },
},
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="Post to Slack on survey response",
description="Posts a message to Slack when a user responds to a survey",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
- inputs={
- "text": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
- "blocks": [
- {
- "text": {
- "text": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
- "type": "mrkdwn",
- },
- "type": "section",
- },
- {
- "type": "actions",
- "elements": [
- {
- "url": "{project.url}/surveys/{event.properties.$survey_id}",
- "text": {"text": "View Survey", "type": "plain_text"},
- "type": "button",
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ input_schema_overrides={
+ "blocks": {
+ "default": [
+ {
+ "text": {
+ "text": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
+ "type": "mrkdwn",
},
- {
- "url": "{person.url}",
- "text": {"text": "View Person", "type": "plain_text"},
- "type": "button",
+ "type": "section",
+ },
+ {
+ "type": "actions",
+ "elements": [
+ {
+ "url": "{project.url}/surveys/{event.properties.$survey_id}",
+ "text": {"text": "View Survey", "type": "plain_text"},
+ "type": "button",
+ },
+ {
+ "url": "{person.url}",
+ "text": {"text": "View Person", "type": "plain_text"},
+ "type": "button",
+ },
+ ],
+ },
+ ],
+ },
+ "text": {
+ "default": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
+ },
+ },
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ name="Post to Slack on team activity",
+ description="",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ type="internal_destination",
+ input_schema_overrides={
+ "blocks": {
+ "default": [
+ {
+ "text": {
+ "text": "*{person.properties.email}* {event.properties.activity} {event.properties.scope} {event.properties.item_id} ",
+ "type": "mrkdwn",
},
- ],
- },
- ],
+ "type": "section",
+ }
+ ],
+ },
+ "text": {
+ "default": "*{person.properties.email}* {event.properties.activity} {event.properties.scope} {event.properties.item_id}",
+ },
},
),
],
diff --git a/posthog/cdp/templates/webhook/template_webhook.py b/posthog/cdp/templates/webhook/template_webhook.py
index 49e350736de51..45789df2b9fac 100644
--- a/posthog/cdp/templates/webhook/template_webhook.py
+++ b/posthog/cdp/templates/webhook/template_webhook.py
@@ -92,14 +92,20 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="HTTP Webhook on feature enrollment",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="HTTP Webhook on survey response",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ name="HTTP Webhook on team activity",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ type="internal_destination",
),
],
)
diff --git a/posthog/hogql_queries/experiments/experiment_funnels_query_runner.py b/posthog/hogql_queries/experiments/experiment_funnels_query_runner.py
index 363ee06bc78be..08c7e3dd91de5 100644
--- a/posthog/hogql_queries/experiments/experiment_funnels_query_runner.py
+++ b/posthog/hogql_queries/experiments/experiment_funnels_query_runner.py
@@ -31,6 +31,7 @@
from typing import Optional, Any, cast
from zoneinfo import ZoneInfo
from rest_framework.exceptions import ValidationError
+from datetime import datetime, timedelta, UTC
class ExperimentFunnelsQueryRunner(QueryRunner):
@@ -216,3 +217,14 @@ def _validate_event_variants(self, funnels_result: FunnelsQueryResponse):
def to_query(self) -> ast.SelectQuery:
raise ValueError(f"Cannot convert source query of type {self.query.funnels_query.kind} to query")
+
+ # Cache results for 24 hours
+ def cache_target_age(self, last_refresh: Optional[datetime], lazy: bool = False) -> Optional[datetime]:
+ if last_refresh is None:
+ return None
+ return last_refresh + timedelta(hours=24)
+
+ def _is_stale(self, last_refresh: Optional[datetime], lazy: bool = False) -> bool:
+ if not last_refresh:
+ return True
+ return (datetime.now(UTC) - last_refresh) > timedelta(hours=24)
diff --git a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py
index 6cf76b4c5b4e7..b3c72c788c951 100644
--- a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py
+++ b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py
@@ -45,6 +45,7 @@
)
from typing import Any, Optional
import threading
+from datetime import datetime, timedelta, UTC
class ExperimentTrendsQueryRunner(QueryRunner):
@@ -430,3 +431,14 @@ def _is_data_warehouse_query(self, query: TrendsQuery) -> bool:
def to_query(self) -> ast.SelectQuery:
raise ValueError(f"Cannot convert source query of type {self.query.count_query.kind} to query")
+
+ # Cache results for 24 hours
+ def cache_target_age(self, last_refresh: Optional[datetime], lazy: bool = False) -> Optional[datetime]:
+ if last_refresh is None:
+ return None
+ return last_refresh + timedelta(hours=24)
+
+ def _is_stale(self, last_refresh: Optional[datetime], lazy: bool = False) -> bool:
+ if not last_refresh:
+ return True
+ return (datetime.now(UTC) - last_refresh) > timedelta(hours=24)
diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py
index fa58d40c5fa36..3ed04cfc78d38 100644
--- a/posthog/kafka_client/topics.py
+++ b/posthog/kafka_client/topics.py
@@ -35,3 +35,5 @@
KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = f"{KAFKA_PREFIX}exception_symbolification_events{SUFFIX}"
KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = f"{KAFKA_PREFIX}clickhouse_error_tracking_issue_fingerprint{SUFFIX}"
+
+KAFKA_CDP_INTERNAL_EVENTS = f"{KAFKA_PREFIX}cdp_internal_events{SUFFIX}"
diff --git a/posthog/models/activity_logging/activity_log.py b/posthog/models/activity_logging/activity_log.py
index 7cf9595e64983..567ea9d6d7f85 100644
--- a/posthog/models/activity_logging/activity_log.py
+++ b/posthog/models/activity_logging/activity_log.py
@@ -5,6 +5,9 @@
from typing import Any, Literal, Optional, Union
from uuid import UUID
+from django.db.models.signals import post_save
+from django.dispatch.dispatcher import receiver
+from sentry_sdk import capture_exception
import structlog
from django.core.paginator import Paginator
from django.core.exceptions import ObjectDoesNotExist
@@ -498,3 +501,37 @@ def load_all_activity(scope_list: list[ActivityScope], team_id: int, limit: int
)
return get_activity_page(activity_query, limit, page)
+
+
+@receiver(post_save, sender=ActivityLog)
+def activity_log_created(sender, instance: "ActivityLog", created, **kwargs):
+ from posthog.cdp.internal_events import InternalEventEvent, InternalEventPerson, produce_internal_event
+ from posthog.api.activity_log import ActivityLogSerializer
+ from posthog.api.shared import UserBasicSerializer
+
+ try:
+ serialized_data = ActivityLogSerializer(instance).data
+ # TODO: Move this into the producer to support dataclasses
+ serialized_data["detail"] = dataclasses.asdict(serialized_data["detail"])
+ user_data = UserBasicSerializer(instance.user).data if instance.user else None
+
+ if created and instance.team_id is not None:
+ produce_internal_event(
+ team_id=instance.team_id,
+ event=InternalEventEvent(
+ event="$activity_log_entry_created",
+ distinct_id=user_data["distinct_id"] if user_data else f"team_{instance.team_id}",
+ properties=serialized_data,
+ ),
+ person=InternalEventPerson(
+ id=user_data["id"],
+ properties=user_data,
+ )
+ if user_data
+ else None,
+ )
+ except Exception as e:
+ # We don't want to hard fail here.
+ logger.exception("Failed to produce internal event", data=serialized_data, error=e)
+ capture_exception(e)
+ return
diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py
index 8328973bc0a2e..a715f10b86b7b 100644
--- a/posthog/models/hog_functions/hog_function.py
+++ b/posthog/models/hog_functions/hog_function.py
@@ -36,6 +36,7 @@ class HogFunctionState(enum.Enum):
class HogFunctionType(models.TextChoices):
DESTINATION = "destination"
SITE_DESTINATION = "site_destination"
+ INTERNAL_DESTINATION = "internal_destination"
SITE_APP = "site_app"
TRANSFORMATION = "transformation"
EMAIL = "email"
@@ -46,8 +47,13 @@ class HogFunctionType(models.TextChoices):
BROADCAST = "broadcast"
-TYPES_THAT_RELOAD_PLUGIN_SERVER = (HogFunctionType.DESTINATION, HogFunctionType.EMAIL, HogFunctionType.TRANSFORMATION)
-TYPES_WITH_COMPILED_FILTERS = (HogFunctionType.DESTINATION,)
+TYPES_THAT_RELOAD_PLUGIN_SERVER = (
+ HogFunctionType.DESTINATION,
+ HogFunctionType.EMAIL,
+ HogFunctionType.TRANSFORMATION,
+ HogFunctionType.INTERNAL_DESTINATION,
+)
+TYPES_WITH_COMPILED_FILTERS = (HogFunctionType.DESTINATION, HogFunctionType.INTERNAL_DESTINATION)
TYPES_WITH_TRANSPILED_FILTERS = (HogFunctionType.SITE_DESTINATION, HogFunctionType.SITE_APP)
TYPES_WITH_JAVASCRIPT_SOURCE = (HogFunctionType.SITE_DESTINATION, HogFunctionType.SITE_APP)
@@ -88,9 +94,9 @@ class Meta:
@property
def template(self) -> Optional[HogFunctionTemplate]:
- from posthog.cdp.templates import HOG_FUNCTION_TEMPLATES_BY_ID
+ from posthog.cdp.templates import ALL_HOG_FUNCTION_TEMPLATES_BY_ID
- return HOG_FUNCTION_TEMPLATES_BY_ID.get(self.template_id, None)
+ return ALL_HOG_FUNCTION_TEMPLATES_BY_ID.get(self.template_id, None)
@property
def filter_action_ids(self) -> list[int]: