junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r616258845



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1039,77 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
         }
     }
 
+    void generateLeaderAndIsrUpdates(String context,
+                                     boolean excludeCurrentLeaderFromIsr,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+ " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = partition.isr;
+            if (excludeCurrentLeaderFromIsr) {
+                newIsr = Replicas.copyWithout(partition.isr, partition.leader);
+            }
+            int newLeader = bestLeader(partition.replicas, newIsr, false);
+            boolean unclean = newLeader != NO_LEADER && 
!Replicas.contains(newIsr, newLeader);
+            if (unclean) {
+                // After an unclean leader election, the ISR is reset to just 
the new leader.
+                newIsr = new int[] {newLeader};
+            } else if (newIsr.length == 0) {
+                // We never want to shrink the ISR to size 0.
+                newIsr = partition.isr;
+            }
+            PartitionChangeRecord record = new PartitionChangeRecord();
+            if (newLeader != partition.leader) {
+                record.setLeader(newLeader);
+            }
+            if (!Arrays.equals(newIsr, partition.isr)) {
+                record.setIsr(Replicas.toList(newIsr));
+            }
+            if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
+                
record.setPartitionId(topicIdPart.partitionId()).setTopicId(topic.id);
+                if (unclean) {
+                    log.info("{}: UNCLEANLY {}", context,
+                        leaderAndIsrUpdateLogMessage(topicIdPart, partition, 
record));
+                } else if (log.isDebugEnabled()) {
+                    log.debug("{}: {}", context,
+                        leaderAndIsrUpdateLogMessage(topicIdPart, partition, 
record));
+                }
+                records.add(new ApiMessageAndVersion(record, (short) 0));
+            }
+        }
+    }
+
+    // VisibleForTesting
+    String leaderAndIsrUpdateLogMessage(TopicIdPartition topicIdPart,

Review comment:
       Ideally, we want to log this when the record is reflected through replay 
in the controller. That's also when we could log the new leaderEpoch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to