chia7712 commented on code in PR #19884:
URL: https://github.com/apache/kafka/pull/19884#discussion_r2132779407


##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -406,4 +407,42 @@ default int getLeaderBrokerId(TopicPartition topicPartition) throws ExecutionExc
                     .orElseThrow(() -> new RuntimeException("Leader not found for tp " + topicPartition));
         }
     }
+
+    /**
+     * Wait for a leader to be elected or changed using the provided admin client.
+     */
+    default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                           String topic,
+                                                           int partitionNumber,
+                                                           long timeoutMs) throws Exception {
+        long startTime = System.currentTimeMillis();
+        TopicPartition topicPartition = new TopicPartition(topic, partitionNumber);
+
+        while (System.currentTimeMillis() < startTime + timeoutMs) {
+            try {
+                TopicDescription topicDescription = admin.describeTopics(List.of(topic))
+                        .allTopicNames().get().get(topic);
+
+                Optional<Integer> leader = topicDescription.partitions().stream()
+                        .filter(partitionInfo -> partitionInfo.partition() == partitionNumber)
+                        .findFirst()
+                        .map(partitionInfo -> {
+                            int leaderId = partitionInfo.leader().id();
+                            return leaderId == Node.noNode().id() ? null : leaderId;
+                        });
+
+                if (leader.isPresent()) {
+                    return leader.get();
+                }
+
+            } catch (InterruptedException | ExecutionException ignored) {
+                // Continue retrying on any exception (network issues, topic not ready, etc.)
+            }
+
+            Thread.sleep(Math.min(100L, timeoutMs));

Review Comment:
   Please use `TimeUnit` instead of calling `Thread.sleep` directly.
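
   To illustrate the suggestion, here is a minimal, self-contained sketch of a polling loop that sleeps through `TimeUnit` rather than `Thread.sleep`. The class `PollingSketch`, the method `pollUntilPresent`, and the `pollIntervalMs` parameter are hypothetical names made up for this example and are not part of the PR; the deadline is tracked with `System.nanoTime()`, which is my own choice and differs from the `System.currentTimeMillis()` used in the diff.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only (not code from the PR).
public class PollingSketch {

    /**
     * Repeatedly calls the probe until it returns a non-empty Optional
     * or the timeout elapses, sleeping between attempts via TimeUnit.
     */
    static <T> T pollUntilPresent(Supplier<Optional<T>> probe,
                                  long timeoutMs,
                                  long pollIntervalMs)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long intervalNanos = TimeUnit.MILLISECONDS.toNanos(pollIntervalMs);

        while (true) {
            Optional<T> result = probe.get();
            if (result.isPresent()) {
                return result.get();
            }

            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
            }

            // Sleep for the poll interval, but never past the deadline.
            TimeUnit.NANOSECONDS.sleep(Math.min(intervalNanos, remainingNanos));
        }
    }
}
```

   In the method under review, the probe would presumably be the `describeTopics` lookup that maps the partition to its leader id, and the single changed line would be something like `TimeUnit.MILLISECONDS.sleep(...)` in place of `Thread.sleep(...)`.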



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/RaftClusterInvocationContext.java:
##########
@@ -14,18 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.test.junit;
+package org.apache.kafka.common.test;
 
 import kafka.network.SocketServer;
 import kafka.server.BrokerServer;
 import kafka.server.ControllerServer;
 import kafka.server.KafkaBroker;
 
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.common.test.TestUtils;

Review Comment:
   It seems this class uses only one method of `TestUtils`. Maybe we can just 
eliminate the reference without changing the package?



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -406,4 +407,42 @@ default int getLeaderBrokerId(TopicPartition topicPartition) throws ExecutionExc
                     .orElseThrow(() -> new RuntimeException("Leader not found for tp " + topicPartition));
         }
     }
+
+    /**
+     * Wait for a leader to be elected or changed using the provided admin client.
+     */
+    default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                           String topic,
+                                                           int partitionNumber,
+                                                           long timeoutMs) throws Exception {
+        long startTime = System.currentTimeMillis();
+        TopicPartition topicPartition = new TopicPartition(topic, partitionNumber);
+
+        while (System.currentTimeMillis() < startTime + timeoutMs) {
+            try {
+                TopicDescription topicDescription = admin.describeTopics(List.of(topic))
+                        .allTopicNames().get().get(topic);
+
+                Optional<Integer> leader = topicDescription.partitions().stream()
+                        .filter(partitionInfo -> partitionInfo.partition() == partitionNumber)
+                        .findFirst()
+                        .map(partitionInfo -> {
+                            int leaderId = partitionInfo.leader().id();
+                            return leaderId == Node.noNode().id() ? null : leaderId;
+                        });
+                        });
+
+                if (leader.isPresent()) {
+                    return leader.get();
+                }
+
+            } catch (InterruptedException | ExecutionException ignored) {

Review Comment:
   The retry mechanism is already built into `Admin`, so this catch block
seems to be redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
