[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948413#comment-16948413 ]

Ashish Surana commented on KAFKA-7149:
--------------------------------------

I think that, along with these encoding changes, we are also compressing the data before sending out the assignments. If the producer is already sending compressed data, then compressing it again while writing to the __consumer_offsets topic won't help; it could even decrease performance if the compression algorithm differs between the producer and the __consumer_offsets topic. I am not sure it is a good idea to add compression for the __consumer_offsets topic here.

> Reduce assignment data size to improve kafka streams scalability
>
>                 Key: KAFKA-7149
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7149
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Ashish Surana
>            Assignee: Vinoth Chandar
>            Priority: Major
>             Fix For: 2.4.0
>
> We observed that when we have a high number of partitions, instances, or stream-threads, the assignment-data size grows too fast and we start getting a RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at https://issues.apache.org/jira/browse/KAFKA-6976.
> This still limits the scalability of Kafka Streams: moving around ~100 MB of assignment data for each rebalance hurts performance and reliability (timeout exceptions start appearing). It also caps Kafka Streams scale even with a high max.message.bytes setting, as the data size grows quickly with the number of partitions, instances, or stream-threads.
>
> Solution:
> To address this issue in our cluster, we send compressed assignment data. We saw the assignment-data size reduced by 8x-10x. This improved Kafka Streams scalability drastically for us, and we can now run with more than 8,000 partitions.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
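A standalone sketch of the double-compression concern raised in the comment above (plain `java.util.zip` rather than the Kafka codecs; the repetitive payload is just a stand-in for assignment metadata):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

public class DoubleCompressionDemo {
    // Gzip a byte array and return the compressed bytes.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        // Highly redundant payload, standing in for uncompressed assignment metadata.
        byte[] plain = new byte[100_000];
        for (int i = 0; i < plain.length; i++) {
            plain[i] = (byte) (i % 16);
        }

        byte[] once = gzip(plain);   // first pass: large reduction
        byte[] twice = gzip(once);   // second pass: little or no further gain
        System.out.println("plain=" + plain.length + " once=" + once.length + " twice=" + twice.length);
    }
}
```

Compressing the already-compressed bytes a second time mostly burns CPU: the second pass cannot find redundancy the first pass already removed, which is the argument against stacking compression at both the producer and the offsets topic.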
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923721#comment-16923721 ]

ASF GitHub Bot commented on KAFKA-7149:
---------------------------------------

guozhangwang commented on pull request #7185: KAFKA-7149 : Reducing streams assignment data size
URL: https://github.com/apache/kafka/pull/7185

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909314#comment-16909314 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

To close the loop on turning on compression for the backing `__consumer_offsets` topic: it seems this can already be controlled by `offsets.topic.compression.codec`. Please correct me if I am missing something.

In code: GroupMetadataManager::storeGroup

{code}
def storeGroup(group: GroupMetadata,
               groupAssignment: Map[String, Array[Byte]],
               responseCallback: Errors => Unit): Unit = {
  getMagic(partitionFor(group.groupId)) match {
    case Some(magicValue) =>
      ...
      val key = GroupMetadataManager.groupMetadataKey(group.groupId)
      val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)

      val records = {
        val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
          Seq(new SimpleRecord(timestamp, key, value)).asJava))
        val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
        builder.append(timestamp, key, value)
        builder.build()
      }

      val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
      val groupMetadataRecords = Map(groupMetadataPartition -> records)
      val generationId = group.generationId
{code}
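For reference, the broker-side knob mentioned above is set in server.properties. It takes a numeric codec id; the ids below are a sketch and should be verified against the broker version in use:

```properties
# server.properties (broker): compression codec for records written to __consumer_offsets
# codec ids (assumed): 0 = none, 1 = gzip, 2 = snappy, 3 = lz4
offsets.topic.compression.codec=1
```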
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909140#comment-16909140 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

[~guozhang] Ping ping :)
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905430#comment-16905430 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

While looking at changes to detect topology changes, I realized this was still more complicated than it has to be. It turns out that just dictionary-encoding the topic names on the wire can provide similar gains as both approaches above. The PR has been updated based on that; this is a lot simpler.

{code:java}
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 33226{code}

P.S.: My benchmark code

{code:java}
public static void main(String[] args) {
    // Assumption: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
    // High number of hosts = 500; high number of partitions = 128
    final int numStandbyPerTask = 2;
    final String topicPrefix = "streams_topic_name";
    final int numTopicGroups = 4;
    final int numHosts = 500;
    final int numTopics = 20;
    final int partitionPerTopic = 128;
    final int numTasks = partitionPerTopic * numTopics;

    List<TaskId> activeTasks = new ArrayList<>();
    Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();

    // add tasks across each topic group
    for (int tg = 0; tg < numTopicGroups; tg++) {
        for (int i = 0; i < (numTasks / numTopicGroups) / numHosts; i++) {
            TaskId taskId = new TaskId(tg, i);
            activeTasks.add(taskId);
            for (int j = 0; j < numStandbyPerTask; j++) {
                standbyTasks.computeIfAbsent(taskId, k -> new HashSet<>())
                        .add(new TopicPartition(topicPrefix + tg + i + j, j));
            }
        }
    }

    // Generate the actual global assignment map
    Random random = new Random(12345);
    for (int h = 0; h < numHosts; h++) {
        Set<TopicPartition> topicPartitions = new HashSet<>();
        for (int j = 0; j < numTasks / numHosts; j++) {
            int topicGroupId = random.nextInt(numTopicGroups);
            int topicIndex = random.nextInt(numTopics / numTopicGroups);
            String topicName = topicPrefix + topicGroupId + topicIndex;
            int partition = random.nextInt(128);
            topicPartitions.add(new TopicPartition(topicName, partition));
        }
        HostInfo hostInfo = new HostInfo("streams_host" + h, 123456);
        partitionsByHost.put(hostInfo, topicPartitions);
    }

    final AssignmentInfo oldAssignmentInfo = new AssignmentInfo(4, activeTasks, standbyTasks, partitionsByHost, 0);
    final AssignmentInfo newAssignmentInfo = new AssignmentInfo(5, activeTasks, standbyTasks, partitionsByHost, 0);
    System.out.format("oldAssignmentInfoBytes : %d , newAssignmentInfoBytes: %d \n",
            oldAssignmentInfo.encode().array().length, newAssignmentInfo.encode().array().length);
}{code}
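The gain from dictionary encoding can be illustrated standalone (hypothetical length-prefixed sizes, not the actual AssignmentInfo wire format): a naive encoding repeats the topic-name string in every (topic, partition) entry, while the dictionary form writes each name once and replaces repeats with a 4-byte index.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TopicDictSizeDemo {
    // Naive: every (topic, partition) pair carries a length-prefixed topic name + 4-byte partition.
    static int naiveSize(List<String> topics, int partitionsPerTopic) {
        int size = 0;
        for (String t : topics) {
            size += partitionsPerTopic * (4 + t.getBytes(StandardCharsets.UTF_8).length + 4);
        }
        return size;
    }

    // Dictionary: names written once up front; pairs become (4-byte topic index, 4-byte partition).
    static int dictSize(List<String> topics, int partitionsPerTopic) {
        int size = 4; // dictionary entry count
        for (String t : topics) {
            size += 4 + t.getBytes(StandardCharsets.UTF_8).length;
        }
        return size + topics.size() * partitionsPerTopic * (4 + 4);
    }

    public static void main(String[] args) {
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            topics.add("streams_topic_name" + i);
        }
        System.out.println("naive=" + naiveSize(topics, 128) + " dict=" + dictSize(topics, 128));
    }
}
```

With 20 long topic names at 128 partitions each, the dictionary form comes out several times smaller, consistent in spirit with the 77684 -> 33226 byte reduction reported above.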
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904252#comment-16904252 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

[~guozhang] We can squeeze out another ~20% by simply writing topicGroupId, topicIndex, and partition as short instead of int. A short should be plenty for these, IMO, but I understand this departs from the standard treatment elsewhere. What do you think?
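As a back-of-the-envelope check on the ~20% figure above (hypothetical standalone sizing, not the actual wire format): the three fields cost 12 bytes as ints versus 6 as shorts, so the overall saving depends on what fraction of the message these triples make up.

```java
public class ShortVsIntSizing {
    // Bytes to encode (topicGroupId, topicIndex, partition) triples as ints vs shorts.
    static int intEncodedBytes(int triples)   { return triples * 3 * Integer.BYTES; } // 12 bytes per triple
    static int shortEncodedBytes(int triples) { return triples * 3 * Short.BYTES; }   //  6 bytes per triple

    public static void main(String[] args) {
        int triples = 20 * 128; // e.g. 20 topics x 128 partitions
        System.out.println("ints=" + intEncodedBytes(triples) + " shorts=" + shortEncodedBytes(triples));
        // Trade-off: a short caps each field at 32767, which departs from the
        // int-sized treatment these fields get elsewhere in the protocol.
    }
}
```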
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904245#comment-16904245 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

Pasting some size tests for the old and new assignment information:

{code:java}
// Assumptions: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; high number of partitions = 128
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}

Roughly ~45% savings. For 500 hosts, we would be reducing from ~39 MB to ~21 MB. (NOTE: this is still a single object; we still need the protocol change/compression on internal topics to ultimately fix the large-message problem.)
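A quick sanity check on the aggregate figures above, scaling the single-object sizes to 500 hosts (decimal megabytes, one assignment object per host assumed):

```java
public class AssignmentSizeMath {
    // Total size across hosts, in decimal MB, assuming one assignment object per host.
    static double totalMb(int bytesPerObject, int hosts) {
        return bytesPerObject * (double) hosts / 1_000_000;
    }

    public static void main(String[] args) {
        double savings = 1.0 - 42698.0 / 77684.0; // ~45%
        System.out.printf("old=%.1f MB, new=%.1f MB, savings=%.0f%%%n",
                totalMb(77684, 500), totalMb(42698, 500), savings * 100);
    }
}
```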
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904106#comment-16904106 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

I changed the approach from the original PR, having noticed a few corner cases in the TaskId -> TopicPartition translation. Specifically, during {{partitionsForTask <- DefaultPartitionGrouper::partitionGroups()}}, for each topic group it creates max(numPartitions of all source topics) tasks. E.g., if topics t1 (p0, p1) and t2 (p0, p1, p2) form topic group A, then there are three tasks, and task A_2 will only cater to t2_p2 and have no topic partition for t1. Thus we cannot simply use TaskId::partition as the topic partition.

I spent some time seeing whether we could derive this dynamically inside {{onAssignment()}}, but then we cannot handle the case where the leader has already seen a partition added to one of the topics and computed the assignment based on that.
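The corner case can be reproduced with a simplified stand-in for DefaultPartitionGrouper::partitionGroups (plain strings instead of the Streams classes; `partitionGroups` here is a hypothetical simplification): one task per partition index up to the max partition count in the group, so the highest-numbered tasks may cover only a subset of the group's topics.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionGroupsSketch {
    // For one topic group: create max(partition counts) tasks; task p gets
    // partition p of every topic that actually has that partition.
    static Map<Integer, List<String>> partitionGroups(Map<String, Integer> partitionCounts) {
        int maxPartitions = Collections.max(partitionCounts.values());
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> partitions = new ArrayList<>();
            for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
                if (p < e.getValue()) {
                    partitions.add(e.getKey() + "-" + p);
                }
            }
            tasks.put(p, partitions);
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> group = new TreeMap<>();
        group.put("t1", 2); // p0, p1
        group.put("t2", 3); // p0, p1, p2
        // Task 2 ends up with only [t2-2]: TaskId::partition alone cannot
        // recover which topic partitions a task owns.
        System.out.println(partitionGroups(group));
    }
}
```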
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904096#comment-16904096 ]

ASF GitHub Bot commented on KAFKA-7149:
---------------------------------------

vinothchandar commented on pull request #7185: KAFKA-7149 : [WIP] Reducing streams assignment data size
URL: https://github.com/apache/kafka/pull/7185

- Leader instance encodes topic names as an index of their position in the topology
- Follower instances decode and map the integer back to the topic-name string
- Relies on leader and follower instances having the same topology object
- Existing tests redone based on the new metadata exchanged

*Summary of testing strategy (including rationale)*
- [ ] Adding new unit tests
- [ ] Size-reduction tests
- [ ] Test upgrade scenarios
- [ ] Test topology-mismatch scenarios

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900634#comment-16900634 ]

Navinder Brar commented on KAFKA-7149:
--------------------------------------

[~vinoth] Yeah, feel free to reassign this to yourself.
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900364#comment-16900364 ]

Matthias J. Sax commented on KAFKA-7149:
----------------------------------------

[~vinoth] If there is no response from [~NaviBrar] in the next couple of days, feel free to reassign the ticket to yourself.
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16899224#comment-16899224 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

[~guozhang] [~NaviBrar] Do you mind if I claim this issue, since the last person working on it no longer has cycles?
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863450#comment-16863450 ]

ASF GitHub Bot commented on KAFKA-7149:
---------------------------------------

guozhangwang commented on pull request #6162: KAFKA-7149 Reduce assignment data size
URL: https://github.com/apache/kafka/pull/6162
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16770501#comment-16770501 ]

Matthias J. Sax commented on KAFKA-7149:
----------------------------------------

Moving all major/minor/trivial tickets that are not yet merged out of the 2.2 release.
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745082#comment-16745082 ]

ASF GitHub Bot commented on KAFKA-7149:
---------------------------------------

brary commented on pull request #5663: KAFKA-7149 Reduce assignment data size
URL: https://github.com/apache/kafka/pull/5663
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744765#comment-16744765 ] ASF GitHub Bot commented on KAFKA-7149: --- brary commented on pull request #6162: KAFKA-7149 Reduce assignment data size URL: https://github.com/apache/kafka/pull/6162 The PR contains the changes related to Assignment Info re-design where the TopicPartitions are replaced with TaskIDs and GZIP compression is being done on assignmentInfo to reduce the assignment size in version 4.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744195#comment-16744195 ] Navinder Brar commented on KAFKA-7149: -- [~mjsax] The PR has passed all the checks and code review comments have been implemented. Please take further action.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742857#comment-16742857 ] Navinder Brar commented on KAFKA-7149: -- [~mjsax] Yes, I had submitted a PR; some core test cases were failing, I guess because I had not rebased. I will rebase, resubmit the PR, and update here.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742728#comment-16742728 ] Matthias J. Sax commented on KAFKA-7149: There was not much progress recently. [~NaviBrar] are you still on it? Your PR seems to need a rebase; I can review it afterwards.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16740052#comment-16740052 ] Ashish Surana commented on KAFKA-7149: -- Is this change on track for the 2.2.0 release? I believe this is a very important improvement for anybody using kafka-streams at decent scale.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657902#comment-16657902 ] Ismael Juma commented on KAFKA-7149: It's way too late for 2.1.0 unless it's a blocker. Can we please move this to 2.2.0?
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16637351#comment-16637351 ] Matthias J. Sax commented on KAFKA-7149: [~guozhang] Do we still aim to get this into 2.1?
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16636555#comment-16636555 ] Dong Lin commented on KAFKA-7149: - Added Navinder to contributor list and assigned Jira to Navinder.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620252#comment-16620252 ] ASF GitHub Bot commented on KAFKA-7149: --- brary opened a new pull request #5663: KAFKA-7149 Reduce assignment data size URL: https://github.com/apache/kafka/pull/5663 The PR contains the changes related to Assignment Info re-design where the TopicPartitions are replaced with TaskIDs and GZIP compression is being done on assignmentInfo to reduce the assignment size in version 4.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615354#comment-16615354 ] Navinder Brar commented on KAFKA-7149: -- Yes, I agree with all you explained above. But the *tasksByHost* map is common to all consumers, so instead of sending it in the encoded *AssignmentInfo* for each consumer, I am suggesting we send it once, in a map in the *Assignment* object as I suggested above, to the broker coordinator, and change the broker coordinator to send the *tasksByHost* map along with the individual assignments to each consumer. That way, all consumers will receive the global tasksByHost map along with their own assignments.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615339#comment-16615339 ] Guozhang Wang commented on KAFKA-7149: -- One {{Assignment}} object is sent for each consumer, which includes the assigned partitions and the userData that is decoded into {{AssignmentInfo}}. Note that after the consumer -> assignment information is sent to the broker coordinator, it will send this assignment to each corresponding consumer. So it is actually a one-to-one mapping from {{Assignment}} to {{AssignmentInfo}}, not a one-to-many mapping. As a result, each consumer will only get one decoded {{AssignmentInfo}}. Note that each consumer does indeed need to know the global tasksByHost map in order to support interactive queries (users can ask any instance where to look for a specific key, for example). Does that make sense?
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615271#comment-16615271 ] Navinder Brar commented on KAFKA-7149: -- Hi [~guozhang] What I mean is that currently the Assignment which is shared with the Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ...]{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TopicPartition>> partitionsByHost}
{code}
Now in the first version, I am changing this AssignmentInfo to:
*V1:*
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TaskId>> tasksByHost}
{code}
But my point is that if there are 500 consumers, the tasksByHost map will be the same for all of them, since it contains the global assignment. We are unnecessarily sending this same map inside the Assignment array for every consumer. Instead, we can share an object like the one below with the GroupCoordinator.
*V2:*
{code:java}
Assignment = {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ...]}{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks}{code}
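The saving from hoisting the shared map out of the per-consumer entries can be sketched with back-of-envelope arithmetic; all numbers below are hypothetical, chosen only to illustrate the V1-vs-V2 difference:

```java
// Back-of-envelope sketch: V1 repeats the global tasksByHost map inside every
// consumer's AssignmentInfo; V2 ships it once at the Assignment level.
// The sizes below are made up for illustration only.
public class DedupSavings {
    public static void main(String[] args) {
        int consumers = 500;            // hypothetical group size
        long mapBytes = 200_000L;       // hypothetical serialized size of tasksByHost
        long perConsumerRest = 2_000L;  // hypothetical active/standby tasks per consumer

        long v1 = consumers * (mapBytes + perConsumerRest); // map repeated per consumer
        long v2 = mapBytes + consumers * perConsumerRest;   // map sent once

        System.out.println("v1=" + v1 + " v2=" + v2);
    }
}
```

With these numbers V1 ships roughly 100 MB of assignment data while V2 stays near 1 MB, which matches the intuition that the duplicated global map dominates the payload as the group grows.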
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615254#comment-16615254 ] Guozhang Wang commented on KAFKA-7149: -- [~NaviBrar] Thanks for looking into this!! Could you clarify a bit more what you mean by "Maybe we can take out partitionsByHost(newly TaskIdsByHost) map from the consumers array so that it is not replicated for all the hosts and is sent just once"? Without the PR it is a bit hard to guess the actual approach, but I'd just boldly assume that you have changed the format of {{partitions-by-host}} from
{code}
num.partitions-by-host, [partitions-by-host]
where partitions-by-host: host, port, num.partitions, [partitions]
{code}
to
{code}
num.tasks-by-host, [tasks-by-host]
where tasks-by-host: host, port, num.tasks, [task-ids]
{code}
And you are further optimizing it to
{code}
num.hosts, [host-tasks]
where host-tasks: host, port, num.tasks, [task-ids]
{code}
?
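The last layout above could be encoded along these lines. This is only an illustrative sketch with a hypothetical `encodeHostTasks` helper, not the actual Streams serialization code; the key point is that each task id costs two ints instead of a (topic name, partition) pair:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class HostTasksEncoding {
    // Encodes one host-tasks entry: host, port, num.tasks, [task-ids],
    // where a task id is (topic group id, partition) -- two ints.
    static byte[] encodeHostTasks(String host, int port, int[][] taskIds) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            byte[] h = host.getBytes(StandardCharsets.UTF_8);
            out.writeInt(h.length);
            out.write(h);
            out.writeInt(port);
            out.writeInt(taskIds.length);     // num.tasks
            for (int[] id : taskIds) {
                out.writeInt(id[0]);          // topic group id
                out.writeInt(id[1]);          // partition
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        int[][] tasks = {{0, 0}, {0, 1}, {1, 0}};
        byte[] encoded = encodeHostTasks("host-1", 9092, tasks);
        // 4 (host length) + 6 (host) + 4 (port) + 4 (num.tasks) + 3 * 8 (task ids) = 42
        System.out.println("bytes=" + encoded.length);
    }
}
```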
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610 ] Navinder Brar commented on KAFKA-7149: -- I am working with [~asurana] on raising this PR and will send it in a couple of days. Currently, the change I have made is using taskIds instead of topicPartitions in AssignmentInfo. But another thing I observed is that we are sending the same assignmentInfo to all consumers, so we are replicating the complete assignment (of all hosts and partitions) to every consumer. Maybe we can take the partitionsByHost (newly taskIdsByHost) map out of the consumers array so that it is not replicated for all the hosts and is sent just once. With the current changes (changing TopicPartitions to TaskIDs and using GZIP compression) I have reduced the assignment size (on 400 hosts with 3 threads each, having 512 partitions) from 196 MB to 8 MB. If we can stop replicating partitionsByHost for each consumer, the assignment size can be reduced to a few hundred KBs. Please share your thoughts.
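The GZIP wrapping described above can be sketched with plain `java.util.zip`. This is an illustrative sketch of compressing already-serialized assignment bytes, not the actual patch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class AssignmentCompression {
    // Wrap the already-serialized assignment bytes in GZIP before sending.
    static byte[] compress(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Inverse operation on the receiving consumer.
    static byte[] decompress(byte[] data) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            for (int n = gzip.read(buf); n > 0; n = gzip.read(buf)) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assignment metadata is highly repetitive (host names, task ids),
        // which is why GZIP achieves such a large reduction.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("host-").append(i % 400).append(":task-").append(i % 512).append(';');
        }
        byte[] raw = sb.toString().getBytes();
        byte[] packed = compress(raw);
        System.out.println("raw=" + raw.length + " compressed=" + packed.length
                + " roundtrip=" + Arrays.equals(raw, decompress(packed)));
    }
}
```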
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609986#comment-16609986 ] Guozhang Wang commented on KAFKA-7149: -- Also note that [~yuzhih...@gmail.com] did a PR in https://github.com/apache/kafka/pull/5322, which already changed the format of the protocol and hence bumped up the protocol version, so we can piggy back any protocol changes you may have within the same new versions which will be released in 2.1.0.
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609985#comment-16609985 ] Guozhang Wang commented on KAFKA-7149: -- [~asurana] I did not find any PR or attachments for your changes, could you check if it is submitted successfully?
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606957#comment-16606957 ]

Ashish Surana commented on KAFKA-7149:
--
[~guozhang] As you suggested, the following changes can reduce the assignment data size, especially when a topic group has multiple input topics, since topic-partitions are repeated in the assignment info in that case:
1. Replace num.partitions with num.tasks
2. Replace [topic-partitions] with [task-ids]
This will also require some changes to identify the corresponding host when looking up a given key in a store. Along with these encoding changes, compression is definitely required for further size reduction. [~mjsax]'s suggestion for handling this is what I was also thinking, and it makes sense for supporting backward compatibility.
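The two proposals above — encoding task ids as fixed-size ints instead of repeating topic-partition strings, and compressing the result — can be sketched as a size comparison. This is illustrative only: the class, the topic name, and the partition count are made up, and it does not use Kafka's real AssignmentInfo serialization.

```java
import java.io.*;
import java.util.zip.GZIPOutputStream;

// Illustrative size comparison (hypothetical names, not Kafka's real format).
public class AssignmentSizeSketch {

    // Variant 1: the topic name is repeated once per partition entry.
    static int stringEncodedSize(int partitions, String topic) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            for (int p = 0; p < partitions; p++) {
                out.writeUTF(topic); // 2-byte length prefix + UTF-8 bytes, every time
                out.writeInt(p);
            }
            out.flush();
            return bos.size();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Variant 2: a task id as two fixed-size ints (topicGroupId, partition).
    static int taskIdEncodedSize(int partitions) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            for (int p = 0; p < partitions; p++) {
                out.writeInt(0); // topic group id stands in for the topic name
                out.writeInt(p);
            }
            out.flush();
            return bos.size();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Variant 3: GZIP over the string form; the repeated names compress well.
    static int gzippedStringSize(int partitions, String topic) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            for (int p = 0; p < partitions; p++) {
                out.writeUTF(topic);
                out.writeInt(p);
            }
            out.flush();
            ByteArrayOutputStream gz = new ByteArrayOutputStream();
            try (GZIPOutputStream g = new GZIPOutputStream(gz)) {
                g.write(bos.toByteArray());
            }
            return gz.size();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static void main(String[] args) {
        String topic = "my-application-some-repartition-topic";
        System.out.println("strings : " + stringEncodedSize(1000, topic) + " bytes");
        System.out.println("task ids: " + taskIdEncodedSize(1000) + " bytes");
        System.out.println("gzipped : " + gzippedStringSize(1000, topic) + " bytes");
    }
}
```

The string form grows with the topic-name length per partition, the task-id form stays at 8 bytes per task, and GZIP collapses the highly repetitive string form dramatically — consistent with the 8x-10x reduction reported in the issue description, though the exact ratio depends on the data.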
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544346#comment-16544346 ]

Ted Yu commented on KAFKA-7149:
---
Took a look at AssignmentInfo#writeTopicPartitions, where:
{code}
out.writeUTF(partition.topic());
{code}
whereas persisting a TaskId doesn't involve a String. So Guozhang's suggestion above would reduce the metadata size. I wonder if Ashish can verify whether Guozhang's suggestion would give his use case enough savings (through a proof of concept).
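The point about writeUTF can be made concrete with a tiny measurement. The topic name below is invented for illustration; the behavior of DataOutputStream.writeUTF (a 2-byte length prefix followed by the string's modified-UTF-8 bytes) is standard Java.

```java
import java.io.*;

// Measures the per-entry cost of writeUTF versus two fixed-size ints.
public class WriteUtfOverhead {
    static int utfSize(String s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // writeUTF emits a 2-byte length prefix plus the encoded bytes.
            new DataOutputStream(bos).writeUTF(s);
            return bos.size();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static void main(String[] args) {
        // A hypothetical topic name, paid for on every topic-partition entry;
        // a TaskId could instead be two ints at a fixed 8 bytes total.
        System.out.println("writeUTF bytes: " + utfSize("page-views-by-region"));
        System.out.println("two ints      : " + 2 * Integer.BYTES);
    }
}
```

So every topic-partition entry pays the topic name's length plus two bytes, while an int-pair TaskId encoding is constant-size regardless of topic names.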
[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544337#comment-16544337 ]

Guozhang Wang commented on KAFKA-7149:
--
Searched the JIRA list but did not find it. Maybe we once discussed it on the mailing list but never created a ticket for it? We will likely add more information when the metadata version is bumped up to 4:
1) KAFKA-5037: add a global error code ([~yuzhih...@gmail.com])
2) KAFKA-4696: state-aware standby task assignment ([~Yohan123])
3) KAFKA-6718: rack-aware task assignment ([~_deepakgoyal])
4) KIP-258: state store format change with embedded timestamp ([~mjsax])
I think this urges us to consider the issue sooner rather than later. So I'd like to collect more feedback on the tentative proposals, and I have also cc'ed the contributors of the above features to think about how to reduce the encoded bytes.