[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909314#comment-16909314 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

To close the loop on turning on compression for the backing `__consumer_offsets` 
topic: it looks like this can already be controlled by the broker setting 
`offsets.topic.compression.codec`. Please correct me if I am missing something.
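
For example, on the broker side this would be a single setting (value semantics are my assumption: an integer codec id, with 0 meaning no compression):
{code}
# server.properties (sketch): enable gzip compression for the internal
# __consumer_offsets topic; 0 is the default, i.e. no compression.
offsets.topic.compression.codec=1
{code}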

In code: GroupMetadataManager.storeGroup()
{code}
def storeGroup(group: GroupMetadata,
               groupAssignment: Map[String, Array[Byte]],
               responseCallback: Errors => Unit): Unit = {
    getMagic(partitionFor(group.groupId)) match {
      case Some(magicValue) =>
        ...
        val key = GroupMetadataManager.groupMetadataKey(group.groupId)
        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)

        val records = {
          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
            Seq(new SimpleRecord(timestamp, key, value)).asJava))
          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
          builder.append(timestamp, key, value)
          builder.build()
        }

        val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
        val groupMetadataRecords = Map(groupMetadataPartition -> records)
        val generationId = group.generationId
{code}
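
As I read it, the `compressionType` used above is derived from that broker setting; a rough, self-contained sketch of the codec-id mapping (an assumption on my part, not the exact GroupMetadataManager code):
{code}
import org.apache.kafka.common.record.CompressionType

// Sketch: the int codec id configured via offsets.topic.compression.codec is
// resolved to the CompressionType handed to MemoryRecords.builder(...) above.
object OffsetsTopicCompression {
  def forCodecId(codecId: Int): CompressionType = CompressionType.forId(codecId)
}

// e.g. forCodecId(0) == CompressionType.NONE, forCodecId(1) == CompressionType.GZIP
{code}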

> Reduce assignment data size to improve kafka streams scalability
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7149
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7149
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Ashish Surana
>            Assignee: Vinoth Chandar
>            Priority: Major
>
> We observed that with a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows very quickly, and we start 
> hitting RecordTooLargeException on the Kafka broker.
> A workaround for this issue is described in: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Even so, the scalability of Kafka Streams remains limited, since moving around 
> ~100 MB of assignment data on each rebalance hurts performance and reliability 
> (timeout exceptions start appearing). It also caps how far Kafka Streams can 
> scale even with a high max.message.bytes setting, because the data size grows 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we send the assignment data compressed. 
> We saw the assignment-data size reduced by 8x-10x. This improved Kafka Streams 
> scalability drastically for us, and we can now run with more than 8,000 
> partitions.
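
For illustration only, a minimal, hypothetical sketch of the kind of compression the reporter describes (gzip over the serialized assignment bytes; the object and method names here are mine, not the actual Streams change):
{code}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical helper: gzip the serialized assignment before it is embedded in
// the rebalance metadata, and gunzip it again on the receiving side.
object AssignmentCompression {
  def compress(assignment: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(out)
    try gzip.write(assignment) finally gzip.close()
    out.toByteArray
  }

  def decompress(compressed: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      out.toByteArray
    } finally in.close()
  }
}
{code}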



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)