[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2022-03-03 Thread Guozhang Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501162#comment-17501162 ]

Guozhang Wang commented on KAFKA-6106:
--

As discussed with [~cadonna] offline, once we have completed 
https://issues.apache.org/jira/browse/KAFKA-10199, we should come back and 
revisit this issue. One idea would be to process the ready-to-go tasks while 
others are still being restored by the restore threads, using heuristics to 
decide which ready-to-go tasks to process. For example, tasks of upstream 
sub-topologies would get higher priority, while tasks of downstream 
sub-topologies could remain paused, since their inputs rely on the upstream 
tasks' processed outputs.
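A minimal Java sketch of the heuristic described in this comment, assuming the caller already knows, for each sub-topology, which sub-topologies feed it and how far it sits from the source. `TaskRef`, `ReadyTaskScheduler`, `selectProcessable`, and both input maps are hypothetical illustrations, not Kafka Streams APIs, and only direct upstreams are checked:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical task descriptor (sub-topology id, partition, restored flag); not Kafka's TaskId.
final class TaskRef {
    final int subtopology;
    final int partition;
    final boolean restored;

    TaskRef(int subtopology, int partition, boolean restored) {
        this.subtopology = subtopology;
        this.partition = partition;
        this.restored = restored;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

final class ReadyTaskScheduler {
    private ReadyTaskScheduler() {}

    /**
     * Picks the restored tasks that may be processed while other tasks are still restoring.
     * A restored task stays paused if any of its direct upstream sub-topologies still has a
     * restoring task; the remaining tasks are ordered upstream-first (lower depth first).
     *
     * @param upstreams sub-topology id -> ids of the sub-topologies that feed it (assumed input)
     * @param depth     sub-topology id -> distance from the source sub-topologies (assumed input)
     */
    static List<TaskRef> selectProcessable(List<TaskRef> tasks,
                                           Map<Integer, Set<Integer>> upstreams,
                                           Map<Integer, Integer> depth) {
        // Sub-topologies with at least one task still restoring.
        Set<Integer> stillRestoring = new HashSet<>();
        for (TaskRef t : tasks) {
            if (!t.restored) {
                stillRestoring.add(t.subtopology);
            }
        }
        List<TaskRef> ready = new ArrayList<>();
        for (TaskRef t : tasks) {
            if (!t.restored) {
                continue;
            }
            Set<Integer> feeds = upstreams.getOrDefault(t.subtopology, Collections.emptySet());
            // Keep the task paused while any direct upstream is still restoring.
            if (Collections.disjoint(feeds, stillRestoring)) {
                ready.add(t);
            }
        }
        // Upstream sub-topologies first, then by partition for a stable order.
        ready.sort(Comparator.comparingInt((TaskRef t) -> depth.getOrDefault(t.subtopology, 0))
                .thenComparingInt(t -> t.partition));
        return ready;
    }
}
```

For example, if sub-topology 1 reads from sub-topology 0 and task 0_1 is still restoring, only 0_0 is returned; 1_0 stays paused even though its own stores are restored.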

> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -
>
> Key: KAFKA-6106
> URL: https://issues.apache.org/jira/browse/KAFKA-6106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Guozhang Wang
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: new-streams-runtime-should-fix, newbie++
> Fix For: 1.1.1, 2.0.0
>
>
> Let's say a stream thread hosts multiple tasks, A and B. When A and B are 
> first assigned to the thread, the thread state is {{TASKS_ASSIGNED}}, and 
> the thread starts restoring both tasks in this state, using the restore 
> consumer for restoration while the normal consumer keeps heartbeating.
> If task A finishes restoring earlier than task B, the thread will start 
> processing A immediately, even though it is still in the {{TASKS_ASSIGNED}} 
> phase. But because one thread does both, processing task A slows down the 
> restoration of task B. As a result, the thread's transition to {{RUNNING}} 
> (which happens once all of its assigned tasks have completed restoring and 
> can be processed) is delayed.
> Note that the Streams instance's state only transitions to {{RUNNING}} when 
> all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition is delayed by this scenario as well.
> It would be better not to start processing ready tasks immediately, but 
> instead to focus on restoration during the {{TASKS_ASSIGNED}} state, to 
> shorten the overall time of the instance's state transition.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401033#comment-16401033 ]

ASF GitHub Bot commented on KAFKA-6106:
---

mjsax closed pull request #4651: KAFKA-6106: Postpone normal processing of 
tasks until restoration of all tasks completed
URL: https://github.com/apache/kafka/pull/4651
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c806bfde47e..92045713146 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,27 +73,12 @@ void addNewTask(final T task) {
 created.put(task.id(), task);
 }
 
-Set<TopicPartition> uninitializedPartitions() {
-if (created.isEmpty()) {
-return Collections.emptySet();
-}
-final Set<TopicPartition> partitions = new HashSet<>();
-for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-if (entry.getValue().hasStateStores()) {
-partitions.addAll(entry.getValue().partitions());
-}
-}
-return partitions;
-}
-
 /**
- * @return partitions that are ready to be resumed
  * @throws IllegalStateException If store gets registered after initialized is already finished
  * @throws StreamsException if the store's change log does not contain the partition
  * @throws TaskMigratedException if the task producer got fenced (EOS only)
  */
-Set<TopicPartition> initializeNewTasks() {
-final Set<TopicPartition> readyPartitions = new HashSet<>();
+void initializeNewTasks() {
 if (!created.isEmpty()) {
 log.debug("Initializing {}s {}", taskTypeName, created.keySet());
 }
@@ -104,7 +89,7 @@ void addNewTask(final T task) {
 log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
 addToRestoring(entry.getValue());
 } else {
-transitionToRunning(entry.getValue(), readyPartitions);
+transitionToRunning(entry.getValue());
 }
 it.remove();
 } catch (final LockException e) {
@@ -112,21 +97,19 @@ void addNewTask(final T task) {
 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
 }
 }
-return readyPartitions;
 }
 
-Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+void updateRestored(final Collection<TopicPartition> restored) {
 if (restored.isEmpty()) {
-return Collections.emptySet();
+return;
 }
 log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
-final Set<TopicPartition> resume = new HashSet<>();
 restoredPartitions.addAll(restored);
 for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
 final Map.Entry<TaskId, T> entry = it.next();
 final T task = entry.getValue();
 if (restoredPartitions.containsAll(task.changelogPartitions())) {
-transitionToRunning(task, resume);
+transitionToRunning(task);
 it.remove();
 log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
 taskTypeName,
@@ -146,7 +129,6 @@ void addNewTask(final T task) {
 if (allTasksRunning()) {
 restoredPartitions.clear();
 }
-return resume;
 }
 
 boolean allTasksRunning() {
@@ -243,7 +225,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set
 suspended.remove(taskId);
 task.resume();
 try {
-transitionToRunning(task, new HashSet<TopicPartition>());
+transitionToRunning(task);
 } catch (final TaskMigratedException e) {
 // we need to catch migration exception internally since this function
 // is triggered in the rebalance callback
@@ -278,15 +260,12 @@ private void addToRestoring(final T task) {
 /**
  * @throws TaskMigratedException if the task producer got fenced (EOS only)
  */
-private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+private void transitionToRunning(final T task) {
 log.debug("transitioning {} {} to 

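The diff drops the ready-partition return values, so `updateRestored` now only moves tasks whose changelog partitions are all restored from `restoring` to running. Below is a simplified, self-contained model of that bookkeeping, assuming plain string partition names in place of `TopicPartition`; `SimpleTask` and `SimpleAssignedTasks` are hypothetical stand-ins, and the real `AssignedTasks` also tracks created and suspended tasks and EOS producer fencing:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a stream task: an id plus the changelog partitions it restores from. */
final class SimpleTask {
    final String id;
    final Set<String> changelogPartitions;

    SimpleTask(String id, Set<String> changelogPartitions) {
        this.id = id;
        this.changelogPartitions = changelogPartitions;
    }
}

/** Simplified model of the restoring -> running bookkeeping; not the real AssignedTasks class. */
final class SimpleAssignedTasks {
    private final Map<String, SimpleTask> restoring = new HashMap<>();
    private final Map<String, SimpleTask> running = new HashMap<>();
    private final Set<String> restoredPartitions = new HashSet<>();

    void addToRestoring(SimpleTask task) {
        restoring.put(task.id, task);
    }

    /** Like the post-PR signature, returns nothing: it only moves fully restored tasks to running. */
    void updateRestored(Collection<String> restored) {
        if (restored.isEmpty()) {
            return;
        }
        restoredPartitions.addAll(restored);
        for (Iterator<Map.Entry<String, SimpleTask>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
            SimpleTask task = it.next().getValue();
            if (restoredPartitions.containsAll(task.changelogPartitions)) {
                running.put(task.id, task);
                it.remove();
            }
        }
        // Once nothing is left restoring, the accumulated partition set is no longer needed.
        if (allTasksRunning()) {
            restoredPartitions.clear();
        }
    }

    boolean allTasksRunning() {
        return restoring.isEmpty();
    }

    Set<String> runningTaskIds() {
        return running.keySet();
    }
}
```

Because nothing is returned per task any more, a caller would presumably check `allTasksRunning()` before resuming normal processing, rather than resuming input partitions task by task.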
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387293#comment-16387293 ]

ASF GitHub Bot commented on KAFKA-6106:
---

kamalcph closed pull request #4564: KAFKA-6106; Postpone normal processing of 
tasks within a thread until…
URL: https://github.com/apache/kafka/pull/4564
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde30e9..d61b281eed1 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
 val brokerIdPath = brokerInfo.path
 val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-retryRequestUntilConnected(setDataRequest)
+val response = retryRequestUntilConnected(setDataRequest)
+response.maybeThrow()
 info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
 val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
 if (getChildrenResponse.resultCode == Code.OK) {
-  deleteLogDirEventNotifications(getChildrenResponse.children)
+  deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
 } else if (getChildrenResponse.resultCode != Code.NONODE) {
   getChildrenResponse.maybeThrow
 }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c25c58..e44c2c94e52 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+  var otherZkClient: KafkaZkClient = _
+
+  @Before
+  override def setUp(): Unit = {
+super.setUp()
+otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+  zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+if (otherZkClient != null)
+  otherZkClient.close()
+super.tearDown()
+  }
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-val topic1 = "topic1"
-val topic2 = "topic2"
+

[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387289#comment-16387289 ]

ASF GitHub Bot commented on KAFKA-6106:
---

kamalcph opened a new pull request #4651: KAFKA-6106: Postpone normal 
processing of tasks until restoration of all tasks completed
URL: https://github.com/apache/kafka/pull/4651
 
 
   Tasks are processed only once all the state stores have been restored. This 
approach reduces the time taken to restore the state stores, since a single 
thread is used both to restore state and to process tasks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362043#comment-16362043 ]

ASF GitHub Bot commented on KAFKA-6106:
---

kamalcph opened a new pull request #4564: KAFKA-6106; Postpone normal 
processing of tasks within a thread until…
URL: https://github.com/apache/kafka/pull/4564
 
 
   … restoration of all tasks have completed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-02-11 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360308#comment-16360308 ]

Guozhang Wang commented on KAFKA-6106:
--

Thanks!

[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-02-09 Thread Kamal Chandraprakash (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359245#comment-16359245 ]

Kamal Chandraprakash commented on KAFKA-6106:
-

[~guozhang] I was busy with other tasks. I will try to complete it by this weekend.

[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-02-07 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356331#comment-16356331 ]

Guozhang Wang commented on KAFKA-6106:
--

[~ckamal] Are you still working on this issue?

[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2017-11-05 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239733#comment-16239733 ]

Guozhang Wang commented on KAFKA-6106:
--

Today, IQ is not allowed unless the stream thread is in {{State.RUNNING}}, and a 
stream thread can only transition to {{RUNNING}} once all of its tasks are in the 
RUNNING state. So technically we can fix it in two ways:

1) the proposal mentioned in this JIRA.
2) change the StateStoreProvider class to make stores queryable at the task 
level rather than at the thread level, so that some tasks' stores can be 
queried even though other stores are not yet queryable.

I'm inclined to defer the second option for now (in other words, add this 
config to let users choose), for the following reasons:

a) most streaming applications that leverage IQ (think: analytics, 
monitoring) can only function when all such state stores are queryable. Making 
only some of these stores queryable while others are still bootstrapping 
would not help these applications.
b) a known issue with IQ today is that states from different tasks are not 
taken from the same snapshot, so if they are logically correlated, IQ can run 
into "phantom read" scenarios. We have thought about how to remedy such issues, 
and one option is to leverage exactly-once semantics. In that case, moving 
some tasks forward while other tasks are still restoring would only push the 
tasks' progress even further apart in time.

If in the future we do observe that this is a common request, where users do 
not care about partial reads or inconsistent reads across states but are really 
keen to have some of the states queryable ASAP (btw, off the top of my head, I 
feel that even in this case the right way to solve it would be task assignment 
improvements?), then we can consider the second option.
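The two options differ only in where the queryability check is made: per thread (all tasks must be RUNNING) or per task. A hypothetical Java sketch, with `SimpleTaskState` and `QueryGate` as illustrative names; the real StateStoreProvider and thread/task state machines are richer than this:

```java
import java.util.Map;

/** Hypothetical, reduced task states; the real Kafka Streams state machines have more states. */
enum SimpleTaskState { RESTORING, RUNNING }

final class QueryGate {
    private QueryGate() {}

    /** Thread-level gate: a task's stores are queryable only when every task on the thread is RUNNING. */
    static boolean threadLevelQueryable(Map<String, SimpleTaskState> tasksOnThread, String taskId) {
        if (!tasksOnThread.containsKey(taskId)) {
            return false;
        }
        return tasksOnThread.values().stream().allMatch(s -> s == SimpleTaskState.RUNNING);
    }

    /** Task-level gate (option 2): a task's stores are queryable as soon as that task is RUNNING. */
    static boolean taskLevelQueryable(Map<String, SimpleTaskState> tasksOnThread, String taskId) {
        return tasksOnThread.get(taskId) == SimpleTaskState.RUNNING;
    }
}
```

The task-level gate answers queries earlier, but, as point b) notes, the stores it exposes may reflect tasks at very different points in their input.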

[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2017-11-04 Thread Matthias J. Sax (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239115#comment-16239115 ]

Matthias J. Sax commented on KAFKA-6106:


I understand your argument about easy-to-understand configs. A main motivation 
for starting to process "ready" tasks is to also allow querying their stores 
asap: currently, we only allow querying stores if the whole {{KafkaStreams}} 
instance (or {{StreamThread}}?) is in state {{RUNNING}}, and long IQ 
"downtimes" are a real issue. But maybe we can also fix this differently. I'm 
not 100% sure whether it is related to this ticket (it seems to be a 
requirement to unlock improved IQ behavior) or orthogonal to it. \cc [~bbejeck] [~damianguy]

[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2017-10-27 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223059#comment-16223059 ]

Guozhang Wang commented on KAFKA-6106:
--

I think it is better to enforce this behavior, and here are my motivations:

1. I cannot actually think of a scenario in which people would prefer to start 
partial processing asap while keeping the whole instance's state in 
`PARTITION_ASSIGNED`. But I can be convinced if you have a concrete use case in 
mind.
2. I'd prefer to expose configs that are "easy to understand" for users and that 
do not leak too many internal runtime details. This config looks much more 
"leaky" and hard for end users to understand, and hence I'm hesitant to 
introduce it to the user: usually, if I find myself needing more than 5 
sentences to explain a config's semantics, it is a good sign that I probably 
should not add it.

My current PR assumes no config is exposed, but we can continue this discussion 
on the ticket or on the PR itself while we review it.


[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2017-10-23 Thread Matthias J. Sax (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215564#comment-16215564 ]

Matthias J. Sax commented on KAFKA-6106:


IMHO, both scenarios (blocking all processing to speed up restoration, and 
starting partial processing asap) are valuable, and we should give users a 
config to choose between the two behaviors. This would require a KIP.
