Prefetching topic metadata on newly created topic interferes with consumer (?) #194

Open
apeloquin-agilysys opened this issue Dec 4, 2024 · 4 comments

Comments

@apeloquin-agilysys

Environment Information

  • OS: Mac M3 Sonoma 14.6.1
  • Node Version: 20.14.0
  • NPM Version: 10.7.0
  • confluent-kafka-javascript version: 0.5.2

Steps to Reproduce

While incorporating the producer's dependentAdmin() to prefetch topic metadata, we ran into an issue where a simple produce/consume test consistently failed in our build pipeline but was not reproducible locally on our dev machines.

Our build pipeline always starts with a fresh Kafka docker instance when running integration tests, and this was the key difference that allowed us to narrow down the issue.

The test below uses a topic name with a timestamp component to ensure that each run uses a new topic, as this issue does not reproduce across multiple runs using the same topic.

There are three tests:

  1. The first does a prefetch prior to sending to the topic -- and fails.
  2. The second does the same thing but skips the prefetch -- and succeeds.
  3. The third is the same as the first test -- but now succeeds.

import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {expect} from "chai";

const TOPIC = `test-confluent-topic-${Date.now()}`;
const GROUP_ID = `test-confluent-group-${Date.now()}`;

describe("fetchMetadata", () => {
  let kafka: Confluent.Kafka;
  let admin: Confluent.Admin;
  let consumer: Confluent.Consumer;
  let producer: Confluent.Producer;
  let producerAdmin: Confluent.Admin;

  before(async () => {
    kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
    admin = kafka.admin();
    await admin.connect();
  });

  beforeEach(async () => {
    await admin.createTopics({topics: [{topic: TOPIC}]});
    producer = kafka.producer();
    await producer.connect();
    producerAdmin = producer.dependentAdmin();
    await producerAdmin.connect();
  });

  afterEach(async () => {
    await producerAdmin.disconnect();
    await producer.disconnect();
    await consumer?.disconnect();
  });

  after(async () => {
    // The per-test clients are already disconnected in afterEach.
    await admin.disconnect();
  });

  it("with prefetch fails", async () => doTest(true));
  it("without prefetch succeeds", async () => doTest(false));
  it("now with prefetch succeeds", async () => doTest(true));

  async function doTest(prefetch: boolean) {
    let ready = false;
    const receivedMessages: string[] = [];
    consumer = kafka.consumer({
      kafkaJS: {groupId: GROUP_ID},
      rebalance_cb: (err: any, assignment: any, consumer: any) => {
        if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
        if (!ready) {
          ready = true;
        }
      }
    });
    await consumer.connect();
    await consumer.subscribe({topic: TOPIC});
    await consumer.run({
      eachMessage: async ({message}) => {
        receivedMessages.push(message.value?.toString() ?? "");
      }
    });

    await until(() => ready);

    if (prefetch) await producerAdmin.fetchTopicMetadata({topics: [TOPIC]});
    await producer.send({
      topic: TOPIC,
      messages: [{value: "one"}]
    });

    await until(() => receivedMessages.length == 1);
    expect(receivedMessages).to.have.members(["one"]);
  }

  async function until(condition: () => boolean) {
    const timeout = 10000;
    const finish = Date.now() + timeout;
    while (Date.now() <= finish) {
      const result = condition();
      if (result) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Failed within ${timeout}ms`);
  }
});

Test results:
[image: test results]

Note that in the failing test, the call to fetchTopicMetadata does not result in an error, but the consumer appears to never receive the message, although we do see in Kafka that the message was sent.

@milindl
Contributor

milindl commented Dec 6, 2024

Hi @apeloquin-agilysys, could you tell me the cause of the failure? Was it a timeout at the await until(() => ready); step?

I'm getting that error when I use mocha to run this test on a 3-broker local cluster.

This happens because, after the topic is created, it takes some time for the topic to propagate in the metadata to all brokers. So by the time run() is called on the consumer and it tries to subscribe to the topic, the topic may not have propagated yet.

Since we check for metadata on a 1s interval timer, the test times out (the default timeout for mocha seems to be 2s).

The second test doesn't face the same issue because the topic is already created and propagated by this time.

I confirmed this by changing the order of the tests:

  it("without prefetch succeeds", async () => doTest(false));
  it("with prefetch fails", async () => doTest(true));
  it("now with prefetch succeeds", async () => doTest(true));

and now it's the first one which fails.

To fix this you can add a small sleep after topic creation, or use something like

await until(async () => {
  /* If we get anything other than an error, we're good */
  return admin.fetchTopicMetadata({topics: [TOPIC]})
    .then(() => true)
    .catch(e => {
      if (e.code === RdKafka.CODES.ERRORS.ERR_UNKNOWN_TOPIC_OR_PART) {
        return false;
      }
      throw e;
    });
});

just after topic creation. (I've changed the until function to support async conditions.)
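The modified until function is not shown in the comment. A minimal sketch of what an async-aware version might look like follows; the timeoutMs and intervalMs parameters are additions for illustration and are not part of the original helper.

```typescript
// Sketch of an `until` helper that accepts both sync and async conditions.
// `timeoutMs` and `intervalMs` are illustrative parameters (assumptions).
async function until(
  condition: () => boolean | Promise<boolean>,
  timeoutMs = 10000,
  intervalMs = 50,
): Promise<void> {
  const finish = Date.now() + timeoutMs;
  while (Date.now() <= finish) {
    // `await` passes plain booleans through and unwraps promises.
    if (await condition()) return;
    await new Promise<void>(resolve => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}
```

Existing call sites such as until(() => ready) keep working unchanged, and an error thrown or rejected by the condition propagates to the caller instead of being retried.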

@apeloquin-agilysys
Author

@milindl In our case, the failure is not waiting for the consumer to be ready, but rather waiting for the message to be received, i.e. await until(() => receivedMessages.length == 1);.

We are using a 15-second timeout when running these tests. ("timeout": 15000 in .mocharc.json)

We're running this test against a single-broker local cluster. The consumer starting is not the issue. The consumer starts, but never receives the message -- even though the message is clearly added to the topic.

Reordering the tests as you suggested does not result in the first test failing, so I'm pretty confident it's the timeout that is preventing you from reproducing the same behavior we see.

@milindl
Contributor

milindl commented Dec 10, 2024

Okay, with that timeout increase, I was able to reproduce this. I enabled debug logs to check the reason.

A consumer does the following:

  1. rebalance - get assigned partitions and join the consumer group.
  2. determine the offset at which to start consuming from each partition.
       • If it is a pre-existing group and the partition has already been consumed from, an offset is stored in the broker, and we use that offset (like test 3).
       • If it's a new group and the partition is being consumed from for the first time in this group (like test 1), we consume either from the earliest available message or from the next available message, depending on the value of fromBeginning in the configuration. With the default value, we consume from the next available message.
  3. start sending fetch requests and consuming.
  4. when it is disconnected, the consumer commits the last offset it has consumed to the broker (in the default settings). This is why test 3 has an offset persisted in the broker: test 2 was successful.

Because the prefetch speeds up the produce quite a bit, step 2 is not completed by the time the message is produced. So when we actually resolve the offsets and decide to consume from the "next possible message", the message we've already produced lies before that starting point and is never consumed.

Test 3 doesn't fail in the same way because it doesn't matter when exactly we produce the message, since we're going to consume from the offsets stored for the consumer group by test 2.
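The race described above can be illustrated with a toy model. This is not the client library: PartitionLog, resolveStartOffset, and runScenario are hypothetical names, a single partition is modeled as an array, and the model assumes that nothing is committed when a consumer disconnects without having consumed a message.

```typescript
// Toy model of one partition and one consumer group (assumption, not library code).
type StartPolicy = "latest" | "earliest";

class PartitionLog {
  readonly messages: string[] = [];
  // Next offset to read, per consumer group.
  readonly committed = new Map<string, number>();
}

// Step 2: choose the offset to start consuming from.
function resolveStartOffset(log: PartitionLog, groupId: string, policy: StartPolicy): number {
  const stored = log.committed.get(groupId);
  if (stored !== undefined) return stored; // pre-existing group: resume
  return policy === "earliest" ? 0 : log.messages.length; // new group
}

// Produces "one" either before or after the start offset is resolved,
// then returns what the consumer would receive.
function runScenario(
  log: PartitionLog,
  groupId: string,
  policy: StartPolicy,
  produceBeforeResolve: boolean,
): string[] {
  if (produceBeforeResolve) log.messages.push("one");
  const start = resolveStartOffset(log, groupId, policy);
  if (!produceBeforeResolve) log.messages.push("one");
  const received = log.messages.slice(start);
  // Step 4: commit on disconnect, only if something was consumed.
  if (received.length > 0) log.committed.set(groupId, start + received.length);
  return received;
}

const log = new PartitionLog();
const test1 = runScenario(log, "g", "latest", true);  // fast produce, new group: []
const test2 = runScenario(log, "g", "latest", false); // slower produce: ["one"]
const test3 = runScenario(log, "g", "latest", true);  // stored offset: ["one"]
const fresh = runScenario(new PartitionLog(), "g2", "earliest", true); // ["one"]
```

In this model the first run misses its message purely because of ordering, while the "earliest" policy (the fromBeginning: true case) makes the outcome independent of when the message is produced.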

To make this particular test suite work, you could add this to the consumer config:

    consumer = kafka.consumer({
      kafkaJS: {
        groupId: GROUP_ID,
+       fromBeginning: true,

(For our tests, too, we generally set it to true, as that prevents these sorts of cases where the producer gets ahead of the consumer.)
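One way to keep this setting consistent across a test suite is a small helper that builds the consumer config. The sketch below only constructs a plain object in the shape used in this thread; ConsumerTestConfig and testConsumerConfig are hypothetical names, not part of the library.

```typescript
// Hypothetical helper: builds the consumer config shape shown in this thread.
interface ConsumerTestConfig {
  kafkaJS: {
    groupId: string;
    fromBeginning: boolean;
  };
}

function testConsumerConfig(groupId: string): ConsumerTestConfig {
  // fromBeginning: true makes a new group start from the earliest
  // available message, so a message produced early is not skipped.
  return {kafkaJS: {groupId, fromBeginning: true}};
}
```

In the test above this could be spread into the existing call, for example kafka.consumer({...testConsumerConfig(GROUP_ID), rebalance_cb: ...}), assuming the rest of the config stays as it was.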

@apeloquin-agilysys
Author

@milindl Thanks for the explanation. I will proceed with fromBeginning: true for tests.

Wish list item: a way to indicate when a consumer is truly ready, given that the rebalance approach does not encompass step #2.
