Developer Guide

The Docker setup comes with a full stack of Kafka tools and utilities, including Kafka Connect:

  • Kafka broker
  • ZooKeeper
  • Kafka REST Proxy
  • Kafka Topics UI
  • Kafka Connect, with the AEP Sink Connector installed

Once the Docker containers are running, you can test the entire setup by using the Kafka REST Proxy to insert a message into a topic in your local Kafka cluster.

Build and Run Docker Locally

./gradlew clean build
docker build -t streaming-connect .
docker compose up -d

Configuration Options

The AEP connector is an uber JAR containing all the class files and their third-party dependencies. To install the connector, drop the JAR file into the plugin directory of your Kafka Connect installation (the directory listed in the worker's plugin.path setting).

AEP Sink Connector configuration is supplied in the REST call that registers the connector.

| Config Name | Description | Default | Required |
| --- | --- | --- | --- |
| topics | Comma-separated list of topics | | yes |
| connector.class | Class name of the connector implementation | com.adobe.platform.streaming.sink.impl.AEPSinkConnector | yes |
| aep.endpoint | AEP streaming endpoint URL | | yes |
| aep.connection.proxy.host | Address of the proxy host to connect through | | no |
| aep.connection.proxy.port | Port of the proxy host to connect through | 443 | no |
| aep.connection.proxy.user | Username for the proxy host | | no |
| aep.connection.proxy.password | Password for the proxy host | | no |
| aep.connection.auth.enabled | Set to true for an authenticated streaming endpoint | false | no |
| aep.connection.auth.token.type | Authentication token type: access_token, oauth2_access_token, or jwt_token (deprecated) | access_token | no |
| aep.connection.auth.client.id | IMS client ID | | no |
| aep.connection.auth.client.code | IMS client code | | no |
| aep.connection.auth.client.secret | IMS client secret | | no |
| aep.flush.bytes.kb | Batch size threshold, in KB | 4 | no |
| aep.connection.maxRetries | Maximum number of retries on failure from the AEP streaming endpoint | 3 | no |
| aep.connection.retryBackoff | Backoff between retries to the AEP streaming endpoint, in milliseconds | 300 | no |
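If you generate connector configs from a script, a small Python helper (standard library only) can fill in the documented defaults and catch a missing required key before the REST call. This is a sketch: build_sink_config, to_config_value and DOCUMENTED_DEFAULTS are hypothetical names, and the defaults are copied from the table above rather than read from the connector. Since the table lists them as defaults, leaving them out should have the same effect; the helper only makes the effective values visible.

```python
# Hypothetical helper (not part of the connector): fill in the defaults
# documented in the configuration table and check required keys are set.

DOCUMENTED_DEFAULTS = {
    "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
    "aep.connection.proxy.port": "443",
    "aep.connection.auth.enabled": "false",
    "aep.connection.auth.token.type": "access_token",
    "aep.flush.bytes.kb": "4",
    "aep.connection.maxRetries": "3",
    "aep.connection.retryBackoff": "300",
}

REQUIRED_KEYS = ("topics", "connector.class", "aep.endpoint")


def to_config_value(value):
    """Render a Python value as a Kafka Connect config string."""
    if isinstance(value, bool):
        # Kafka Connect booleans are written as "true"/"false".
        return "true" if value else "false"
    return str(value)


def build_sink_config(settings):
    """Merge user settings over the documented defaults.

    Raises ValueError if any required key is missing or empty.
    """
    config = dict(DOCUMENTED_DEFAULTS)
    config.update({key: to_config_value(value) for key, value in settings.items()})
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError("missing required config: " + ", ".join(missing))
    return config
```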

Step-by-Step Workflow

Build

./gradlew clean build

Build docker

docker build -t streaming-connect .

Running Docker

docker compose up -d

Tail Docker logs

docker logs experience-platform-streaming-connect_kafka-connect_1 -f

Manage running connectors

Kafka Connect exposes a set of REST APIs to manage connector instances.

List of running connectors

curl -X GET http://localhost:8083/connectors
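The same call can be scripted. Below is a minimal Python sketch using only the standard library, assuming a Connect worker at http://localhost:8083. list_connectors is a hypothetical helper, and its opener parameter exists only so the function can be exercised without a live worker. GET /connectors returns a JSON array of connector names.

```python
import json
import urllib.request


def list_connectors(connect_url="http://localhost:8083", opener=urllib.request.urlopen):
    """Return the names of the connectors registered with a Kafka Connect worker."""
    url = f"{connect_url.rstrip('/')}/connectors"
    # The response body is a JSON array such as ["aep-sink-connector"].
    with opener(url, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the Docker setup running, list_connectors() should return the same list as the curl command.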

Create a Streaming Connection

In order to send streaming data, you must first request a Streaming Connection from Adobe by providing some essential properties. The Data Inlet Registration APIs sit behind the adobe.io gateway, so the first step in requesting a new endpoint is either to use your existing authentication token and API key combination or to create a new integration through the Adobe I/O Console. More information about adobe.io-based authentication is available in the Adobe I/O documentation.

Once you have an IMS access token and API key, provide them as headers in the POST request.

Note that the x-sandbox-name header is optional; if it is not provided, the request defaults to the Production sandbox.

curl -X POST https://platform.adobe.io/data/foundation/flowservice/connections \
 -H 'Authorization: Bearer {ACCESS_TOKEN}' \
 -H 'Content-Type: application/json' \
 -H 'x-gw-ims-org-id: {IMS_ORG}' \
 -H 'x-api-key: {API_KEY}' \
 -H 'x-sandbox-name: {SANDBOX_NAME}' \
 -d '{
     "name": "Sample Streaming Connection",
     "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
     "description": "Sample description",
     "connectionSpec": {
         "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
         "version": "1.0"
     },
     "auth": {
         "specName": "Streaming Connection",
         "params": {
             "sourceId": "Sample connection source",
             "dataType": "xdm",
             "name": "Sample connection"
         }
     }
 }'

If the request succeeds, a new Streaming Connection is created for you, and the response looks similar to the one below. The id field in the response is the Connection ID.

{
    "id": "77a05521-91d6-451c-a055-2191d6851c34",
    "etag": "\"a500e689-0000-0200-0000-5e31df730000\""
}
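For scripted setups, the request above can be built in Python. This sketch only constructs the request (send it with urllib.request.urlopen). The helper names are hypothetical, the body reuses the providerId and connectionSpec IDs from the curl example, and x-sandbox-name is added only when a sandbox is given, matching the note that it is optional.

```python
import json
import urllib.request

FLOW_SERVICE_CONNECTIONS = "https://platform.adobe.io/data/foundation/flowservice/connections"


def build_create_connection_request(access_token, ims_org, api_key, name,
                                    source_id, sandbox_name=None,
                                    description="Sample description"):
    """Build the POST request that creates a Streaming Connection."""
    body = {
        "name": name,
        "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
        "description": description,
        "connectionSpec": {"id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0"},
        "auth": {
            "specName": "Streaming Connection",
            "params": {"sourceId": source_id, "dataType": "xdm", "name": name},
        },
    }
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "x-gw-ims-org-id": ims_org,
        "x-api-key": api_key,
    }
    if sandbox_name:
        # Optional: without it the request targets the Production sandbox.
        headers["x-sandbox-name"] = sandbox_name
    return urllib.request.Request(
        FLOW_SERVICE_CONNECTIONS,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


def connection_id(response_body):
    """Extract the Connection ID from the JSON response."""
    return json.loads(response_body)["id"]
```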

With the connection created, you can now retrieve your data collection URL from the connection information.

curl -X GET https://platform.adobe.io/data/foundation/flowservice/connections/{CONNECTION_ID} \
 -H 'Authorization: Bearer {ACCESS_TOKEN}' \
 -H 'x-gw-ims-org-id: {IMS_ORG}' \
 -H 'x-api-key: {API_KEY}' \
 -H 'x-sandbox-name: {SANDBOX_NAME}'

The response will look similar to the following:

{
    "items": [
        {
            "createdAt": 1583971856947,
            "updatedAt": 1583971856947,
            "createdBy": "{API_KEY}",
            "updatedBy": "{API_KEY}",
            "createdClient": "{USER_ID}",
            "updatedClient": "{USER_ID}",
            "id": "77a05521-91d6-451c-a055-2191d6851c34",
            "name": "Another new sample connection (Experience Event)",
            "description": "Sample description",
            "connectionSpec": {
                "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
                "version": "1.0"
            },
            "state": "enabled",
            "auth": {
                "specName": "Streaming Connection",
                "params": {
                    "sourceId": "Sample connection (ExperienceEvent)",
                    "inletUrl": "https://dcs.adobedc.net/collection/a868e1ce678a911ef1482b083329af3cafa4bafdc781285f25911eaae9e00eb2",
                    "inletId": "a868e1ce678a911ef1482b083329af3cafa4bafdc781285f25911eaae9e00eb2",
                    "dataType": "xdm",
                    "name": "Sample connection (ExperienceEvent)"
                }
            },
            "version": "\"56008aee-0000-0200-0000-5e697e150000\"",
            "etag": "\"56008aee-0000-0200-0000-5e697e150000\""
        }
    ]
}

The inletUrl in the response above is the AEP streaming endpoint to which the connector sends real-time events.
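A short Python sketch of pulling that value out of the response and using it as the connector's aep.endpoint. inlet_url is a hypothetical helper, and sample_response is a trimmed copy of the response shown above.

```python
import json


def inlet_url(response_text):
    """Return auth.params.inletUrl from a GET .../connections/{CONNECTION_ID} response body."""
    items = json.loads(response_text).get("items", [])
    if not items:
        raise ValueError("response contains no connection items")
    return items[0]["auth"]["params"]["inletUrl"]


sample_response = """
{"items": [{"id": "77a05521-91d6-451c-a055-2191d6851c34",
            "auth": {"specName": "Streaming Connection",
                     "params": {"inletUrl": "https://dcs.adobedc.net/collection/a868e1ce678a911ef1482b083329af3cafa4bafdc781285f25911eaae9e00eb2",
                                "inletId": "a868e1ce678a911ef1482b083329af3cafa4bafdc781285f25911eaae9e00eb2"}}}]}
"""

# The inletUrl becomes the connector's aep.endpoint setting.
sink_settings = {"aep.endpoint": inlet_url(sample_response)}
```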

Run AEP Streaming Connector

Once the Connect server is running on port 8083, you can use REST APIs to launch multiple instances of connectors.

Basic

curl -s -X POST \
-H "Content-Type: application/json" \
--data '{
  "name": "aep-sink-connector",
  "config": {
    "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
    "topics": "connect-test",
    "tasks.max": 1,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
    "aep.flush.interval.seconds": 1,
    "aep.flush.bytes.kb": 4
  }
}' http://localhost:8083/connectors
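Writing the JSON body by hand makes it easy to leave a stray or missing comma, which Kafka Connect rejects as malformed JSON. As a sketch, the same request can be built in Python with json.dumps producing the body. build_register_request is a hypothetical helper; the request is constructed but not sent.

```python
import json
import urllib.request


def build_register_request(name, config, connect_url="http://localhost:8083"):
    """Build a POST /connectors request; json.dumps guarantees well-formed JSON."""
    payload = {"name": name, "config": config}
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


basic_config = {
    "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
    "topics": "connect-test",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
    "aep.flush.interval.seconds": "1",
    "aep.flush.bytes.kb": "4",
}

request = build_register_request("aep-sink-connector", basic_config)
# urllib.request.urlopen(request) would submit it to a running Connect worker.
```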

Authentication Enabled

Use one of the commands below to set up a sink connector for an authenticated Streaming Connection:

  1. Using oauth2_access_token

    • Create http connector

      curl -s -X POST \
      -H "Content-Type: application/json" \
      --data '{
        "name": "aep-auth-sink-connector",
        "config": {
          "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
          "topics": "connect-test",
          "tasks.max": 1,
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": "false",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false",
          "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
          "aep.flush.interval.seconds": 1,
          "aep.flush.bytes.kb": 4,
          "aep.connection.auth.enabled": true,
          "aep.connection.auth.token.type": "oauth2_access_token",
          "aep.connection.auth.client.id": "<client_id>",
          "aep.connection.auth.client.secret": "<client_secret>",
          "aep.connection.auth.endpoint": "<ims-url>",
          "aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>"
        }
      }' http://localhost:8083/connectors

      Notes:

      1. The value of aep.connection.endpoint.headers must be a JSON-encoded string. For example, to send the following two HTTP headers:

        1. key: x-adobe-flow-id, value: 341fd4f0-cdec-4912-1ab6-fb54aeb41286
        2. key: x-adobe-dataset-id, value: 3096fbfd5978431948af3ba3

        use the following config:

        "aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
      2. For production, set aep.connection.auth.endpoint (<ims-url>) to https://ims-na1.adobelogin.com

  2. Using jwt_token [DEPRECATED]

    • Convert the private.key file from the Adobe console to a PKCS#8 private key using the following command:

      openssl pkcs8 -topk8 -inform PEM -outform DER -in private.key -out private-pkcs8.key -nocrypt
    • Create http connector

      curl -s -X POST \
      -H "Content-Type: application/json" \
      --data '{
        "name": "aep-auth-sink-connector",
        "config": {
          "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
          "topics": "connect-test",
          "tasks.max": 1,
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": "false",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false",
          "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
          "aep.flush.interval.seconds": 1,
          "aep.flush.bytes.kb": 4,
          "aep.connection.auth.enabled": true,
          "aep.connection.auth.token.type": "jwt_token",
          "aep.connection.auth.client.id": "<client_id>",
          "aep.connection.auth.imsOrg": "<organization-id>",
          "aep.connection.auth.accountKey": "<technical-account-id>",
          "aep.connection.auth.filePath": "<path-to-private-pkcs8.key>",
          "aep.connection.auth.endpoint": "<ims-url>",
          "aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>",
          "aep.connection.auth.client.secret": "<client_secret>"
        }
      }' http://localhost:8083/connectors

      Notes:

      1. The value of aep.connection.endpoint.headers must be a JSON-encoded string. For example, to send the following two HTTP headers:

        1. key: x-adobe-flow-id, value: 341fd4f0-cdec-4912-1ab6-fb54aeb41286
        2. key: x-adobe-dataset-id, value: 3096fbfd5978431948af3ba3

        use the following config:

        "aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
      2. For production, set aep.connection.auth.endpoint (<ims-url>) to https://ims-na1.adobelogin.com

Note: jwt_token authentication is deprecated.

  3. Using access_token

    • Create http connector

      curl -s -X POST \
      -H "Content-Type: application/json" \
      --data '{
        "name": "aep-auth-sink-connector",
        "config": {
          "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
          "topics": "connect-test",
          "tasks.max": 1,
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": "false",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false",
          "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
          "aep.flush.interval.seconds": 1,
          "aep.flush.bytes.kb": 4,
          "aep.connection.auth.enabled": true,
          "aep.connection.auth.token.type": "access_token",
          "aep.connection.auth.endpoint": "<ims-url>",
          "aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>",
          "aep.connection.auth.client.id": "<client_id>",
          "aep.connection.auth.client.code": "<client_code>",
          "aep.connection.auth.client.secret": "<client_secret>"
        }
      }' http://localhost:8083/connectors

      Notes:

      1. The value of aep.connection.endpoint.headers must be a JSON-encoded string. For example, to send the following two HTTP headers:

        1. key: x-adobe-flow-id, value: 341fd4f0-cdec-4912-1ab6-fb54aeb41286
        2. key: x-adobe-dataset-id, value: 3096fbfd5978431948af3ba3

        use the following config:

        "aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
      2. For production, set aep.connection.auth.endpoint (<ims-url>) to https://ims-na1.adobelogin.com
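Rather than escaping the quotes by hand, the aep.connection.endpoint.headers value can be produced with a JSON serializer. A minimal Python sketch; extra_headers and config_fragment are illustrative names. json.dumps inserts a space after each colon, which is equivalent JSON to the hand-written example in the notes above.

```python
import json

extra_headers = {
    "x-adobe-flow-id": "341fd4f0-cdec-4912-1ab6-fb54aeb41286",
    "x-adobe-dataset-id": "3096fbfd5978431948af3ba3",
}

# The connector expects the headers as a JSON-encoded *string* value.
config_fragment = {"aep.connection.endpoint.headers": json.dumps(extra_headers)}

# Serializing the config again escapes the inner quotes as \" for the REST call.
print(json.dumps(config_fragment))
```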

Batching

Use the command below to set up a sink connector that batches requests to reduce the number of network calls. Here aep.flush.bytes.kb is raised to 20 so that more records are grouped into each request:

curl -s -X POST \
-H "Content-Type: application/json" \
--data '{
  "name": "aep-batch-sink-connector",
  "config": {
    "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
    "topics": "connect-test",
    "tasks.max": 1,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
    "aep.flush.interval.seconds": 1,
    "aep.flush.bytes.kb": 20
  }
}' http://localhost:8083/connectors
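To illustrate how a size threshold and a time interval interact, here is a conceptual Python sketch. It is not the connector's implementation: the class name ByteThresholdBatcher is hypothetical, record size is approximated by the length of the JSON-encoded record, and the interval is only checked when a record arrives, whereas a real sink would also flush on a timer.

```python
import json
import time


class ByteThresholdBatcher:
    """Flush when buffered bytes reach flush_bytes_kb KB or the interval has elapsed."""

    def __init__(self, flush_bytes_kb=4, flush_interval_seconds=1, clock=time.monotonic):
        self.max_bytes = flush_bytes_kb * 1024
        self.interval = flush_interval_seconds
        self.clock = clock  # injectable so the behavior can be tested deterministically
        self.buffer = []
        self.buffered_bytes = 0
        self.last_flush = clock()

    def add(self, record):
        """Buffer one record; return the batch if a flush is due, else None."""
        self.buffer.append(record)
        self.buffered_bytes += len(json.dumps(record).encode("utf-8"))
        size_reached = self.buffered_bytes >= self.max_bytes
        interval_elapsed = self.clock() - self.last_flush >= self.interval
        if size_reached or interval_elapsed:
            return self.flush()
        return None

    def flush(self):
        """Hand back everything buffered so far and reset the counters."""
        batch, self.buffer, self.buffered_bytes = self.buffer, [], 0
        self.last_flush = self.clock()
        return batch
```

A larger aep.flush.bytes.kb means fewer, larger requests at the cost of holding records slightly longer before they are sent.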

Error Handling And Logging

| Config | Behavior |
| --- | --- |
| "errors.tolerance": "none" | Default behavior. The connector stops on error. |
| "errors.tolerance": "all", "errors.log.enable": "false" | The connector continues on error; errors are not logged. |
| "errors.tolerance": "all", "errors.log.enable": "true", "errors.log.include.messages": "false" | The connector continues on error; each error occurrence is logged, but the failed message is not. |
| "errors.tolerance": "all", "errors.log.enable": "true", "errors.log.include.messages": "true" | The connector continues on error; each error occurrence and the failed message are logged. |
| "errors.tolerance": "all", "errors.log.enable": "true" or "false", "errors.deadletterqueue.topic.name": "topic-name" | The connector continues on error; errors are logged according to errors.log.enable, and failed messages are sent to the dead letter topic. |
| "errors.tolerance": "all", "errors.log.enable": "true" or "false", "errors.deadletterqueue.topic.name": "topic-name", "errors.deadletterqueue.context.headers.enable": "true" | As above, and the failure reason is added to the headers of the dead letter message. |
For more information, see: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

Note: On authentication errors such as HTTP 401 and 403, the connector stops regardless of the errors.tolerance setting.

Dead Letter Configuration

To send failed records to a dead letter topic, use the standard Kafka Connect error-handling configuration.

Kafka Connect dead letter queue configuration: https://docs.confluent.io/platform/current/connect/concepts.html#dead-letter-queue
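As a sketch, the rows of the error-handling table map onto standard Kafka Connect sink properties as shown below. error_handling_config and the topic name aep-sink-dlq are hypothetical; the property names are Kafka Connect's own. On the single-broker Docker setup, the dead letter topic's replication factor (errors.deadletterqueue.topic.replication.factor, default 3) cannot be satisfied and needs to be lowered to 1.

```python
def _flag(value):
    """Kafka Connect boolean settings are passed as the strings "true"/"false"."""
    return "true" if value else "false"


def error_handling_config(tolerate_errors=False, log_errors=False, log_messages=False,
                          dlq_topic=None, dlq_context_headers=False,
                          dlq_replication_factor=None):
    """Return Kafka Connect sink error-handling properties for one row of the table."""
    if not tolerate_errors:
        # Default behavior: the task stops on the first error.
        return {"errors.tolerance": "none"}
    config = {"errors.tolerance": "all", "errors.log.enable": _flag(log_errors)}
    if log_errors:
        config["errors.log.include.messages"] = _flag(log_messages)
    if dlq_topic:
        config["errors.deadletterqueue.topic.name"] = dlq_topic
        config["errors.deadletterqueue.context.headers.enable"] = _flag(dlq_context_headers)
        if dlq_replication_factor is not None:
            # Defaults to 3, which a single-broker local cluster cannot satisfy.
            config["errors.deadletterqueue.topic.replication.factor"] = str(dlq_replication_factor)
    return config


# Merge the result into the connector's "config" object before registering it.
dlq_settings = error_handling_config(tolerate_errors=True, log_errors=True,
                                     dlq_topic="aep-sink-dlq", dlq_context_headers=True,
                                     dlq_replication_factor=1)
```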

Avro data ingestion

Avro data can be ingested using Confluent Schema Registry and the Avro converter. Steps to ingest Avro data:

  1. Install the Confluent Avro converter by following the documentation: https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter/
  2. Use the Schema Registry REST endpoint to add or update the Avro schema. To add a schema, follow the documentation: https://docs.confluent.io/platform/7.5/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration:~:text=the%20command%20below.-,curl%20%2DX%20POST%20%2DH,-%22Content%2DType%3A%20application
  3. Create the connector with the following configuration:
    curl -s -X POST \
    -H 'Content-Type: application/json' \
    --data '{
    "name": "aep-sink-connector",
    "config": {
        "topics": "connect-test",
        "tasks.max": 1,
        "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "{SCHEMA_REGISTRY_URL}",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "{SCHEMA_REGISTRY_URL}",
        "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
        "aep.flush.interval.seconds": 1,
        "aep.flush.bytes.kb": 20
    }
    }' http://localhost:8083/connectors
    Replace {SCHEMA_REGISTRY_URL} with the Schema Registry REST endpoint.
  4. Once the connector is running, post Avro data to the connect-test topic to send it to AEP.
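Step 2 can also be scripted. The sketch below builds a Confluent Schema Registry request (POST /subjects/{subject}/versions) that registers a schema under connect-test-value, the subject the Avro converter uses for record values by default (TopicNameStrategy). The registry URL http://localhost:8081 (not part of the Docker setup described above) and the ExampleEvent schema are assumptions for illustration; the request is built but not sent.

```python
import json
import urllib.request


def build_schema_registration(registry_url, topic, avro_schema):
    """Build a Schema Registry request that registers `avro_schema` for the topic's values."""
    subject = f"{topic}-value"  # default TopicNameStrategy subject for record values
    body = {"schema": json.dumps(avro_schema)}  # the schema itself is sent as a string
    return urllib.request.Request(
        f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


# Hypothetical record schema, for illustration only.
example_schema = {
    "type": "record",
    "name": "ExampleEvent",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "timestamp", "type": "string"},
    ],
}

req = build_schema_registration("http://localhost:8081", "connect-test", example_schema)
```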

Proxy Host Configuration

There are two ways to route requests to the AEP endpoint through a proxy server:

  1. Using environment variables: export the proxy host and port through KAFKA_OPTS on each Kafka Connect node, then restart the Kafka Connect nodes.

    For HTTPS, use the following:

    export KAFKA_OPTS="-Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=8085 -Dhttps.proxyUser=proxyUsername -Dhttps.proxyPassword=proxyPassword"

    For HTTP, use the following:

    export KAFKA_OPTS="-Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=8085 -Dhttp.proxyUser=proxyUsername -Dhttp.proxyPassword=proxyPassword"
  2. Using connector properties: set the following properties when creating the connector. Default values are listed under Configuration Options.

    aep.connection.proxy.host
    aep.connection.proxy.port
    aep.connection.proxy.user
    aep.connection.proxy.password

For more details, see the Oracle documentation on configuring proxy settings in Java: https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html
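A small Python sketch that renders either proxy option from the same inputs, mainly to keep the http/https prefixes consistent. The helper names are hypothetical. The proxyUser and proxyPassword properties follow this guide's example; they are not among the standard JDK networking properties in the Oracle documentation, so whether they take effect depends on the connector.

```python
def kafka_opts_for_proxy(host, port, scheme="https", user=None, password=None):
    """Render the KAFKA_OPTS value for option 1 (environment variables)."""
    if scheme not in ("http", "https"):
        raise ValueError("scheme must be 'http' or 'https'")
    opts = [f"-D{scheme}.proxyHost={host}", f"-D{scheme}.proxyPort={port}"]
    if user is not None:
        opts.append(f"-D{scheme}.proxyUser={user}")
    if password is not None:
        opts.append(f"-D{scheme}.proxyPassword={password}")
    return " ".join(opts)


def connector_proxy_properties(host, port=443, user=None, password=None):
    """Render the connector properties for option 2 (port defaults to 443)."""
    props = {"aep.connection.proxy.host": host, "aep.connection.proxy.port": str(port)}
    if user is not None:
        props["aep.connection.proxy.user"] = user
    if password is not None:
        props["aep.connection.proxy.password"] = password
    return props


print('export KAFKA_OPTS="%s"' % kafka_opts_for_proxy("127.0.0.1", 8085))
```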

Use the Kafka Topics UI to view your topics

The Docker setup includes Kafka Topics UI for viewing topics and the messages within them. Open a browser, go to http://localhost:8000, and view the connect-test topic.


To test the flow, you can use the following curl command to post a message to the Kafka topic through the Kafka REST Proxy. Make sure the connector is configured with your inlet endpoint and that the XDM message uses the schema that corresponds to your setup.

curl -X POST \
  http://localhost:8082/topics/connect-test \
  -H 'Content-Type: application/vnd.kafka.json.v2+json' \
  -H 'Host: localhost:8082' \
  -d '{
  "records": [{
    "value": {
      "header": {
        "schemaRef": {
          "id": "<schema-id>",
          "contentType": "application/vnd.adobe.xed-full+json;version=1"
        },
        "msgId": "1553542044760:1153:5",
        "source": {
          "name": "POCEvent1122ew2"
        },
        "msgVersion": "1.0",
        "imsOrgId": "0DD379AC5B117F6E0A494106@AdobeOrg"
      },
      "body": {
        "xdmMeta": {
          "schemaRef": {
            "id": "<schema-id>",
            "contentType": "application/vnd.adobe.xed-full+json;version=1"
          }
        },
        "xdmEntity": {
          "identityMap": {
            "email": [{
              "id": "<email-address>"
            }]
          },
          "_id": "1553542044071",
          "timestamp": "2019-03-25T19:27:24Z",
          "_msft_cds_acp": {
            "productListItems": {
              "priceTotal": 10,
              "name": "prod1",
              "_id": "1212121",
              "SKU": "13455"
            }
          }
        }
      }
    }
  }]
}'

You should see the message written to the connect-test topic in the local Kafka cluster; the AEP Streaming Sink Connector picks it up and sends it to the AEP Streaming inlet.
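For repeated tests, the REST Proxy call can be built from Python as well. This sketch uses a trimmed skeleton of the message above; build_produce_request is a hypothetical helper, the <schema-id> and <ims-org-id> placeholders must be replaced for your setup, and the request is constructed but not sent.

```python
import json
import urllib.request


def build_produce_request(topic, values, proxy_url="http://localhost:8082"):
    """Build a Kafka REST Proxy v2 request that produces JSON values to `topic`."""
    body = {"records": [{"value": value} for value in values]}
    return urllib.request.Request(
        f"{proxy_url.rstrip('/')}/topics/{topic}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )


SCHEMA_REF = {
    "id": "<schema-id>",
    "contentType": "application/vnd.adobe.xed-full+json;version=1",
}

# Skeleton of the XDM message above; replace the placeholders for your setup.
xdm_message = {
    "header": {"schemaRef": SCHEMA_REF, "imsOrgId": "<ims-org-id>"},
    "body": {
        "xdmMeta": {"schemaRef": SCHEMA_REF},
        "xdmEntity": {"_id": "1553542044071", "timestamp": "2019-03-25T19:27:24Z"},
    },
}

request = build_produce_request("connect-test", [xdm_message])
# urllib.request.urlopen(request) sends it when the REST Proxy is running.
```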