This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a8f35b  KAFKA-6768; Transactional producer may hang in close with 
pending requests (#4842)
0a8f35b is described below

commit 0a8f35b68415bb3f79b0ec61df6cb0ab0db937c4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 9 15:39:07 2018 -0700

    KAFKA-6768; Transactional producer may hang in close with pending requests 
(#4842)
    
    This patch fixes an edge case in producer shutdown which prevents `close()` 
from completing due to a pending request which will never be sent due to 
shutdown initiation. I have added a test case which reproduces the scenario.
    
    Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma 
<ism...@juma.me.uk>
---
 .../kafka/clients/producer/internals/Sender.java     |  2 +-
 .../producer/internals/TransactionManagerTest.java   | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 426b273..0514c99 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ public class Sender implements Runnable {
             return false;
 
         AbstractRequest.Builder<?> requestBuilder = 
nextRequestHandler.requestBuilder();
-        while (running) {
+        while (!forceClose) {
             Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6fcf480..558ec72 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -133,6 +133,26 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testSenderShutdownWithPendingAddPartitions() throws Exception {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        FutureRecordMetadata sendFuture = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+
+        sender.initiateClose();
+        sender.run();
+
+        assertTrue(sendFuture.isDone());
+    }
+
+    @Test
     public void testEndTxnNotSentIfIncompleteBatches() {
         long pid = 13131L;
         short epoch = 1;

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to