This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 95ad035 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684) 95ad035 is described below commit 95ad03540f0d15ae47fd73bae935ab1cb3e8f4b9 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Mar 13 08:43:58 2018 -0700 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684) As titled: do not begin a new transaction when the task is created, because the producer has no activity during state restoration and the open transaction could therefore expire. Likewise, when resuming a suspended task, delay starting the new transaction until the topology has been initialized. Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <b...@confluent.io> --- .../streams/processor/internals/AssignedTasks.java | 11 ++++-- .../streams/processor/internals/StreamTask.java | 41 ++++++++++++---------- .../streams/processor/internals/StreamThread.java | 6 ---- .../streams/processor/internals/TaskManager.java | 12 +------ .../internals/AssignedStreamsTasksTest.java | 3 +- .../processor/internals/StreamTaskTest.java | 7 ++++ 6 files changed, 41 insertions(+), 39 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 8529c9e..c806bfd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -90,6 +90,7 @@ abstract class AssignedTasks<T extends Task> { * @return partitions that are ready to be resumed * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ 
Set<TopicPartition> initializeNewTasks() { final Set<TopicPartition> readyPartitions = new HashSet<>(); @@ -240,18 +241,21 @@ 
abstract class AssignedTasks<T extends Task> { log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); + task.resume(); try { - task.resume(); + transitionToRunning(task, new HashSet<TopicPartition>()); } catch (final TaskMigratedException e) { + // we need to catch migration exception internally since this function + // is triggered in the rebalance callback log.info("Failed to resume {} {} since it got migrated to another thread already. " + "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); final RuntimeException fatalException = closeZombieTask(task); + running.remove(task.id()); if (fatalException != null) { throw fatalException; } throw e; } - transitionToRunning(task, new HashSet<TopicPartition>()); log.trace("resuming suspended {} {}", taskTypeName, task.id()); return true; } else { @@ -271,6 +275,9 @@ abstract class AssignedTasks<T extends Task> { } } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index b8777ad..8d6e56a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @param cache the {@link ThreadCache} created by the thread * @param time the system {@link Time} of the thread * @param producer the instance of {@link Producer} used to produce records - * @throws TaskMigratedException if the task producer got 
fenced (EOS only) */ public StreamTask(final TaskId id, final Collection<TopicPartition> partitions, @@ -149,14 +148,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator partitionGroup = new PartitionGroup(partitionQueues); stateMgr.registerGlobalStateStores(topology.globalStateStores()); + + // initialize transactions if eos is turned on, which will block if the previous transaction has not + // completed yet; do not start the first transaction until the topology has been initialized later if (eosEnabled) { - try { - this.producer.initTransactions(); - this.producer.beginTransaction(); - } catch (final ProducerFencedException fatal) { - throw new TaskMigratedException(this, fatal); - } - transactionInFlight = true; + this.producer.initTransactions(); } } @@ -167,31 +163,38 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator return changelogPartitions().isEmpty(); } + /** + * <pre> + * - (re-)initialize the topology of the task + * </pre> + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ @Override public void initializeTopology() { initTopology(); + + if (eosEnabled) { + try { + this.producer.beginTransaction(); + } catch (final ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); + } + transactionInFlight = true; + } + processorContext.initialized(); taskInitialized = true; } /** * <pre> - * - re-initialize the task - * - if (eos) begin new transaction + * - resume the task * </pre> - * @throws TaskMigratedException if the task producer got fenced (EOS only) */ @Override public void resume() { + // nothing to do; new transaction will be started only after topology is initialized log.debug("Resuming"); - if (eosEnabled) { - try { - producer.beginTransaction(); - } catch (final ProducerFencedException fatal) { - throw new TaskMigratedException(this, fatal); - } - transactionInFlight = true; - } } /** diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e1c45b4..9bbc0da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -346,9 +346,6 @@ public class StreamThread extends Thread { return stateDirectory; } - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) { final List<T> createdTasks = new ArrayList<>(); for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) { @@ -401,9 +398,6 @@ public class StreamThread extends Thread { this.threadClientId = threadClientId; } - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ @Override StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) { taskCreatedSensor.record(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 9f02834..80df517 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -94,9 +94,6 @@ class TaskManager { this.adminClient = adminClient; } - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ void createTasks(final Collection<TopicPartition> assignment) { if (consumer == null) { throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. 
This should not happen."); @@ -114,9 +111,6 @@ class TaskManager { consumer.pause(partitions); } - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ private void addStreamTasks(final Collection<TopicPartition> assignment) { if (assignedActiveTasks.isEmpty()) { return; @@ -156,9 +150,6 @@ class TaskManager { } } - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ private void addStandbyTasks() { final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks; if (assignedStandbyTasks.isEmpty()) { @@ -173,7 +164,6 @@ class TaskManager { if (!standby.maybeResumeSuspendedTask(taskId, partitions)) { newStandbyTasks.put(taskId, partitions); } - } if (newStandbyTasks.isEmpty()) { @@ -319,7 +309,7 @@ class TaskManager { /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition - * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored + * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only) */ boolean updateNewAndRestoringTasks() { final Set<TopicPartition> resumed = active.initializeNewTasks(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index 4bb7828..246d047 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -256,9 +256,10 @@ public class AssignedStreamsTasksTest { } @Test - public void shouldCloseTaskOnResumeIfTaskMigratedException() { + public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() { 
mockRunningTaskSuspension(); t1.resume(); + t1.initializeTopology(); EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); t1.close(false, true); EasyMock.expectLastCall(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1165d76..a305829 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -776,6 +776,7 @@ public class StreamTaskTest { @Test public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() { task = createStatelessTask(true); + task.initializeTopology(); assertTrue(producer.transactionInitialized()); assertTrue(producer.transactionInFlight()); @@ -792,6 +793,7 @@ public class StreamTaskTest { @Test public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() { task = createStatelessTask(true); + task.initializeTopology(); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -806,6 +808,7 @@ public class StreamTaskTest { @Test public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() { task = createStatelessTask(true); + task.initializeTopology(); task.suspend(); assertTrue(producer.transactionCommitted()); @@ -828,6 +831,7 @@ public class StreamTaskTest { @Test public void shouldStartNewTransactionOnResumeIfEosEnabled() { task = createStatelessTask(true); + task.initializeTopology(); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -835,6 +839,7 @@ public class StreamTaskTest { task.suspend(); task.resume(); + 
task.initializeTopology(); assertTrue(producer.transactionInFlight()); } @@ -854,6 +859,7 @@ public class StreamTaskTest { @Test public void shouldStartNewTransactionOnCommitIfEosEnabled() { task = createStatelessTask(true); + task.initializeTopology(); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -878,6 +884,7 @@ public class StreamTaskTest { @Test public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() { task = createStatelessTask(true); + task.initializeTopology(); task.close(false, false); task = null; -- To stop receiving notification emails like this one, please contact guozh...@apache.org.