Wenbing Shen created KAFKA-12493:
------------------------------------

             Summary: The controller should handle the consistency between the 
controllerContext and the partition replicas assignment on zookeeper
                 Key: KAFKA-12493
                 URL: https://issues.apache.org/jira/browse/KAFKA-12493
             Project: Kafka
          Issue Type: Bug
          Components: controller
    Affects Versions: 2.7.0, 2.6.0, 2.5.0, 2.4.0, 2.3.0, 2.2.0, 2.1.0, 2.0.0
            Reporter: Wenbing Shen
             Fix For: 3.0.0


This issue is related to the following thread on the users mailing list:
[https://lists.apache.org/thread.html/redf5748ec787a9c65fc48597e3d2256ffdd729de14afb873c63e6c5b%40%3Cusers.kafka.apache.org%3E]

 

This problem is 100% reproducible.

Problem description:

In a customer's production environment, code written by colleagues in another 
department reassigned the replicas of existing partitions and wrote the new 
assignment to zookeeper. When the controller processes the resulting partition 
modification event, it only considers the newly added partitions: it takes the 
new partitions and their replicas through the partition state machine and the 
replica state machine, and issues LeaderAndISR and other control requests for them.

However, the controller does not verify whether the replica assignment of the 
existing partitions in the controllerContext still matches the assignment on 
the znode in zookeeper. This appears harmless at first, but when brokers have 
to be restarted for some reason, such as configuration updates or upgrades, 
the affected topics become abnormal in production: the controller cannot 
complete the election of a new leader, and the original leader does not 
correctly recognize the replica assignment currently stored in zookeeper. The 
real-time business in our customer's on-site environment was interrupted and 
some data was lost.
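
The missing check described above can be sketched roughly as follows. This is 
only an illustration, not controller code: the class name, the method name and 
the plain map shapes (partition id to ordered list of replica broker ids) are 
assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka: reports existing partitions whose
// replica list in the controller's cache differs from the one in zookeeper.
class AssignmentDriftCheck {

    // cached: partition id -> replica broker ids held by the controller (assumed shape)
    // fromZk: partition id -> replica broker ids read from the topic znode (assumed shape)
    static Set<Integer> changedExistingPartitions(Map<Integer, List<Integer>> cached,
                                                  Map<Integer, List<Integer>> fromZk) {
        Set<Integer> changed = new TreeSet<>();
        for (Map.Entry<Integer, List<Integer>> entry : cached.entrySet()) {
            List<Integer> zkReplicas = fromZk.get(entry.getKey());
            // Only partitions present on both sides count as "existing".
            // Order is compared too, since the first replica is the preferred leader.
            if (zkReplicas != null && !zkReplicas.equals(entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> cached = Map.of(0, List.of(1, 2), 1, List.of(2, 3));
        Map<Integer, List<Integer>> fromZk =
                Map.of(0, List.of(1, 2), 1, List.of(3, 1), 2, List.of(2, 3));
        // Partition 1 was reassigned behind the controller's back; partition 2 is new.
        System.out.println(changedExistingPartitions(cached, fromZk)); // prints [1]
    }
}
```

A non-empty result is the situation this issue is about: the controller would 
need to either reject such a change or handle it as a reassignment.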

The problem can be reproduced reliably as follows:

Add partitions to an existing topic, or change its replica count, using the 
code below. This reassigns the replicas of the original partitions and writes 
the result to zookeeper. The controller does not process this change to the 
existing partitions. After the brokers hosting the topic are restarted, the 
topic can no longer be produced to or consumed from.

 
{code:java}
public void updateKafkaTopic(KafkaTopicVO kafkaTopicVO) {

    ZkUtils zkUtils = ZkUtils.apply(ZK_LIST, SESSION_TIMEOUT,
            CONNECTION_TIMEOUT, JaasUtils.isZkSecurityEnabled());
    try {
        if (kafkaTopicVO.getPartitionNum() >= 0 &&
                kafkaTopicVO.getReplicationNum() >= 0) {
            // Get the original broker data information
            Seq<BrokerMetadata> brokerMetadata =
                    AdminUtils.getBrokerMetadatas(zkUtils,
                            RackAwareMode.Enforced$.MODULE$,
                            Option.apply(null));
            // Generate a new partition replica allocation plan
            scala.collection.Map<Object, Seq<Object>> replicaAssign =
                    AdminUtils.assignReplicasToBrokers(brokerMetadata,
                            kafkaTopicVO.getPartitionNum(), // Number of partitions
                            kafkaTopicVO.getReplicationNum(), // Number of replicas per partition
                            -1,
                            -1);
            // Modify the partition replica allocation plan
            AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils,
                    kafkaTopicVO.getTopicNameList().get(0),
                    replicaAssign,
                    null,
                    true);
        }

    } catch (Exception e) {
        System.out.println("Adjust partition abnormal");
        System.exit(0);
    } finally {
        zkUtils.close();
    }
}
{code}
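
For contrast, here is a minimal sketch of how a new assignment could be built 
without touching the replicas of the existing partitions. It is not Kafka's 
assignment algorithm (which is rack-aware and uses a randomized start index); 
it uses plain round-robin as a stand-in, and the class and method names are 
hypothetical. It also assumes the existing partition ids are 0..n-1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka: extends an assignment to more
// partitions while keeping every existing partition's replica list unchanged.
class PreservingAssignment {

    static Map<Integer, List<Integer>> extend(Map<Integer, List<Integer>> existing,
                                              List<Integer> brokers,
                                              int targetPartitions,
                                              int replicationFactor) {
        if (replicationFactor < 1 || replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("replication factor must be in 1..broker count");
        }
        // Copy the existing entries as they are; they are never regenerated.
        Map<Integer, List<Integer>> result = new TreeMap<>(existing);
        // Assumption: existing partition ids are 0..existing.size()-1.
        for (int p = existing.size(); p < targetPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Simple round-robin placement, used here only for illustration.
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            result.put(p, replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> existing = Map.of(0, List.of(1, 2), 1, List.of(2, 3));
        // Grow from 2 to 4 partitions on brokers 1, 2, 3 with 2 replicas each.
        System.out.println(extend(existing, List.of(1, 2, 3), 4, 2));
        // prints {0=[1, 2], 1=[2, 3], 2=[3, 1], 3=[1, 2]}
    }
}
```

Partitions 0 and 1 keep their replicas, so a partition modification event 
triggered by such an assignment only contains additions, which is the case the 
controller already handles.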
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
