[
https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397130#comment-16397130
]
ASF GitHub Bot commented on KAFKA-6634:
---
guozhangwang closed pull request #4684: KAFKA-6634: Delay starting new
transaction in task.initializeTopology
URL: https://github.com/apache/kafka/pull/4684
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8529c9eca88..c806bfde47e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ void addNewTask(final T task) {
* @return partitions that are ready to be resumed
* @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not contain the
partition
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
Set<TopicPartition> initializeNewTasks() {
final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -240,18 +241,21 @@ boolean maybeResumeSuspendedTask(final TaskId taskId,
final Set
log.trace("found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
+task.resume();
try {
-task.resume();
+transitionToRunning(task, new HashSet());
} catch (final TaskMigratedException e) {
+// we need to catch migration exception internally since this function
+// is triggered in the rebalance callback
log.info("Failed to resume {} {} since it got migrated to
another thread already. " +
"Closing it as zombie before triggering a new
rebalance.", taskTypeName, task.id());
final RuntimeException fatalException =
closeZombieTask(task);
+running.remove(task.id());
if (fatalException != null) {
throw fatalException;
}
throw e;
}
-transitionToRunning(task, new HashSet());
log.trace("resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
@@ -271,6 +275,9 @@ private void addToRestoring(final T task) {
}
}
+/**
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b8777ad5521..8d6e56a17aa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ void removeAllSensors() {
* @param cache the {@link ThreadCache} created by the
thread
* @param time the system {@link Time} of the thread
* @param producer the instance of {@link Producer} used to
produce records
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
public StreamTask(final TaskId id,
final Collection partitions,
@@ -149,14 +148,11 @@ public StreamTask(final TaskId id,
partitionGroup = new PartitionGroup(partitionQueues);
stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+// initialize transactions if eos is turned on, which will block if the previous transaction has not
+// completed yet; do not start the first transaction until the topology has been initialized later
if (eosEnabled) {
-try {
-this.producer.initTransactions();
-this.producer.beginTransaction();
-} catch (final ProducerFencedException fatal) {
-throw new TaskMigratedException(this, fatal);
-}
-transactionInFlight = true;
+this.producer.initTransactions();
}
}
@@