chia7712 commented on code in PR #22252:
URL: https://github.com/apache/kafka/pull/22252#discussion_r3255727870


##########
server/src/test/java/org/apache/kafka/server/TestUtils.java:
##########
@@ -181,4 +181,48 @@ public static int awaitLeaderChange(ClusterInstance cluster, TopicPartition tp,
 
         return newLeaderExists.get().get();
     }
+
+
+    /**
+     * Wait for the leader of the partition to change from {@code oldLeaderOpt} to {@code expectedLeaderOpt},
+     * or for any leader to be elected if both are empty.
+     *
+     * @param brokers The list of brokers
+     * @param tp The topic partition to check
+     * @param oldLeaderOpt If defined, wait for the leader to change away from this broker
+     * @param expectedLeaderOpt If defined, wait for this specific broker to become leader
+     * @param timeoutMs The duration in ms to wait
+     * @return The elected leader broker id
+     */
+    public static int awaitLeaderChange(
+            Collection<KafkaBroker> brokers,
+            TopicPartition tp,
+            Optional<Integer> oldLeaderOpt,
+            Optional<Integer> expectedLeaderOpt,
+            long timeoutMs) throws InterruptedException {
+        if (oldLeaderOpt.isPresent() && expectedLeaderOpt.isPresent()) {
+            throw new IllegalArgumentException("Cannot define both oldLeaderOpt and expectedLeaderOpt");
+        }
+
+        Supplier<Optional<Integer>> newLeaderExists = () -> {
+            if (expectedLeaderOpt.isPresent()) {
+                LOG.debug("Checking leader that has changed to {}", expectedLeaderOpt.get());

Review Comment:
   I'm wondering whether we should keep these debug messages in this migration.



