This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0a8f35b KAFKA-6768; Transactional producer may hang in close with pending requests (#4842) 0a8f35b is described below commit 0a8f35b68415bb3f79b0ec61df6cb0ab0db937c4 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Mon Apr 9 15:39:07 2018 -0700 KAFKA-6768; Transactional producer may hang in close with pending requests (#4842) This patch fixes an edge case in producer shutdown in which `close()` could never complete: once shutdown had been initiated, a pending request was never sent, so `close()` waited on it indefinitely. I have added a test case which reproduces the scenario. Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk> --- .../kafka/clients/producer/internals/Sender.java | 2 +- .../producer/internals/TransactionManagerTest.java | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 426b273..0514c99 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ public class Sender implements Runnable { return false; AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder(); - while (running) { + while (!forceClose) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 6fcf480..558ec72 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -133,6 +133,26 @@ 
public class TransactionManagerTest { } @Test + public void testSenderShutdownWithPendingAddPartitions() throws Exception { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + + transactionManager.maybeAddPartitionToTransaction(tp0); + FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + prepareAddPartitionsToTxn(tp0, Errors.NONE); + prepareProduceResponse(Errors.NONE, pid, epoch); + + sender.initiateClose(); + sender.run(); + + assertTrue(sendFuture.isDone()); + } + + @Test public void testEndTxnNotSentIfIncompleteBatches() { long pid = 13131L; short epoch = 1; -- To stop receiving notification emails like this one, please contact j...@apache.org.