This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new 121df46 KAFKA-9582: Do not abort transaction in unclean close (#8143) 121df46 is described below commit 121df465fad0bc062537c027bf9ed4755112d10c Author: Boyang Chen <boy...@confluent.io> AuthorDate: Fri Feb 21 10:27:57 2020 -0800 KAFKA-9582: Do not abort transaction in unclean close (#8143) To avoid hitting a fatal exception during an unclean close, we should not call abortTransaction(). Reviewers: John Roesler <j...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../streams/processor/internals/StreamTask.java | 49 ++++++---------------- .../processor/internals/StreamTaskTest.java | 12 +++--- 2 files changed, 18 insertions(+), 43 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 54da00d..9aa8e79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -94,7 +94,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator private long idleStartTime; private Producer<byte[], byte[]> producer; private boolean commitRequested = false; - private boolean transactionInFlight = false; private final String threadId; @@ -294,7 +293,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } catch (final ProducerFencedException | UnknownProducerIdException e) { throw new TaskMigratedException(this, e); } - transactionInFlight = true; } processorContext.initialize(); @@ -522,10 +520,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator if (eosEnabled) { producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); producer.commitTransaction(); - transactionInFlight = false; if (startNewTransaction) { 
producer.beginTransaction(); - transactionInFlight = true; } } else { consumer.commitSync(consumedOffsetsAndMetadata); @@ -602,7 +598,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator */ public void suspend() { log.debug("Suspending"); - suspend(true, false); + suspend(true); } /** @@ -618,8 +614,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * or if the task producer got fenced (EOS) */ // visible for testing - void suspend(final boolean clean, - final boolean isZombie) { + void suspend(final boolean clean) { // this is necessary because all partition times are reset to -1 during close // we need to preserve the original partitions times before calling commit final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes(); @@ -640,14 +635,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator if (eosEnabled) { stateMgr.checkpoint(activeTaskCheckpointableOffsets()); - - try { - recordCollector.close(); - } catch (final RecoverableClientException e) { - taskMigratedException = new TaskMigratedException(this, e); - } finally { - producer = null; - } + taskMigratedException = closeRecordCollector(); } } if (taskMigratedException != null) { @@ -662,37 +650,26 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } if (eosEnabled) { - maybeAbortTransactionAndCloseRecordCollector(isZombie); + // Ignore any exceptions while closing the record collector, i.e. the task producer. 
+ closeRecordCollector(); } } } - private void maybeAbortTransactionAndCloseRecordCollector(final boolean isZombie) { - if (!isZombie) { - try { - if (transactionInFlight) { - producer.abortTransaction(); - } - transactionInFlight = false; - } catch (final ProducerFencedException ignore) { - /* TODO - * this should actually never happen atm as we guard the call to #abortTransaction - * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException - * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got - * fixed and fall-back to this catch-and-swallow code - */ - - // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens - } - } + private TaskMigratedException closeRecordCollector() { + TaskMigratedException taskMigratedException = null; try { recordCollector.close(); + } catch (final RecoverableClientException e) { + taskMigratedException = new TaskMigratedException(this, e); } catch (final Throwable e) { log.error("Failed to close producer due to the following error:", e); } finally { producer = null; } + + return taskMigratedException; } private void closeTopology() { @@ -742,7 +719,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator /** * <pre> - * - {@link #suspend(boolean, boolean) suspend(clean)} + * - {@link #suspend(boolean) suspend(clean)} * - close topology * - if (clean) {@link #commit()} * - flush state and producer @@ -765,7 +742,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator RuntimeException firstException = null; try { - suspend(clean, isZombie); + suspend(clean); } catch (final RuntimeException e) { clean = false; firstException = e; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1d0ca4f..2832291 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1317,26 +1317,25 @@ public class StreamTaskTest { } @Test - public void shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() { + public void shouldCloseProducerOnUncleanCloseWithEosEnabled() { task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST); task.initializeTopology(); task.close(false, false); task = null; - assertTrue(producer.transactionAborted()); - assertFalse(producer.transactionInFlight()); + // Make sure no method call on the producer during an unclean close (such as abort). + assertTrue(producer.transactionInFlight()); assertTrue(producer.closed()); } @Test - public void shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() { + public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() { task = createTaskThatThrowsException(true); task.initializeTopology(); task.close(false, false); - assertTrue(producer.transactionAborted()); assertTrue(producer.closed()); } @@ -1553,7 +1552,7 @@ public class StreamTaskTest { } @Test - public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() { + public void shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() { task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST); task.initializeTopology(); producer.fenceProducerOnClose(); @@ -1561,7 +1560,6 @@ public class StreamTaskTest { task.close(false, false); task = null; - assertTrue(producer.transactionAborted()); assertFalse(producer.closed()); }