Hi Kafka team, 

I meet a strange thing about Kafka rebalance. If I increase partitions of a 
topic which subscribed by some java consumers(in same one group), there is no 
rebalance occur. Furthermore, if I start a new consumer (or stop one) to cause 
a rebalance, the increased partitions could not be assigned, until I stop all 
consumers and start them. Is that normal?


Thanks,
Ruiping Li


--------------------------------------------------------------------------------
Below is my test: 
1. Start Kafka, ZK. Create a normal topic(test-topic) with 1 partitions 
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic 
--partitions 1 --replication-factor 1 --config retention.ms=604800000 
2. Start 2 java consumers (C1, C2), subscribe test-topic 
3. Increase 2 partitions of test-topic  
rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh 
--zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3 
WARNING: If partitions are increased for a topic that has a key, the partition 
logic or ordering of the messages will be affected 
Adding partitions succeeded! 

Increasing succeeded: 
rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh 
--zookeeper 127.0.0.1:2181 --describe --topic test-topic 
Topic:test-topic    PartitionCount:3    ReplicationFactor:1    
Configs:retention.ms=604800000 
    Topic: test-topic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0 
    Topic: test-topic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0 
    Topic: test-topic    Partition: 2    Leader: 0    Replicas: 0    Isr: 0 

There is no rebalance occur in C1, C2. 
4. Start a new consumer C3 to subscribed test-topic. Rebalance occur, but only 
partition test-topic-0 involved in reassigned, no test-topic-1 and 
test-topic-2.  
5. I try to stop C2, C3, and test-topic-1 and test-topic-2 still not involved. 
6. Stop all running consumers, and then start them. All test-topic-0,1,2 
assigned normally. 


Environment
kafka & java api version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and 
kafka_2.10-0.10.2.1, same result) 
zookeeper: 3.4.13 
consumer code: 
// consumer
public class KafkaConsumerThread extends Thread { 
    // consumer settings 
    public static org.apache.kafka.clients.consumer.KafkaConsumer<String, 
String> createNativeConsumer(String groupName, String kafkaBootstrap) { 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBootstrap); 
        props.put("group.id", groupName); 
        props.put("auto.offset.reset", "earliest"); 
        props.put("enable.auto.commit", true); 
        
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
 
        
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
 


        return new KafkaConsumer<String, String>(props); 
    } 


    private static final Logger log = 
LoggerFactory.getLogger(KafkaConsumerThread.class); 
    private boolean stop = false; 
    private KafkaConsumer<String, String> consumer; 
    private String topicName; 
    private ConsumerRebalanceListener consumerRebalanceListener; 
    private AtomicLong receivedRecordNumber = new AtomicLong(0); 


    public KafkaConsumerThread(String topicName, String groupName, 
ConsumerRebalanceListener consumerRebalanceListener, String kafkaBootstrap) { 
        this.consumer = createNativeConsumer(groupName, kafkaBootstrap); 
        this.topicName = topicName; 
        this.consumerRebalanceListener = consumerRebalanceListener; 
    } 


    @Override 
    public void run() { 
        log.info("Start consumer .."); 
        consumer.subscribe(Collections.singleton(topicName), 
consumerRebalanceListener); 
        while (!stop) { 
            try { 
                ConsumerRecords<String, String> records = consumer.poll(100); 
                receivedRecordNumber.addAndGet(records.count()); 
                Iterator<ConsumerRecord<String, String>> iterator = 
records.iterator(); 
                while (iterator.hasNext()) { 
                    ConsumerRecord<String, String> record = iterator.next(); 
                    log.info("Receive [key:{}][value:{}]", record.key(), 
record.value()); 
                } 
            } catch (TimeoutException e) { 
                log.info("no data"); 
            } 
        } 
        consumer.close(); 
    } 


    public void stopConsumer() { 
        this.stop = true; 
    } 
}

Reply via email to