[Cloudevents] Switch to Debezium 2.0 #292

Open · wants to merge 3 commits into main
10 changes: 5 additions & 5 deletions cloudevents/README.md
@@ -5,7 +5,7 @@ This demo automatically deploys the topology of services as defined in the [Debe
## Preparations

```shell
-export DEBEZIUM_VERSION=1.9
+export DEBEZIUM_VERSION=2.0
mvn clean install -f avro-data-extractor/pom.xml
docker-compose up --build
```
@@ -51,7 +51,7 @@ Examine its contents like so:
docker run --rm --tty \
--network cloudevents-network \
quay.io/debezium/tooling:1.2 \
-kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
+kafkacat -b kafka:9092 -C -o beginning -q \
-t customers2 | jq .
```

@@ -65,7 +65,7 @@ curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json"
docker run --rm --tty \
--network cloudevents-network \
quay.io/debezium/tooling:1.2 \
-kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
+kafkacat -b kafka:9092 -C -o beginning -q \
jpechane (Contributor):
@vjuranek Shouldn't the README.md descriptions also be updated? I suppose they are not aligned with the new kafkacat output.

vjuranek (Member Author):
Yes, it's not aligned, but I have no idea what to use as a replacement. Removing the schema option makes the command succeed (with the schema option it fails); the output is not pretty, but still good enough for the user to verify that the data was written. The output looks like this:

$ docker run --rm --tty --network cloudevents-network quay.io/debezium/tooling:1.2 kafkacat -b kafka:9092 -C -o beginning -q -t customers3
�
Sally
     Thomas*[email protected]
�
 George
       [email protected]
�
 Edward
       Walker▒[email protected]
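One way to make such a raw Avro dump a bit more readable is to strip the non-printable bytes, for example (a sketch; it assumes the tooling image ships `strings`, which is not verified here):

```shell
# Filter the raw Avro records through `strings`: field boundaries are lost,
# but names and e-mail addresses become readable for a quick sanity check.
docker run --rm --tty \
  --network cloudevents-network \
  quay.io/debezium/tooling:1.2 \
  sh -c 'kafkacat -b kafka:9092 -C -o beginning -q -e -t customers3 | strings'
```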

jpechane (Contributor):
@vjuranek Are the schemas different? If yes, can we replace the samples by querying the schemas from the Apicurio registry?
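If the schemas do need comparing, the registry contents can be inspected directly over Apicurio's v2 REST API (a sketch; the compose file publishes the registry on host port 8080, and the artifact ID below is an assumption to be replaced with one from the search output):

```shell
# List all artifacts known to the registry.
curl -s http://localhost:8080/apis/registry/v2/search/artifacts | jq .

# Fetch the content of one artifact; take the ID from the list above.
curl -s http://localhost:8080/apis/registry/v2/groups/default/artifacts/dbserver3.inventory.customers-value | jq .
```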

vjuranek (Member Author):
@jpechane I'm not sure I follow. Which schemas? There is only one schema for all events. Maybe there is a misunderstanding, as I forgot to remove the schema options from kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8080 in the previous example. This is a mistake - it doesn't work either. Will remove it soon.

jpechane (Contributor):
@vjuranek OK, maybe it would make sense to rework the example a bit more. Apicurio provides a JSON converter that does not embed the schema in the message but stores it in the registry, in the same way as Avro does. How about changing the example to use it instead of Avro? In that case it would be nicely presentable.
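As a rough sketch of that suggestion, the connector registration might look like this, using Apicurio's `ExtJsonConverter` (converter class, property names, connector name, and Connect URL are assumptions based on Apicurio's converter module, not taken from this PR; database connection properties are elided):

```shell
# Sketch only: register a connector whose JSON schema is stored in the
# registry rather than embedded in every message.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/inventory-connector-extjson/config -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "dbserver4",
    "value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
    "value.converter.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
    "value.converter.apicurio.registry.auto-register": "true"
  }'
```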

vjuranek (Member Author):
@jpechane Unfortunately this is not possible right now, as CloudEventsConverter works with a schema registry only when Avro is used, and there are also other issues. I spent quite some time on it today with no reasonable results, so I'll try to prepare a reproducer and will raise an issue with Apicurio.

For the record, I found out that when one wants to use Confluent-based tools, apis/ccompat/v6 should be used, i.e. the command should be something like this:

    docker run --rm --tty --network cloudevents-network quay.io/debezium/tooling:1.2 kafkacat -b kafka:9092 -C -o beginning -s value=avro -r http://schema-registry:8080/apis/ccompat/v6 -t dbserver3.inventory.customers

With this, it still fails, but at least gives a somewhat reasonable error:

    % ERROR: Failed to format message in dbserver3.inventory.customers [0] at offset 0: Avro/Schema-registry message deserialization: REST request failed (code 404): {"message":"No content with id/hash 'contentId-0' was found.","error_code":40403}: terminating

So far I have no idea why kafkacat requests the schema with ID 0 (which is not present; IDs are present starting from 1), where it comes from, or how to fix it.
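To narrow that down, one could ask the ccompat facade directly which subjects and versions it knows about (a debugging sketch; the `-value` subject name assumes the default TopicNameStrategy):

```shell
# List subjects exposed by the Confluent-compatible API, then the versions
# (and thus schema IDs) registered under the topic's value subject.
curl -s http://localhost:8080/apis/ccompat/v6/subjects | jq .
curl -s http://localhost:8080/apis/ccompat/v6/subjects/dbserver3.inventory.customers-value/versions | jq .
```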

jpechane (Contributor):
OK, let's put this PR on hold until the issue is resolved. BTW, I'd prefer not to use the ccompat API just to have different tooling in use. WRT the schema ID, there is an apicurio.registry.id-handler option that should be set to io.apicurio.registry.serde.Legacy4ByteIdHandler.
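For reference, a sketch of where that option could be plugged in; the `value.converter.avro.` prefix mirrors the registry URL properties used elsewhere in this PR and is an assumption, since this was never confirmed to work here:

```shell
# Untested sketch: add the id-handler to the connector config so the Apicurio
# serializer writes Confluent-style 4-byte schema IDs, which Confluent-based
# consumers (kafkacat -s value=avro) could then decode.
jq '. + {"value.converter.avro.apicurio.registry.id-handler":
         "io.apicurio.registry.serde.Legacy4ByteIdHandler"}' \
  cloudevents/register-postgres-avro-avro.json
```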

vjuranek (Member Author):
Using apicurio.registry.id-handler doesn't seem to help, it just gives me another exception. I also found out that the issue mentioned above is already reported as apicurio-registry #2878.

-t dbserver3.inventory.customers | jq .
```

@@ -76,8 +76,8 @@ The same stream processing application writes out that data to the `customers3`
docker run --rm --tty \
--network cloudevents-network \
quay.io/debezium/tooling:1.2 \
-kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
-  -t customers2 | jq .
+kafkacat -b kafka:9092 -C -o beginning -q \
+  -t customers3
```

## CloudEvents Binary Mode
5 changes: 0 additions & 5 deletions cloudevents/avro-data-extractor/Dockerfile

This file was deleted.

43 changes: 37 additions & 6 deletions cloudevents/avro-data-extractor/pom.xml
@@ -10,8 +10,8 @@
<packaging>jar</packaging>

<properties>
-<maven.compiler.source>1.8</maven.compiler.source>
-<maven.compiler.target>1.8</maven.compiler.target>
+<maven.compiler.source>11</maven.compiler.source>
+<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<failOnMissingWebXml>false</failOnMissingWebXml>
@@ -21,8 +21,9 @@
<version.surefire>3.0.0-M6</version.surefire>

<apache.kafka.version>3.2.0</apache.kafka.version>
-<version.quarkus>2.7.2.Final</version.quarkus>
-<version.debezium>1.9.5.Final</version.debezium>
+<version.quarkus>2.13.1.Final</version.quarkus>
+<version.debezium>2.0.0.Final</version.debezium>
+<version.kafka.avro>7.2.0</version.kafka.avro>
</properties>

<dependencyManagement>
@@ -76,7 +77,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
-<artifactId>quarkus-undertow-websockets</artifactId>
+<artifactId>quarkus-websockets</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -97,7 +98,37 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
-<version>5.3.2</version>
+<version>7.2.1</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>1.3.2.Final</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use JAX-RS client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${version.kafka.avro}</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${version.kafka.avro}</version>
</dependency>
</dependencies>
<profiles>
94 changes: 94 additions & 0 deletions cloudevents/avro-data-extractor/src/main/docker/Dockerfile.jvm
@@ -0,0 +1,94 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
#
# Before building the container image run:
#
# ./mvnw package
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/my-artifactId-jvm .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/my-artifactId-jvm
#
# If you want to include the debug port into your docker image
# you will have to expose the debug port (default 5005) like this: EXPOSE 8080 5005
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/my-artifactId-jvm
#
# This image uses the `run-java.sh` script to run the application.
# This script computes the command line to execute your Java application, and
# includes memory/GC tuning.
# You can configure the behavior using the following environment properties:
# - JAVA_OPTS: JVM options passed to the `java` command (example: "-verbose:class")
# - JAVA_OPTS_APPEND: User specified Java options to be appended to generated options
# in JAVA_OPTS (example: "-Dsome.property=foo")
# - JAVA_MAX_MEM_RATIO: Is used when no `-Xmx` option is given in JAVA_OPTS. This is
# used to calculate a default maximal heap memory based on the container's restriction.
# If used in a container without any memory constraints for the container then this
# option has no effect. If there is a memory constraint then `-Xmx` is set to a ratio
# of the container available memory as set here. The default is `50` which means 50%
# of the available memory is used as an upper boundary. You can skip this mechanism by
# setting this value to `0` in which case no `-Xmx` option is added.
# - JAVA_INITIAL_MEM_RATIO: Is used when no `-Xms` option is given in JAVA_OPTS. This
# is used to calculate a default initial heap memory based on the maximum heap memory.
# If used in a container without any memory constraints for the container then this
# option has no effect. If there is a memory constraint then `-Xms` is set to a ratio
# of the `-Xmx` memory as set here. The default is `25` which means 25% of the `-Xmx`
# is used as the initial heap size. You can skip this mechanism by setting this value
# to `0` in which case no `-Xms` option is added (example: "25")
# - JAVA_MAX_INITIAL_MEM: Is used when no `-Xms` option is given in JAVA_OPTS.
# This is used to calculate the maximum value of the initial heap memory. If used in
# a container without any memory constraints for the container then this option has
# no effect. If there is a memory constraint then `-Xms` is limited to the value set
# here. The default is 4096MB which means the calculated value of `-Xms` never will
# be greater than 4096MB. The value of this variable is expressed in MB (example: "4096")
# - JAVA_DIAGNOSTICS: Set this to get some diagnostics information to standard output
# when things are happening. This option, if set to true, will set
# `-XX:+UnlockDiagnosticVMOptions`. Disabled by default (example: "true").
# - JAVA_DEBUG: If set, remote debugging will be switched on. Disabled by default (example:
#   "true").
# - JAVA_DEBUG_PORT: Port used for remote debugging. Defaults to 5005 (example: "8787").
# - CONTAINER_CORE_LIMIT: A calculated core limit as described in
# https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt. (example: "2")
# - CONTAINER_MAX_MEMORY: Memory limit given to the container (example: "1024").
# - GC_MIN_HEAP_FREE_RATIO: Minimum percentage of heap free after GC to avoid expansion.
# (example: "20")
# - GC_MAX_HEAP_FREE_RATIO: Maximum percentage of heap free after GC to avoid shrinking.
# (example: "40")
# - GC_TIME_RATIO: Specifies the ratio of the time spent outside the garbage collection.
# (example: "4")
# - GC_ADAPTIVE_SIZE_POLICY_WEIGHT: The weighting given to the current GC time versus
# previous GC times. (example: "90")
# - GC_METASPACE_SIZE: The initial metaspace size. (example: "20")
# - GC_MAX_METASPACE_SIZE: The maximum metaspace size. (example: "100")
# - GC_CONTAINER_OPTIONS: Specify Java GC to use. The value of this variable should
# contain the necessary JRE command-line options to specify the required GC, which
# will override the default of `-XX:+UseParallelGC` (example: -XX:+UseG1GC).
# - HTTPS_PROXY: The location of the https proxy. (example: "[email protected]:8080")
# - HTTP_PROXY: The location of the http proxy. (example: "[email protected]:8080")
# - NO_PROXY: A comma-separated list of hosts, IP addresses or domains that can be
# accessed directly. (example: "foo.example.com,bar.example.com")
#
###
FROM registry.access.redhat.com/ubi8/openjdk-11:1.11

ENV LANGUAGE='en_US:en'


# We make four distinct layers so if there are application changes the library layers can be re-used
COPY --chown=185 target/quarkus-app/lib/ /deployments/lib/
COPY --chown=185 target/quarkus-app/*.jar /deployments/
COPY --chown=185 target/quarkus-app/app/ /deployments/app/
COPY --chown=185 target/quarkus-app/quarkus/ /deployments/quarkus/

EXPOSE 8079
USER 185
ENV AB_JOLOKIA_OFF=""
ENV JAVA_OPTS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
ENV JAVA_APP_JAR="/deployments/quarkus-run.jar"

@@ -7,13 +7,13 @@

import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
@@ -24,10 +24,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
+import io.apicurio.registry.client.CompatibleClient;
+import io.apicurio.registry.client.RegistryService;
+import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
+import io.apicurio.registry.utils.serde.AvroKafkaSerializer;

import io.debezium.examples.cloudevents.dataextractor.model.CloudEvent;
import io.debezium.serde.DebeziumSerdes;


/**
* Starts up the KStreams pipeline once the source topics have been created.
*
@@ -73,9 +78,10 @@ Topology createStreamTopology() {
.mapValues(ce -> ce.data)
.to(jsonAvroExtractedTopic, Produced.with(longKeySerde, Serdes.ByteArray()));

-Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
-Map<String, String> config = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
-genericAvroSerde.configure(config, false);

+RegistryService service = CompatibleClient.createCompatible(schemaRegistryUrl);
+Deserializer<GenericRecord> deserializer = new AvroKafkaDeserializer(service);
+Serde<GenericRecord> genericAvroSerde = Serdes.serdeFrom(new AvroKafkaSerializer<>(service), deserializer);

builder.stream(avroAvroCustomersTopic, Consumed.with(longKeySerde, genericAvroSerde))
.mapValues(ce -> ((ByteBuffer) ce.get("data")).array())
@@ -14,12 +14,16 @@ public class CloudEvent {
public String iodebeziumconnector;
public String iodebeziumname;
public String iodebeziumtsms;
-public boolean iodebeziumsnapshot;
+public String iodebeziumsnapshot;
public String iodebeziumdb;
public String iodebeziumsequence;
public String iodebeziumschema;
public String iodebeziumtable;
-public String iodebeziumtxId;
+public String iodebeziumtxid;
public String iodebeziumlsn;
public String iodebeziumxmin;
public String iodebeziumtxtotalorder;
public String iodebeziumtxdatacollectionorder;
public byte[] data;
}
@@ -4,9 +4,9 @@ json.avro.extracted.topic=customers2
avro.avro.customers.topic=dbserver3.inventory.customers
avro.avro.extracted.topic=customers3

-schema.registry.url=http://schema-registry:8081
+schema.registry.url=http://schema-registry:8080/api/

-quarkus.kafka-streams.bootstrap-servers=localhost:9092
+quarkus.kafka-streams.bootstrap-servers=kafka:9092
quarkus.kafka-streams.application-id=cloudevents-data-extractor
quarkus.kafka-streams.topics=${json.avro.customers.topic},${avro.avro.customers.topic}

26 changes: 4 additions & 22 deletions cloudevents/docker-compose.yaml
@@ -44,38 +44,20 @@ services:
- STATUS_STORAGE_TOPIC=my_connect_statuses
- KAFKA_DEBUG=true
- DEBUG_SUSPEND_FLAG=n
+  - ENABLE_APICURIO_CONVERTERS=true
networks:
- my-network
schema-registry:
-  image: confluentinc/cp-schema-registry:7.0.1
+  image: apicurio/apicurio-registry-mem:2.3.1.Final
   ports:
-    - 8181:8181
-    - 8081:8081
+    - 8080:8080
-  environment:
-    - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
-    - SCHEMA_REGISTRY_HOST_NAME=schema-registry
-    - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
-    - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS
-    - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=*
-  links:
-    - zookeeper
-  networks:
-    - my-network
-schema-registry-ui:
-  image: landoop/schema-registry-ui
-  ports:
-    - 8000:8000
-  environment:
-    - SCHEMAREGISTRY_URL=http://schema-registry:8081
-    - PROXY=true
-  links:
-    - schema-registry
   networks:
     - my-network
avro-extractor:
image: debezium-examples/cloudevents-avro-extractor:${DEBEZIUM_VERSION}
build:
context: avro-data-extractor
dockerfile: src/main/docker/Dockerfile.jvm
ports:
- 8079:8079
links:
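With this change, the Confluent schema registry and the separate registry UI are replaced by a single in-memory Apicurio instance; whether it came up can be checked against its published port (a quick sketch):

```shell
# The registry should answer on the port published in docker-compose.yaml;
# Apicurio also serves its own web console on the same port.
curl -s http://localhost:8080/apis/registry/v2/system/info | jq .
```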
6 changes: 4 additions & 2 deletions cloudevents/register-postgres-avro-avro.json
@@ -6,13 +6,15 @@
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver3",
"topic.prefix": "dbserver3",
"slot.name":"dbserver3",
"schema.include.list": "inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "avro",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://schema-registry:8081"
"value.converter.avro.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
"value.converter.avro.apicurio.registry.auto-register": "true",
"value.converter.avro.apicurio.registry.find-latest": "true"
}
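A registration call for the updated config would follow the same pattern as in the README (the connector name is an assumption, and Connect's REST API is assumed to be published on localhost:8083):

```shell
# Register or update the Avro/Avro CloudEvents connector with the new
# Apicurio-based converter settings.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/inventory-connector-avro-avro/config \
  -d @register-postgres-avro-avro.json
```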
6 changes: 4 additions & 2 deletions cloudevents/register-postgres-json-avro.json
@@ -6,12 +6,14 @@
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver2",
"topic.prefix": "dbserver2",
"slot.name":"dbserver2",
"schema.include.list": "inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://schema-registry:8081"
"value.converter.avro.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
"value.converter.avro.apicurio.registry.auto-register": "true",
"value.converter.avro.apicurio.registry.find-latest": "true"
}
2 changes: 1 addition & 1 deletion cloudevents/register-postgres-json-json.json
@@ -6,7 +6,7 @@
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"topic.prefix": "dbserver1",
"slot.name":"dbserver1",
"schema.include.list": "inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",