muralibasani commented on code in PR #22303:
URL: https://github.com/apache/kafka/pull/22303#discussion_r3255280478


##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java:
##########
@@ -49,36 +49,69 @@ public Boolean get() {
             }
 
             public void checkLeader() {
-                try {
-                    TopicDescription topicDescription = 
admin.describeTopics(List.of(topic))
-                            .allTopicNames().get().get(topic);
-
-                    Optional<Integer> leader = 
topicDescription.partitions().stream()
-                            .filter(partitionInfo -> partitionInfo.partition() 
== partitionNumber)
-                            .findFirst()
-                            .flatMap(partitionInfo -> 
Optional.ofNullable(partitionInfo.leader()))
-                            .map(node -> {
-                                int leaderId = node.id();
-                                return leaderId == Node.noNode().id() ? null : 
leaderId;
-                            });
-
-                    leader.ifPresent(integer -> this.leader = integer);
-                } catch (ExecutionException e) {
-                    Throwable cause = e.getCause();
-                    boolean isTransient = cause instanceof 
UnknownTopicOrPartitionException
-                            || cause instanceof LeaderNotAvailableException;
-                    if (!isTransient) {
-                        throw new RuntimeException(e);
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException(e);
-                }
+                fetchPartitionLeader(admin, topic, 
partitionNumber).ifPresent(integer -> this.leader = integer);
             }
         };
 
         TestUtils.waitForCondition(condition, timeoutMs, "Timing out after %d 
ms since a leader was not elected for partition %s-%d".formatted(timeoutMs, 
topic, partitionNumber));
 
         return condition.leader;
     }
+
+    /**
+     * Fetch the partition leader or wait until the expected leader is 
observed using the provided admin client.
+     */
+    public static int fetchOrWaitForExpectedLeader(Admin admin,

Review Comment:
   This new method and existing method fetchOrWaitForLeader 
https://github.com/apache/kafka/blob/16306d9475de9abe3f71ef49261fe84289849209/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java#L38
 are almost identical. There seems to be scope for reducing the duplicate code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to