[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501162#comment-17501162 ]

Guozhang Wang commented on KAFKA-6106:
--------------------------------------

As discussed with [~cadonna] offline, once https://issues.apache.org/jira/browse/KAFKA-10199 is completed we should come back and revisit this issue. One idea is to allow processing of ready-to-go tasks while other tasks are still being restored by the restore threads. Heuristics could decide which ready-to-go tasks to process first: for example, tasks of upstream sub-topologies would get higher priority, while tasks of downstream sub-topologies could stay paused, since their inputs depend on the processed outputs of the upstream tasks.

> Postpone normal processing of tasks within a thread until restoration of all tasks have completed
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6106
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Guozhang Wang
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>              Labels: new-streams-runtime-should-fix, newbie++
>             Fix For: 1.1.1, 2.0.0
>
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very beginning, when A and B are assigned to the thread, the thread state is {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during this state using the restore consumer, while using the normal consumer for heartbeating.
> If task A's restoration completes earlier than task B's, the thread will start processing A immediately, even though it is still in the {{TASKS_ASSIGNED}} phase. But processing task A will slow down the restoration of task B, since the thread is single-threaded. So the thread's transition to {{RUNNING}}, which happens only once all of its assigned tasks have completed restoring and can be processed, will be delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} when all of its threads have transitioned to {{RUNNING}}, so the instance's transition will also be delayed by this scenario.
> We had better not start processing ready tasks immediately, and instead focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the overall time of the instance's state transition.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
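The heuristic mentioned in the comment above (process ready tasks of upstream sub-topologies first, keep downstream ones paused) could be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchTask`, `UpstreamFirstHeuristic`, `selectProcessable` and the `upstreamOf` map are hypothetical names invented here, not Kafka Streams classes, and the rule "a ready task is held back while any task of a directly upstream sub-topology is still restoring" is one possible reading of the idea, not the agreed design.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a stream task; NOT a Kafka Streams class.
final class SketchTask {
    final int subTopology;   // id of the sub-topology the task belongs to
    final int partition;     // partition number of the task
    final boolean restored;  // true once the task's state has been restored

    SketchTask(final int subTopology, final int partition, final boolean restored) {
        this.subTopology = subTopology;
        this.partition = partition;
        this.restored = restored;
    }

    @Override
    public String toString() {
        return subTopology + "_" + partition;
    }
}

// Hypothetical heuristic: a restored task may be processed only if none of
// its directly upstream sub-topologies still has a restoring task.
final class UpstreamFirstHeuristic {

    // upstreamOf maps a sub-topology id to the ids of the sub-topologies feeding it (assumed input).
    static List<SketchTask> selectProcessable(final List<SketchTask> tasks,
                                              final Map<Integer, Set<Integer>> upstreamOf) {
        // Collect the sub-topologies that still have at least one restoring task.
        final Set<Integer> restoringSubTopologies = new HashSet<>();
        for (final SketchTask task : tasks) {
            if (!task.restored) {
                restoringSubTopologies.add(task.subTopology);
            }
        }

        final List<SketchTask> processable = new ArrayList<>();
        for (final SketchTask task : tasks) {
            if (!task.restored) {
                continue; // still restoring, cannot be processed at all
            }
            final Set<Integer> upstream = upstreamOf.containsKey(task.subTopology)
                ? upstreamOf.get(task.subTopology)
                : Collections.<Integer>emptySet();
            boolean blocked = false;
            for (final Integer upstreamId : upstream) {
                if (restoringSubTopologies.contains(upstreamId)) {
                    blocked = true; // inputs depend on an upstream task that is not ready yet
                    break;
                }
            }
            if (!blocked) {
                processable.add(task);
            }
        }
        return processable;
    }
}
```

With tasks 0_0 (restored), 0_1 (restoring) and 1_0 (restored), where sub-topology 1 reads from sub-topology 0, only 0_0 is selected: 1_0 stays paused until every task of sub-topology 0 has finished restoring.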
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401033#comment-16401033 ]

ASF GitHub Bot commented on KAFKA-6106:
---------------------------------------

mjsax closed pull request #4651: KAFKA-6106: Postpone normal processing of tasks until restoration of all tasks completed
URL: https://github.com/apache/kafka/pull/4651

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c806bfde47e..92045713146 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,27 +73,12 @@ void addNewTask(final T task) {
         created.put(task.id(), task);
     }

-    Set<TopicPartition> uninitializedPartitions() {
-        if (created.isEmpty()) {
-            return Collections.emptySet();
-        }
-        final Set<TopicPartition> partitions = new HashSet<>();
-        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-            if (entry.getValue().hasStateStores()) {
-                partitions.addAll(entry.getValue().partitions());
-            }
-        }
-        return partitions;
-    }
-
     /**
-     * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    Set<TopicPartition> initializeNewTasks() {
-        final Set<TopicPartition> readyPartitions = new HashSet<>();
+    void initializeNewTasks() {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
@@ -104,7 +89,7 @@ void addNewTask(final T task) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
-                    transitionToRunning(entry.getValue(), readyPartitions);
+                    transitionToRunning(entry.getValue());
                 }
                 it.remove();
             } catch (final LockException e) {
@@ -112,21 +97,19 @@ void addNewTask(final T task) {
                 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
             }
         }
-        return readyPartitions;
     }

-    Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+    void updateRestored(final Collection<TopicPartition> restored) {
         if (restored.isEmpty()) {
-            return Collections.emptySet();
+            return;
         }
         log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
-        final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             final T task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task, resume);
+                transitionToRunning(task);
                 it.remove();
                 log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
                         taskTypeName,
@@ -146,7 +129,6 @@ void addNewTask(final T task) {
         if (allTasksRunning()) {
             restoredPartitions.clear();
         }
-        return resume;
     }

     boolean allTasksRunning() {
@@ -243,7 +225,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set
                 suspended.remove(taskId);
                 task.resume();
                 try {
-                    transitionToRunning(task, new HashSet<TopicPartition>());
+                    transitionToRunning(task);
                 } catch (final TaskMigratedException e) {
                     // we need to catch migration exception internally since this function
                     // is triggered in the rebalance callback
@@ -278,15 +260,12 @@ private void addToRestoring(final T task) {
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+    private void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387293#comment-16387293 ]

ASF GitHub Bot commented on KAFKA-6106:
---------------------------------------

kamalcph closed pull request #4564: KAFKA-6106; Postpone normal processing of tasks within a thread until…
URL: https://github.com/apache/kafka/pull/4564

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde30e9..d61b281eed1 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
+    val response = retryRequestUntilConnected(setDataRequest)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }

@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
+      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c25c58..e44c2c94e52 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk

-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}

-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random

+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {

   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+  var otherZkClient: KafkaZkClient = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (otherZkClient != null)
+      otherZkClient.close()
+    super.tearDown()
+  }
+
+  private val topicPartition = new TopicPartition("topic", 0)
+
   @Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387289#comment-16387289 ]

ASF GitHub Bot commented on KAFKA-6106:
---------------------------------------

kamalcph opened a new pull request #4651: KAFKA-6106: Postpone normal processing of tasks until restoration of all tasks completed
URL: https://github.com/apache/kafka/pull/4651

Processing of tasks starts only once all the state stores have been restored. Because a single thread is used both to restore state and to process tasks, this approach reduces the time taken to restore the state stores.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
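To make the behavior described in this pull request concrete, here is a minimal single-threaded sketch of "restore everything first, then process". It is not the code from the PR: `GatedTask`, `RestoreFirstLoop` and `runOnce` are hypothetical names, and restoration and processing are reduced to simple counters purely for illustration.

```java
import java.util.List;

// Hypothetical stand-in for a task with state to restore; NOT a Kafka Streams class.
final class GatedTask {
    final String id;
    private int remainingRestoreBatches; // changelog batches still to be applied
    int processedRecords = 0;            // records processed so far

    GatedTask(final String id, final int remainingRestoreBatches) {
        this.id = id;
        this.remainingRestoreBatches = remainingRestoreBatches;
    }

    boolean isRestored() {
        return remainingRestoreBatches == 0;
    }

    void restoreOneBatch() {
        if (remainingRestoreBatches > 0) {
            remainingRestoreBatches--;
        }
    }

    void processOneRecord() {
        processedRecords++;
    }
}

final class RestoreFirstLoop {

    // One iteration of a simplified thread loop.
    // Returns true once all tasks are restored and normal processing has run.
    static boolean runOnce(final List<GatedTask> tasks) {
        boolean allRestored = true;
        for (final GatedTask task : tasks) {
            if (!task.isRestored()) {
                task.restoreOneBatch(); // spend this iteration on restoration only
            }
            if (!task.isRestored()) {
                allRestored = false;
            }
        }
        if (!allRestored) {
            return false; // postpone processing, even for tasks that are already restored
        }
        for (final GatedTask task : tasks) {
            task.processOneRecord();
        }
        return true;
    }
}
```

For a task A that needs one restore batch and a task B that needs three, the first two calls to `runOnce` return false and process nothing, even though A is ready after the first call. The third call completes B's restoration and processes one record from each task.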
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362043#comment-16362043 ]

ASF GitHub Bot commented on KAFKA-6106:
---------------------------------------

kamalcph opened a new pull request #4564: KAFKA-6106; Postpone normal processing of tasks within a thread until…
URL: https://github.com/apache/kafka/pull/4564

… restoration of all tasks have completed.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360308#comment-16360308 ]

Guozhang Wang commented on KAFKA-6106:
--------------------------------------

Thanks!
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359245#comment-16359245 ]

Kamal Chandraprakash commented on KAFKA-6106:
---------------------------------------------

[~guozhang] I was busy with other tasks. I will try to complete it by this weekend.
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356331#comment-16356331 ]

Guozhang Wang commented on KAFKA-6106:
--------------------------------------

[~ckamal] Are you still working on this issue?
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239733#comment-16239733 ]

Guozhang Wang commented on KAFKA-6106:
--------------------------------------

Today, IQ is not allowed unless the stream thread is in {{State.RUNNING}}, and the stream thread can only transition to {{RUNNING}} once all of its tasks are in the RUNNING state. So technically we can fix this in two ways:

1) the proposal described in this JIRA.
2) change the StateStoreProvider class so that stores are queryable at the task level rather than at the thread level, so that some tasks' stores can be queried even though other stores cannot.

I'm inclined to delay going with the second option for now (in other words, adding this config to let users choose), for the following reasons:

a) most streaming applications that rely on IQ (think: analytics, monitoring) can only function when all such state stores are queryable. Making only some of these stores queryable while others are still bootstrapping would not help these applications.
b) a known issue with IQ today is that states from different tasks do not come from the same snapshot, so if they are logically correlated, IQ can exhibit "phantom reads"; we have thought about how to remedy such issues, and one option is to leverage exactly-once semantics. In that case, moving some tasks forward while still restoring others would simply make the tasks run at even more divergent times.

If in the future we do observe that this is a common request, where users do not care about partial or inconsistent reads across states but are really keen on having some of the states queryable ASAP (btw, off the top of my head, I feel that even in this case the right solution would be task assignment improvements?), then we can consider the second option.
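The difference between the two options in the comment above, thread-level versus task-level queryability, can be illustrated with a small sketch. It does not use or reflect the real StateStoreProvider API: `QueryabilitySketch`, `TaskState` and both methods are hypothetical names, and each store is assumed to belong to exactly one task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; NOT the Kafka Streams StateStoreProvider API.
final class QueryabilitySketch {

    enum TaskState { RESTORING, RUNNING }

    // Thread-level rule (current behavior as described in the comment):
    // no store is queryable unless every task of the thread is RUNNING.
    static List<String> queryableThreadLevel(final Map<String, TaskState> storeToTaskState) {
        for (final TaskState state : storeToTaskState.values()) {
            if (state != TaskState.RUNNING) {
                return Collections.emptyList();
            }
        }
        return new ArrayList<>(storeToTaskState.keySet());
    }

    // Task-level rule (option 2 in the comment):
    // a store is queryable as soon as its own task is RUNNING.
    static List<String> queryableTaskLevel(final Map<String, TaskState> storeToTaskState) {
        final List<String> queryable = new ArrayList<>();
        for (final Map.Entry<String, TaskState> entry : storeToTaskState.entrySet()) {
            if (entry.getValue() == TaskState.RUNNING) {
                queryable.add(entry.getKey());
            }
        }
        return queryable;
    }
}
```

With `store-A` owned by a RUNNING task and `store-B` owned by a RESTORING task, the thread-level rule exposes nothing, while the task-level rule exposes `store-A` only. That partial view is exactly what points a) and b) above argue is of limited use to most IQ applications.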
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239115#comment-16239115 ]

Matthias J. Sax commented on KAFKA-6106:
----------------------------------------

I understand your argument about easy-to-understand configs. A main motivation for starting to process "ready" tasks is to also allow querying their stores ASAP. Currently, we only allow querying stores if the whole {{KafkaStreams}} instance (or {{StreamsThread}}?) is in state {{RUNNING}}, and long "downtimes" for IQ are a real issue. But maybe we can also fix this differently; I'm not 100% sure whether that is related to this ticket (it seems to be a requirement to unlock improved IQ behavior) or orthogonal to it.

\cc [~bbejeck] [~damianguy]
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223059#comment-16223059 ]

Guozhang Wang commented on KAFKA-6106:
--------------------------------------

I think it is better to enforce this behavior, and here are my motivations:

1. I cannot actually think of a scenario in which people would prefer to start partial processing ASAP while keeping the whole instance's state in `PARTITION_ASSIGNED`. But I can be convinced if you have a concrete use case in mind.
2. I'd prefer to expose configs that are "easy to understand" for users and that do not leak too many internal runtime details. This config looks much more "leaky" and hard for end users to understand, and hence I'm hesitant to introduce it: usually, if I find myself needing more than 5 sentences to explain a config's semantics, it is a good sign that I probably should not add it.

My current PR assumes no config is exposed, but we can keep this discussion on the ticket or on the PR itself while we review it.
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215564#comment-16215564 ]

Matthias J. Sax commented on KAFKA-6106:
----------------------------------------

IMHO, both scenarios (block all processing to speed up restoration as well as start partial processing asap) are valuable and we should give a user a config to choose between both behaviors. This would require a KIP.
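For illustration only, the user-facing choice suggested in this comment might look something like the sketch below. No such config exists in Kafka Streams as far as this thread shows (the later comments argue against adding one). The key `hypothetical.processing.during.restoration`, the `Policy` enum and `RestorationPolicySketch` are all invented here to show the two behaviors side by side.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch; the config key and policy names are invented and
// do NOT correspond to any actual Kafka Streams configuration.
final class RestorationPolicySketch {

    static final String HYPOTHETICAL_KEY = "hypothetical.processing.during.restoration";

    enum Policy {
        BLOCK_UNTIL_ALL_RESTORED, // focus on restoration, process nothing until every task is restored
        PROCESS_READY_TASKS       // start processing each task as soon as it is restored
    }

    // Reads the policy from properties, defaulting to the blocking behavior.
    static Policy fromProperties(final Properties props) {
        final String raw = props.getProperty(HYPOTHETICAL_KEY);
        if (raw == null) {
            return Policy.BLOCK_UNTIL_ALL_RESTORED;
        }
        return Policy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // Decides whether a single task may be processed right now.
    static boolean mayProcess(final Policy policy,
                              final boolean taskRestored,
                              final boolean allTasksRestored) {
        switch (policy) {
            case PROCESS_READY_TASKS:
                return taskRestored;
            case BLOCK_UNTIL_ALL_RESTORED:
            default:
                return allTasksRestored;
        }
    }
}
```

Under `PROCESS_READY_TASKS` a restored task is processed even while its siblings are still restoring. Under `BLOCK_UNTIL_ALL_RESTORED` the same task waits, which is the behavior this ticket proposes to enforce.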