[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15620357#comment-15620357 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/kafka/pull/2032

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
>                 Key: KAFKA-3559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3559
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly
> assigned partitions in the rebalance callback function {code} onPartitionAssigned
> {code}, which involves initialization of the processor state stores as well
> (including opening RocksDB, restoring the store from the changelog, etc., which
> takes time).
> With a large number of state stores, the initialization itself could
> take tens of seconds, which is usually larger than the consumer session
> timeout. As a result, by the time the callback completes, the coordinator has
> already treated the consumer as failed and triggered another rebalance.
> We need to consider whether we can optimize the initialization process, or move it
> out of the callback function and, while initializing the stores one by one,
> use poll calls to send heartbeats to avoid being kicked out by the coordinator.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

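The last idea in the issue description (initialize the stores one by one and send a heartbeat between them) can be sketched roughly as follows. This is only an illustration, not Kafka Streams code: the `Store` and `Heartbeat` interfaces and the `initAll` helper are hypothetical stand-ins for a slow state store and for whatever keeps the group membership alive (for example a consumer poll).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: interleave slow store initialization with heartbeats. */
class InterleavedInitSketch {

    /** Stand-in for a state store whose init() may take seconds (assumption). */
    interface Store {
        void init();
    }

    /** Stand-in for the action that keeps the member alive in the group (assumption). */
    interface Heartbeat {
        void beat();
    }

    /**
     * Initializes the stores one at a time and sends one heartbeat after each,
     * so the longest silent period is a single store's init time rather than
     * the sum over all stores. Returns the number of heartbeats sent.
     */
    static int initAll(List<Store> stores, Heartbeat heartbeat) {
        int beats = 0;
        for (Store store : stores) {
            store.init();
            heartbeat.beat();
            beats++;
        }
        return beats;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Store> stores = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            stores.add(() -> log.add("init store-" + id));
        }
        initAll(stores, () -> log.add("heartbeat"));
        System.out.println(log);
    }
}
```

With this shape, the session timeout only has to exceed the initialization time of the slowest single store, not the total for the task.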
[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15619884#comment-15619884 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

GitHub user enothereska reopened a pull request:

    https://github.com/apache/kafka/pull/2032

    KAFKA-3559: Recycle old tasks when possible

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/enothereska/kafka KAFKA-3559-onPartitionAssigned

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2032

commit 28a50430e4136ba75f0d9b957a67f22e7b1e86a0
Author: Eno Thereska
Date:   2016-10-17T10:46:45Z

    Recycle old tasks when possible

commit b3dc438bf1665b9364b19f5efa908dd35d2b7af3
Author: Eno Thereska
Date:   2016-10-19T15:13:36Z

    Adjusted based on Damian's comments

commit f8cfe74d85e0a8cd5efacca87eced236319c83b9
Author: Eno Thereska
Date:   2016-10-19T17:44:39Z

    Refactor

commit 62bb3fd4a90dd28bc7bb58bf077b7ecb60207c7e
Author: Eno Thereska
Date:   2016-10-24T14:24:48Z

    Merge remote-tracking branch 'origin/trunk' into KAFKA-3559-onPartitionAssigned

commit 841caa3721172d2d89ec16ef6dfd149f25498649
Author: Eno Thereska
Date:   2016-10-24T17:32:05Z

    Addressed Guozhang's comments

commit c4498564907243c35df832407933b8a9cf32f4ef
Author: Eno Thereska
Date:   2016-10-25T11:07:28Z

    Refactor

commit 4ba24c1ecb8c6293adce426a92b6021e86c9e8b7
Author: Eno Thereska
Date:   2016-10-25T12:20:04Z

    Merge remote-tracking branch 'origin/trunk' into KAFKA-3559-onPartitionAssigned

commit 0fe12633b8593eda3b5b7b75bc87244276c95ce2
Author: Eno Thereska
Date:   2016-10-28T20:46:18Z

    Minor reshuffle

commit 7bf5d96cd66ab77130cad39fbff821fccd83aa06
Author: Eno Thereska
Date:   2016-10-28T21:44:48Z

    Guozhang's suggestion to clear queue

commit ecc5e8a54f908507cb32ab785ee748e1d9e2cfb4
Author: Eno Thereska
Date:   2016-10-29T19:01:59Z

    Clear another queue

commit dffa9a2896eade6501794596bb08a9a2545e81b0
Author: Eno Thereska
Date:   2016-10-29T19:14:56Z

    Merge with trunk

commit 8f07571d28d137eaf7951e05c799e9458dca703f
Author: Eno Thereska
Date:   2016-10-29T22:22:21Z

    Separate cache test into another file

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15619883#comment-15619883 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

Github user enothereska closed the pull request at:

    https://github.com/apache/kafka/pull/2032

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15617936#comment-15617936 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

Github user enothereska closed the pull request at:

    https://github.com/apache/kafka/pull/2032

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15617935#comment-15617935 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

GitHub user enothereska reopened a pull request:

    https://github.com/apache/kafka/pull/2032

    KAFKA-3559: Recycle old tasks when possible

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15616982#comment-15616982 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

Github user enothereska closed the pull request at:

    https://github.com/apache/kafka/pull/2032

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15616983#comment-15616983 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

GitHub user enothereska reopened a pull request:

    https://github.com/apache/kafka/pull/2032

    KAFKA-3559: Recycle old tasks when possible

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15581874#comment-15581874 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

GitHub user enothereska opened a pull request:

    https://github.com/apache/kafka/pull/2032

    KAFKA-3559: Recycle old tasks when possible

commit 28a50430e4136ba75f0d9b957a67f22e7b1e86a0
Author: Eno Thereska
Date:   2016-10-17T10:46:45Z

    Recycle old tasks when possible

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571234#comment-15571234 ]

Eno Thereska commented on KAFKA-3559:
-------------------------------------

[~guozhang] actually let's keep this JIRA, no need to create another one after all.

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569915#comment-15569915 ]

Guozhang Wang commented on KAFKA-3559:
--------------------------------------

[~enothereska] If you feel this suggestion deserves another ticket, feel free to create a separate one. Since with KIP-4 there is little difference between initializing in the rebalance callback vs. after the rebalance completes, we can close this ticket if you think so.

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15547089#comment-15547089 ]

Guozhang Wang commented on KAFKA-3559:
--------------------------------------

Here are some more thoughts on this issue and how we can improve the situation:

Currently with Kafka Streams each rebalance is expensive, even if it is only "partial" (i.e. only a few of the non-leader members in the consumer group have decided to re-join, which will not trigger a full rebalance but will only cause the coordinator to send back the assignment again), since {{onPartitionRevoked}} and {{onPartitionAssigned}} will be triggered anyway, closing and (re-)constructing the tasks. For example, on my local (very small) laptop, with a complex topology containing 10+ stores and 15+ internal topics and with 3 threads, a rebalance could take up to 20 seconds.

On the other hand, we want to close the tasks in {{onPartitionRevoked}} before the synchronization barrier only because threads may hold some file locks related to these tasks. And since tasks are all committed right before closing, I think it is safe to delay the destruction of tasks so that we may be able to save the time of closing / reconstructing such tasks. More specifically:

1. In {{onPartitionRevoked}}, instead of closing the tasks, we only need to commit the tasks and "pause" them by calling their topology processor's newly added {{flush}} calls, releasing the corresponding file locks of the tasks. In fact, the pausing happens automatically, since we will not process any messages during the rebalance anyway.

2. Then in {{onPartitionAssigned}}, we can check whether any tasks have really been migrated out of the thread. Those tasks are closed (note that since they were already committed in {{onPartitionRevoked}}, closing them only involves calling the topology processor's {{close}} function and closing the state stores); all other tasks simply "resume" processing.

We need to think through some minor issues such as the above-mentioned file locks for persistent state stores, how clean-up will work without introducing deadlocks, etc. But I think in general this solution should work.

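The two steps proposed in the comment above can be sketched as a small simulation. This is not the code from pull request 2032: the `Task` class, the `created` and `closed` counters, and the use of integer partition ids are all hypothetical simplifications, and committing, flushing and lock handling are reduced to comments. The method names follow the Kafka consumer's `ConsumerRebalanceListener` interface, which spells them `onPartitionsRevoked` and `onPartitionsAssigned`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: suspend tasks on revoke, recycle them on assign. */
class TaskRecyclingSketch {

    /** Stand-in for a stream task; constructing one is the expensive step. */
    static final class Task {
        final int partition;
        Task(int partition) { this.partition = partition; }
    }

    private final Map<Integer, Task> active = new HashMap<>();
    private final Map<Integer, Task> suspended = new HashMap<>();
    int created = 0; // how many tasks were built from scratch
    int closed = 0;  // how many tasks were really closed

    /** Step 1: commit and pause the tasks instead of closing them. */
    void onPartitionsRevoked() {
        // A real implementation would commit, flush and release file locks here.
        suspended.putAll(active);
        active.clear();
    }

    /** Step 2: resume tasks that came back, close the ones that migrated away. */
    void onPartitionsAssigned(Collection<Integer> partitions) {
        for (int partition : partitions) {
            Task task = suspended.remove(partition);
            if (task == null) {
                task = new Task(partition); // newly assigned: pay the full cost
                created++;
            }
            active.put(partition, task);
        }
        // Whatever is still suspended was migrated out of this thread.
        closed += suspended.size();
        suspended.clear();
    }

    Set<Integer> activePartitions() {
        return new TreeSet<>(active.keySet());
    }

    public static void main(String[] args) {
        TaskRecyclingSketch thread = new TaskRecyclingSketch();
        thread.onPartitionsAssigned(Arrays.asList(0, 1, 2));
        thread.onPartitionsRevoked();
        thread.onPartitionsAssigned(Arrays.asList(1, 2, 3));
        System.out.println("created=" + thread.created + " closed=" + thread.closed
                + " active=" + thread.activePartitions());
    }
}
```

In this simulation a rebalance that keeps two of three partitions constructs only one new task, where closing and re-creating everything would construct three.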
[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433648#comment-15433648 ]

Guozhang Wang commented on KAFKA-3559:
--------------------------------------

With KIP-62 (KAFKA-3888) merged into trunk, this issue should be auto-fixed already.

[~norwood] [~apovzner] Could you verify if that is the case?

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
>                 Key: KAFKA-3559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3559
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.10.1.0

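For context on why KIP-62 is expected to help: it moved consumer heartbeats to a background thread and added the `max.poll.interval.ms` setting, so `session.timeout.ms` no longer has to cover long work done between calls to poll. A minimal sketch of how the two settings might be chosen is below. The helper name `consumerProps`, the 10 second session timeout and the factor of two are illustrative assumptions, not values taken from this ticket.

```java
import java.util.Properties;

/** Hypothetical sketch: size the KIP-62 timeouts for slow task initialization. */
class Kip62ConfigSketch {

    /**
     * Builds consumer properties in which heartbeat-based failure detection
     * stays short while the processing timeout leaves room for a slow
     * initialization. All numeric values are illustrative assumptions.
     */
    static Properties consumerProps(long expectedInitMs) {
        Properties props = new Properties();
        // Liveness is checked through background heartbeats after KIP-62.
        props.put("session.timeout.ms", "10000");
        // Upper bound on the time between two poll() calls; leave headroom.
        props.put("max.poll.interval.ms", Long.toString(expectedInitMs * 2));
        return props;
    }

    public static void main(String[] args) {
        // Example: initialization is expected to take about 60 seconds.
        System.out.println(consumerProps(60_000L));
    }
}
```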
[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264350#comment-15264350 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

Github user enothereska closed the pull request at:

    https://github.com/apache/kafka/pull/1223

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
>                 Key: KAFKA-3559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3559
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.10.1.0

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15242668#comment-15242668 ]

ASF GitHub Bot commented on KAFKA-3559:
---------------------------------------

GitHub user enothereska opened a pull request:

    https://github.com/apache/kafka/pull/1223

    KAFKA-3559: lazy initialisation of state stores

    Instead of initialising state stores on init(), they are initialised on first access.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/enothereska/kafka KAFKA-3559-rebalance

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1223

commit fa970d2126fc10bcba1748d2448ae3c3489e05e7
Author: Eno Thereska
Date:   2016-04-15T08:52:34Z

    Lazy initialization of state stores (on access, rather than all at once)

commit aebb365d04504cc0a5100715ccfd37a82b8b0298
Author: Eno Thereska
Date:   2016-04-15T09:02:50Z

    Check arguments

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
>                 Key: KAFKA-3559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3559
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.10.0.0

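The lazy-initialisation idea described in pull request 1223 above (open a store on first access instead of in init()) can be illustrated with a generic wrapper. This is a sketch of the general technique only, not the code from that pull request: the `LazyStoreSketch` class, its `opens` counter and the `Supplier`-based opener are hypothetical, and the wrapper assumes single-threaded access and an opener that never returns null.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: defer opening a store until it is first used. */
class LazyStoreSketch<S> {

    private final Supplier<S> opener; // the expensive open/restore step
    private S store;                  // null until first access
    int opens = 0;                    // how often the opener actually ran

    LazyStoreSketch(Supplier<S> opener) {
        this.opener = opener;
    }

    /** Opens the store on the first call and returns the cached instance afterwards. */
    S get() {
        if (store == null) {
            store = opener.get();
            opens++;
        }
        return store;
    }

    public static void main(String[] args) {
        LazyStoreSketch<StringBuilder> lazy =
                new LazyStoreSketch<>(() -> new StringBuilder("opened"));
        System.out.println("opens before first access: " + lazy.opens);
        lazy.get();
        lazy.get();
        System.out.println("opens after two accesses: " + lazy.opens);
    }
}
```

Deferring the open shortens the rebalance callback but moves the cost to the first record that touches the store, so the total work is unchanged.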