[jira] [Commented] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397130#comment-16397130
 ] 

ASF GitHub Bot commented on KAFKA-6634:
---

guozhangwang closed pull request #4684: KAFKA-6634: Delay starting new 
transaction in task.initializeTopology
URL: https://github.com/apache/kafka/pull/4684
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8529c9eca88..c806bfde47e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ void addNewTask(final T task) {
      * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     Set<TopicPartition> initializeNewTasks() {
         final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -240,18 +241,21 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
+                task.resume();
                 try {
-                    task.resume();
+                    transitionToRunning(task, new HashSet<TopicPartition>());
                 } catch (final TaskMigratedException e) {
+                    // we need to catch migration exception internally since this function
+                    // is triggered in the rebalance callback
                     log.info("Failed to resume {} {} since it got migrated to another thread already. " +
                             "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
+                    running.remove(task.id());
                     if (fatalException != null) {
                         throw fatalException;
                     }
                     throw e;
                 }
-                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
@@ -271,6 +275,9 @@ private void addToRestoring(final T task) {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b8777ad5521..8d6e56a17aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ void removeAllSensors() {
      * @param cache                 the {@link ThreadCache} created by the thread
      * @param time                  the system {@link Time} of the thread
      * @param producer              the instance of {@link Producer} used to produce records
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final Collection<TopicPartition> partitions,
@@ -149,14 +148,11 @@ public StreamTask(final TaskId id,
         partitionGroup = new PartitionGroup(partitionQueues);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if the previous transaction has not
+        // completed yet; do not start the first transaction until the topology has been initialized later
         if (eosEnabled) {
-            try {
-                this.producer.initTransactions();
-                this.producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
+            this.producer.initTransactions();
         }
     }
 
@@ 
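
(The diff is truncated at the dangling "@@" above, so the hunk that actually opens the
first transaction is not shown. Per the PR title, beginTransaction() moves into
task.initializeTopology(); the following is a minimal, hypothetical sketch of that
pattern under those assumptions, not the merged StreamTask code.)

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.ProducerFencedException;

// Sketch only: this class and its exception handling are simplified stand-ins for
// StreamTask and TaskMigratedException.
class DelayedTxnTaskSketch {
    private final Producer<byte[], byte[]> producer;
    private final boolean eosEnabled;
    private boolean transactionInFlight = false;

    DelayedTxnTaskSketch(final Producer<byte[], byte[]> producer, final boolean eosEnabled) {
        this.producer = producer;
        this.eosEnabled = eosEnabled;
        if (eosEnabled) {
            // fences off older producer incarnations and may block on an unfinished
            // previous transaction, but does NOT open a new transaction yet
            producer.initTransactions();
        }
    }

    void initializeTopology() {
        // ... initialize processor nodes here ...
        if (eosEnabled) {
            try {
                // the first transaction opens only once the task is ready to process,
                // so a long restoration phase can no longer let an idle txn expire
                producer.beginTransaction();
                transactionInFlight = true;
            } catch (final ProducerFencedException fatal) {
                // Streams wraps this into a TaskMigratedException; a plain runtime
                // exception keeps the sketch self-contained
                throw new IllegalStateException("producer fenced, task migrated", fatal);
            }
        }
    }
}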

[jira] [Commented] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on

2018-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394595#comment-16394595
 ] 

ASF GitHub Bot commented on KAFKA-6634:
---

guozhangwang opened a new pull request #4684: KAFKA-6634: Delay starting new 
transaction in task.initializeTopology
URL: https://github.com/apache/kafka/pull/4684
 
 
   1. As titled, do not start a new transaction right away: during restoration the
producer would have no activity, which could cause the txn to expire.
   1.a. Also delay starting a new txn when resuming a task until its topology is
initialized.
   
   2. Fixed a minor bug: when the resume path hits a migration exception, the task
should also be removed from the running list if present (see the sketch below).
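
For point 2, a compressed, non-authoritative sketch of the control-flow change (the
merged diff above shows the real code in AssignedTasks.maybeResumeSuspendedTask; Task,
TaskMigratedException, closeZombieTask and the running map here are simplified stand-ins):

import java.util.HashMap;
import java.util.Map;

// Sketch only: if a just-resumed task cannot be transitioned to running because its
// producer was fenced, close it as a zombie AND drop it from the running map before
// rethrowing, so no dead task is left behind for the next rebalance.
class ResumeFixSketch {
    static class TaskMigratedException extends RuntimeException { }

    interface Task {
        String id();
        void resume();
    }

    private final Map<String, Task> running = new HashMap<>();

    void resumeSuspendedTask(final Task task) {
        task.resume();                     // resume itself no longer begins a transaction
        try {
            transitionToRunning(task);     // may throw if the task producer got fenced (EOS)
        } catch (final TaskMigratedException e) {
            final RuntimeException fatal = closeZombieTask(task);
            running.remove(task.id());     // the fix: forget the half-resumed task
            if (fatal != null) {
                throw fatal;
            }
            throw e;
        }
    }

    private void transitionToRunning(final Task task) {
        running.put(task.id(), task);
        // ... topology initialization happens here, which now starts the first txn under EOS ...
    }

    private RuntimeException closeZombieTask(final Task task) {
        // close the task without committing; return any fatal exception hit while closing
        return null;
    }
}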
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Delay initiating the txn on producers until initializeTopology with EOS 
> turned on
> -
>
> Key: KAFKA-6634
> URL: https://issues.apache.org/jira/browse/KAFKA-6634
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> In the Streams EOS implementation, the producer created for a task initiates a txn
> immediately, in the constructor of `StreamTask`. However, because of the restoration
> process the task may not process any data for a long time, and hence the producer may
> not send any records for that open txn. With the default txn.session.timeout of 60
> seconds, this means that if restoration takes longer than that, the producer will get
> an error as soon as it starts sending, because its producer epoch is already considered
> old.
> To fix this, we should consider starting the txn only after the restoration phase is
> done. The caveat is that if the producer has already been fenced, it will not be
> notified of that until then, in initializeTopology. But this should not be a
> correctness issue, since during the restoration process we do not make any changes to
> the processing state.
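
The failure mode described above can be reproduced with a plain transactional producer,
outside of Streams. A minimal, hypothetical sketch (it assumes a broker at localhost:9092
and a topic named demo-topic; the 90-second sleep stands in for a long restoration phase):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class EagerTxnStartDemo {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "eager-txn-demo");      // hypothetical transactional id
        props.put("transaction.timeout.ms", "60000");         // the 60-second timeout discussed above

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();      // txn opened eagerly, like the old StreamTask constructor

            Thread.sleep(90_000);             // stand-in for a restoration longer than the txn timeout

            try {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
                producer.commitTransaction();  // typically fails: the coordinator timed out the idle txn
            } catch (final KafkaException e) {
                // e.g. ProducerFencedException: the producer epoch was bumped while the txn sat idle
                System.err.println("write after long-idle transaction failed: " + e);
            }
        }
    }
}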



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)