Errors are not sent back to the caller #51

Open
dermasmid opened this issue Jun 2, 2024 · 12 comments
Labels
question A question is asked

Comments

@dermasmid

I'm using the KafkaJS variant with producer.send, and I was observing logs like this that were not sent back to the caller.

timestamp: 1717074105728
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
@milindl
Contributor

milindl commented Jun 3, 2024

Hey @dermasmid, you should be getting a thrown error if you are awaiting producer.send, or a promise rejection if you are using "catch" on the promise returned by producer.send.

It will look similar to this:

Error: Local: Message timed out
    at Function.createLibrdkafkaError [as create] (/home/milindl/git/confluent-kafka-js/lib/error.js:456:10)
    at Producer.<anonymous> (/home/milindl/git/confluent-kafka-js/lib/producer.js:91:31) {
  name: 'KafkaJSError',
  retriable: false,
  fatal: false,
  abortable: false,
  code: -192,
  type: 'ERR__MSG_TIMED_OUT'
}

where my code looked like this:

producer.send({
    topic: 'mytopic',
    messages: [
        { value: 'v222', partition: 0 },
        { value: 'v11', partition: 0, key: 'x' },
    ]
}).catch(console.error)
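
The same failure can also be caught with await and try/catch instead of .catch. A minimal sketch, assuming only that producer.send returns a promise; the helper name sendAndReport and its return shape are made up for illustration and are not part of the library:

```javascript
// Minimal sketch: surface a delivery failure from an awaited send().
// `producer` is any object whose send() returns a promise, for example a
// connected producer from the KafkaJS-compatible API.
// `sendAndReport` is a hypothetical helper, not a library function.
async function sendAndReport(producer, record) {
    try {
        const result = await producer.send(record);
        return { ok: true, result };
    } catch (err) {
        // A rejected send() lands here. `code` and `type` are the fields
        // visible on the example error printed earlier in this comment.
        console.error('send failed:', err.code, err.type, err.message);
        return { ok: false, error: err };
    }
}

// Usage (inside an async function):
//   const outcome = await sendAndReport(producer, { topic: 'mytopic', messages });
//   if (!outcome.ok) { /* decide whether to re-send or alert */ }
```

Returning the error instead of re-throwing is only a choice made for this sketch; re-throwing from the catch block works just as well if the caller should handle it.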

The logs are an additional warning that gives details on how the messages timed out (some may have timed out in flight, some may have timed out in the local queue that the library maintains before batching and sending the messages, and so on).

If you do want to get every log message, you can use a custom logger, as in #41 (comment).

milindl added the "question" label Jun 3, 2024
@dermasmid
Author

I'm pretty sure the error was not thrown to me.
I think it might have been suppressed on the librdkafka side.

@milindl
Contributor

milindl commented Jun 3, 2024

Ah, okay, there is one more possibility. For any produce request, there are two timeouts: socket.timeout.ms and delivery.timeout.ms.

The first one applies to each protocol request. If a request is in flight for more than socket.timeout.ms, we get the REQTMOUT log. However, the library automatically retries the message with some backoff until delivery.timeout.ms is exhausted, and a user-level error is thrown only if the message completely fails delivery, including retries (if a retry succeeds, we have recovered from the error and no user action is needed).

I am guessing that's what happened here - the actual protocol request timed out, but the library dealt with it through this mechanism of internal retries.
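
To make the interaction between the two timeouts concrete, here is a rough back-of-the-envelope model. It is not librdkafka's actual retry algorithm: it assumes every attempt runs into socket.timeout.ms and that a fixed backoff separates attempts. The function name and the three values are assumptions chosen for illustration:

```javascript
// Simplified model, NOT librdkafka's real retry logic: every attempt is
// assumed to hit socket.timeout.ms, followed by a fixed backoff.
// Returns how many attempts can start before delivery.timeout.ms runs out.
function attemptsBeforeDeliveryTimeout({ socketTimeoutMs, deliveryTimeoutMs, backoffMs }) {
    if (socketTimeoutMs + backoffMs <= 0) {
        throw new RangeError('socketTimeoutMs + backoffMs must be positive');
    }
    let elapsedMs = 0;
    let attempts = 0;
    // A new attempt can start only while the delivery deadline has not passed.
    while (elapsedMs < deliveryTimeoutMs) {
        attempts += 1;
        elapsedMs += socketTimeoutMs + backoffMs;
    }
    return attempts;
}

// Illustrative values: 60 s per request, 300 s overall, 100 ms backoff.
console.log(attemptsBeforeDeliveryTimeout({
    socketTimeoutMs: 60000,
    deliveryTimeoutMs: 300000,
    backoffMs: 100,
})); // 5
```

Under these assumptions a message can produce several REQTMOUT log lines before the caller sees any error, which is consistent with the theory above.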

Is it possible to share the logs around the REQTMOUT thing? I might be able to confirm that theory.

If you don't want any internal retry behaviour, you can disable it through certain properties, by setting the allowed number of retries to 0.

kafkaJS: {
    brokers: ['brokerName'],
    // ...,
    retry: {
        retries: 0
    }
},

@milindl
Contributor

milindl commented Jun 3, 2024

Also, both socket.timeout.ms and delivery.timeout.ms are configurable outside the kafkaJS block of the config. There are more details on this page about what they do.
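
As a rough illustration of where these properties sit, a config object might look like the sketch below. The broker name and both numeric values are placeholders chosen for the example, not recommendations, and the object is meant to be passed wherever the config containing the kafkaJS block is already passed:

```javascript
// Sketch of a config object with the two timeouts set next to, not inside,
// the kafkaJS block. All values are placeholders for illustration only.
const producerConfig = {
    kafkaJS: {
        brokers: ['brokerName'],
        // ... other KafkaJS-style options
    },
    // Per-request timeout: a request in flight longer than this is timed out.
    'socket.timeout.ms': 30000,
    // Overall deadline for a message, including any internal retries.
    'delivery.timeout.ms': 120000,
};

console.log(Object.keys(producerConfig));
```

Keeping delivery.timeout.ms well above socket.timeout.ms leaves room for at least one retry before the error reaches the caller.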

@dermasmid
Author

I can share more logs. Just note that I did actually lose some messages; it wasn't just the log. In fact, I went to look at what happened because I saw I was missing data.

@milindl
Contributor

milindl commented Jun 3, 2024

Oh, that should definitely not happen. Please do share the logs.

Is it something you can reproduce on occasion, too? If so then it would be very good if you could turn on the full debug logs. They are quite verbose, though.

@dermasmid
Author


errno: -1
code: -1,
message: 'timed out',
origin: 'local',
message: [LibrdKafkaError: Local: Timed out] {
{
}
timestamp: 1717074108080
fac: 'FAIL',
message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: 2 request(s) timed out: disconnect (after 1862053ms in state UP)',
{
}
timestamp: 1717074108080
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
{
}
timestamp: 1717074108080
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: Timed out ProduceRequest in flight (after 60181ms, timeout #1)',
{
}
timestamp: 1717074108076
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: Timed out ProduceRequest in flight (after 60182ms, timeout #0)',
{
}
}
errno: -1
code: -1,
message: 'timed out',
origin: 'local',
message: [LibrdKafkaError: Local: Timed out] {
{
}
}
errno: -1
code: -1,
message: 'timed out',
origin: 'local',
message: [LibrdKafkaError: Local: Timed out] {
{
}
timestamp: 1717074106466
fac: 'FAIL',
message: '[thrd:sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2]: sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2: 1 request(s) timed out: disconnect (after 1790796ms in state UP)',
{
}
timestamp: 1717074106466
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2]: sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
{
}
timestamp: 1717074106465
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2]: sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2: Timed out ProduceRequest in flight (after 60739ms, timeout #0)',
{
}
timestamp: 1717074106371
fac: 'FAIL',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: 1 request(s) timed out: disconnect (after 1792292ms in state UP)',
{
}
timestamp: 1717074106371
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
{
}
timestamp: 1717074106370
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60572ms, timeout #0)',
{
}
}
errno: -1
code: -1,
message: 'timed out',
origin: 'local',
message: [LibrdKafkaError: Local: Timed out] {
{
}
}
errno: -1
code: -1,
message: 'timed out',
origin: 'local',
message: [LibrdKafkaError: Local: Timed out] {
{
}
timestamp: 1717074105826
fac: 'FAIL',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: 1 request(s) timed out: disconnect (after 1017523ms in state UP)',
{
}
timestamp: 1717074105826
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
{
}
timestamp: 1717074105822
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60036ms, timeout #0)',
{
}
timestamp: 1717074105729
fac: 'FAIL',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: 2 request(s) timed out: disconnect (after 1016854ms in state UP)',
{
}
timestamp: 1717074105728
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
{
}
timestamp: 1717074105728
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60513ms, timeout #1)',
{
}
timestamp: 1717074105725
fac: 'REQTMOUT',
message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60513ms, timeout #0)',

@milindl
Contributor

milindl commented Jun 6, 2024

Thanks for the logs, I took a look.

Where is this particular log line being logged from - is it somewhere within your code? This is the error that should have been thrown by the producer.send() function on await.

It doesn't have the other stuff (timestamp/fac) that's present in the library's logger.

Is it possible to share the code which is producing, too? It'll probably help. Thanks.

errno: -1
code: -1,
message: 'timed out',
origin: 'local',
message: [LibrdKafkaError: Local: Timed out] {
{
}
}

@dermasmid
Author

This is coming from librdkafka. I extracted it from GCP, which is why it might look a little different. You can see the timestamp is there too.

@milindl
Contributor

milindl commented Jun 10, 2024

Okay. Could you attach the relevant bits of code, too? I can reproduce the logging, but it's resulting in an exception thrown for me (or the other case I talked about, where the message is retried until it's sent).

@Dhruv-Garg79

I faced a similar issue in production yesterday: requests to Kafka took an indefinite amount of time, without any timeout or error being thrown. I was also using the KafkaJS variant.

try {
	const res = await MyKafka.getInstance().producer.send({
		topic: topic,
		messages: messages,
	});
	ctx.logger.debug('sent message to kafka %s %j', topic, res);
} catch (error) {
	ctx.trace.recordException(error);
} 
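
While the cause is being investigated, one way to tell a send that hangs from one that fails is an application-level deadline around the promise. A minimal sketch; withTimeout is a hypothetical helper, not part of the library, and it only stops waiting: it does not cancel the underlying send, so the message may still be delivered later:

```javascript
// Hypothetical helper, not a library API: rejects if `promise` has not
// settled within `ms` milliseconds. The underlying operation keeps running.
function withTimeout(promise, ms, label = 'operation') {
    let timer;
    const deadline = new Promise((_, reject) => {
        timer = setTimeout(
            () => reject(new Error(`${label} did not settle within ${ms} ms`)),
            ms
        );
    });
    // Whichever settles first wins; the timer is always cleaned up.
    return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}

// Usage (inside an async function), with an illustrative 30 s deadline:
//   const res = await withTimeout(
//       producer.send({ topic, messages }), 30000, 'producer.send');
```

Because a late delivery is still possible after the deadline fires, re-sending on timeout can create duplicates unless the consumer tolerates them.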

@milindl
Contributor

milindl commented Jun 13, 2024

Thanks @Dhruv-Garg79. Are you getting the same logging as above about the timeout?
