[ https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Denis Washington updated KAFKA-8635:
------------------------------------

Description:

In our Kafka Streams applications (with EOS enabled), we were seeing mysterious long delays between records being produced by a stream task and the same records being consumed by the next task. These delays turned out to always be around {{retry.backoff.ms}} long; reducing that value reduced the delays by about the same amount.

After digging further, I pinned the problem down to the following lines in {{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{code:java}
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
    // as long as there are outstanding transactional requests, we simply wait for them to return
    client.poll(retryBackoffMs, time.milliseconds());
    return;
}
{code}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns true, a transactional request has been sent out that should be waited for. However, this is not true if the request requires a coordinator lookup:

{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        break;
    }
    ...
{code}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a coordinator lookup request for the {{Sender}}'s next run loop iteration. {{maybeSendTransactionalRequest}} still returns true, though (the {{break}} jumps to a {{return true}} at the end of the method), leading the {{Sender}} to needlessly wait via {{client.poll()}} although there is actually no request in flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if it merely enqueues the coordinator lookup instead of actually sending anything. But I'm not sure, hence the bug report instead of a pull request.
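For illustration, here is roughly what that proposed change might look like (a sketch against the 2.3.0 source with the surrounding retry/abort logic elided; not a tested patch):

{code:java}
// Inside Sender#maybeSendTransactionalRequest (simplified sketch).
// The only change is to return false instead of breaking out towards
// the final "return true" when we merely enqueue a coordinator lookup.
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        // lookupCoordinator() only enqueues a FindCoordinator request for the
        // next run loop iteration; nothing goes over the wire here.
        transactionManager.lookupCoordinator(nextRequestHandler);
        // Proposed: report that no transactional request is in flight, so
        // runOnce() does not block in client.poll(retryBackoffMs, ...).
        return false;  // was: break;  (which fell through to "return true")
    }
    // ... otherwise build and send the request to targetNode as before ...
}
{code}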
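As a stopgap, lowering {{retry.backoff.ms}} for the Streams-internal producers shortens the delay, consistent with the observation above. A minimal sketch of how that could be set (the application id, bootstrap servers, and the 10 ms value are placeholders for illustration, not a tuned recommendation):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class LowBackoffConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // producerPrefix() scopes the setting to Streams' embedded producers only.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 10);
        return props;
    }
}
{code}

Note the trade-off: a smaller backoff also makes genuine retries hit the brokers more frequently, so this only masks the unnecessary wait rather than fixing it.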
> Unnecessary wait when looking up coordinator before transactional request
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8635
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.3.0, 2.2.1
>            Reporter: Denis Washington
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)