If it's of any value to you, we use the following test to check that we have a 
well-balanced set of consumer group ids.  Note that in the code, 
ConsumerGroups.ALL_GROUPS is simply a list of all our consumer group ids.  
Spreading the offset commit load evenly across the __consumer_offsets 
partitions helps level the load on your brokers.  But beware of changing your 
group ids in an active system - you'll need to migrate carefully and prime the 
offsets in the renamed group if you wish to avoid message replay (depending on 
your auto.offset.reset config).  There's a rough sketch of that priming step 
after the test below.

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerGroupsTest {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerGroupsTest.class);

    /* Default partition count of __consumer_offsets, per the Kafka docs. */
    private static final int KAFKA_OFFSET_PARTITION_COUNT = 50;

    @Test
    void distinctConsumerOffsetPartitions() {
        boolean hasDuplicates = false;
        Set<Integer> usedPartitions = new HashSet<>(KAFKA_OFFSET_PARTITION_COUNT);
        for (String group : ConsumerGroups.ALL_GROUPS) {
            int partition = getConsumerOffsetPartition(group);
            hasDuplicates = !usedPartitions.add(partition) || hasDuplicates;
        }
        /*
         * NB: It is not an absolute requirement to have no clashes.
         * It's simply desirable.
         */
        Assertions.assertFalse(hasDuplicates,
                "Multiple consumer groups map to the same offsets partition. See prior log output.");
    }

    /* Mirrors GroupMetadataManager#partitionFor: abs(groupId.hashCode) % partition count. */
    private int getConsumerOffsetPartition(String group) {
        final int partition = Utils.abs(group.hashCode()) % KAFKA_OFFSET_PARTITION_COUNT;
        LOG.info("{} --> {}", group, partition);
        return partition;
    }

}
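
And on the group-rename caveat, here's a minimal sketch of priming the new 
group's offsets with Kafka's AdminClient.  It assumes the old group has been 
fully stopped (alterConsumerGroupOffsets only succeeds on an empty group), and 
"old-group", "new-group" and the bootstrap address are placeholders:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetPrimer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Read the committed offsets of the old (now stopped) group...
            Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets("old-group")
                    .partitionsToOffsetAndMetadata()
                    .get();

            // ...and commit them under the new group id before any consumer
            // starts with it, so nothing falls back to auto.offset.reset.
            admin.alterConsumerGroupOffsets("new-group", offsets).all().get();
        }
    }
}

The kafka-consumer-groups.sh tool's --reset-offsets option can achieve the 
same end if you'd rather not write code.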


> On 4/09/2021, at 15:30, Luke Chen <show...@gmail.com> wrote:
> 
> Hi Michał,
> Internally, Kafka uses the *consumer group ID* as the key to decide which
> __consumer_offsets partition is used.
> The code is like this:
> 
> `Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount`
> 
> You can check the `partitionFor` method in GroupMetadataManager class.
> 
> Hope that helps.
> 
> Thank you.
> Luke
> 
> On Fri, Sep 3, 2021 at 11:57 PM Michał Łowicki <mlowi...@gmail.com> wrote:
> 
>> Hey,
>> 
>> Could someone please point me to the code / doc where I can find
>> information what is used as key and how messages are spread across
>> partitions for this internal topic?
>> 
>> --
>> BR,
>> Michał Łowicki
>> 
