This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 121df46  KAFKA-9582: Do not abort transaction in unclean close (#8143)
121df46 is described below

commit 121df465fad0bc062537c027bf9ed4755112d10c
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Fri Feb 21 10:27:57 2020 -0800

    KAFKA-9582: Do not abort transaction in unclean close (#8143)
    
    In order to avoid hitting the fatal exception during unclean close, we 
should not call abortTransaction().
    
    Reviewers: John Roesler <j...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../streams/processor/internals/StreamTask.java    | 49 ++++++----------------
 .../processor/internals/StreamTaskTest.java        | 12 +++---
 2 files changed, 18 insertions(+), 43 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 54da00d..9aa8e79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -94,7 +94,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     private long idleStartTime;
     private Producer<byte[], byte[]> producer;
     private boolean commitRequested = false;
-    private boolean transactionInFlight = false;
 
     private final String threadId;
 
@@ -294,7 +293,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             } catch (final ProducerFencedException | 
UnknownProducerIdException e) {
                 throw new TaskMigratedException(this, e);
             }
-            transactionInFlight = true;
         }
 
         processorContext.initialize();
@@ -522,10 +520,8 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             if (eosEnabled) {
                 producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, 
applicationId);
                 producer.commitTransaction();
-                transactionInFlight = false;
                 if (startNewTransaction) {
                     producer.beginTransaction();
-                    transactionInFlight = true;
                 }
             } else {
                 consumer.commitSync(consumedOffsetsAndMetadata);
@@ -602,7 +598,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
      */
     public void suspend() {
         log.debug("Suspending");
-        suspend(true, false);
+        suspend(true);
     }
 
     /**
@@ -618,8 +614,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
      *                               or if the task producer got fenced (EOS)
      */
     // visible for testing
-    void suspend(final boolean clean,
-                 final boolean isZombie) {
+    void suspend(final boolean clean) {
         // this is necessary because all partition times are reset to -1 
during close
         // we need to preserve the original partitions times before calling 
commit
         final Map<TopicPartition, Long> partitionTimes = 
extractPartitionTimes();
@@ -640,14 +635,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
                 if (eosEnabled) {
                     stateMgr.checkpoint(activeTaskCheckpointableOffsets());
-
-                    try {
-                        recordCollector.close();
-                    } catch (final RecoverableClientException e) {
-                        taskMigratedException = new 
TaskMigratedException(this, e);
-                    } finally {
-                        producer = null;
-                    }
+                    taskMigratedException = closeRecordCollector();
                 }
             }
             if (taskMigratedException != null) {
@@ -662,37 +650,26 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             }
 
             if (eosEnabled) {
-                maybeAbortTransactionAndCloseRecordCollector(isZombie);
+                // Ignore any exceptions while closing the record collector, 
i.e. the task producer.
+                closeRecordCollector();
             }
         }
     }
 
-    private void maybeAbortTransactionAndCloseRecordCollector(final boolean 
isZombie) {
-        if (!isZombie) {
-            try {
-                if (transactionInFlight) {
-                    producer.abortTransaction();
-                }
-                transactionInFlight = false;
-            } catch (final ProducerFencedException ignore) {
-                /* TODO
-                 * this should actually never happen atm as we guard the call 
to #abortTransaction
-                 * -> the reason for the guard is a "bug" in the Producer -- 
it throws IllegalStateException
-                 * instead of ProducerFencedException atm. We can remove the 
isZombie flag after KAFKA-5604 got
-                 * fixed and fall-back to this catch-and-swallow code
-                 */
-
-                // can be ignored: transaction got already aborted by 
brokers/transactional-coordinator if this happens
-            }
-        }
+    private TaskMigratedException closeRecordCollector() {
+        TaskMigratedException taskMigratedException = null;
 
         try {
             recordCollector.close();
+        } catch (final RecoverableClientException e) {
+            taskMigratedException = new TaskMigratedException(this, e);
         } catch (final Throwable e) {
             log.error("Failed to close producer due to the following error:", 
e);
         } finally {
             producer = null;
         }
+
+        return taskMigratedException;
     }
 
     private void closeTopology() {
@@ -742,7 +719,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
     /**
      * <pre>
-     * - {@link #suspend(boolean, boolean) suspend(clean)}
+     * - {@link #suspend(boolean) suspend(clean)}
      *   - close topology
      *   - if (clean) {@link #commit()}
      *     - flush state and producer
@@ -765,7 +742,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
         RuntimeException firstException = null;
         try {
-            suspend(clean, isZombie);
+            suspend(clean);
         } catch (final RuntimeException e) {
             clean = false;
             firstException = e;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1d0ca4f..2832291 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1317,26 +1317,25 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void 
shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
+    public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), 
StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
 
         task.close(false, false);
         task = null;
 
-        assertTrue(producer.transactionAborted());
-        assertFalse(producer.transactionInFlight());
+        // Make sure no transactional method (such as abort) is called on the 
producer during an unclean close; the producer is only closed.
+        assertTrue(producer.transactionInFlight());
         assertTrue(producer.closed());
     }
 
     @Test
-    public void 
shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() 
{
+    public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
         task = createTaskThatThrowsException(true);
         task.initializeTopology();
 
         task.close(false, false);
 
-        assertTrue(producer.transactionAborted());
         assertTrue(producer.closed());
     }
 
@@ -1553,7 +1552,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void 
shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled()
 {
+    public void 
shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), 
StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
         producer.fenceProducerOnClose();
@@ -1561,7 +1560,6 @@ public class StreamTaskTest {
         task.close(false, false);
         task = null;
 
-        assertTrue(producer.transactionAborted());
         assertFalse(producer.closed());
     }
 

Reply via email to