chia7712 commented on code in PR #19884: URL: https://github.com/apache/kafka/pull/19884#discussion_r2132779407
########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java: ########## @@ -406,4 +407,42 @@ default int getLeaderBrokerId(TopicPartition topicPartition) throws ExecutionExc .orElseThrow(() -> new RuntimeException("Leader not found for tp " + topicPartition)); } } + + /** + * Wait for a leader to be elected or changed using the provided admin client. + */ + default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin, + String topic, + int partitionNumber, + long timeoutMs) throws Exception { + long startTime = System.currentTimeMillis(); + TopicPartition topicPartition = new TopicPartition(topic, partitionNumber); + + while (System.currentTimeMillis() < startTime + timeoutMs) { + try { + TopicDescription topicDescription = admin.describeTopics(List.of(topic)) + .allTopicNames().get().get(topic); + + Optional<Integer> leader = topicDescription.partitions().stream() + .filter(partitionInfo -> partitionInfo.partition() == partitionNumber) + .findFirst() + .map(partitionInfo -> { + int leaderId = partitionInfo.leader().id(); + return leaderId == Node.noNode().id() ? null : leaderId; + }); + + if (leader.isPresent()) { + return leader.get(); + } + + } catch (InterruptedException | ExecutionException ignored) { + // Continue retrying on any exception (network issues, topic not ready, etc.) + } + + Thread.sleep(Math.min(100L, timeoutMs)); Review Comment: please use TimeUnit instead ########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/RaftClusterInvocationContext.java: ########## @@ -14,18 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.test.junit; +package org.apache.kafka.common.test; import kafka.network.SocketServer; import kafka.server.BrokerServer; import kafka.server.ControllerServer; import kafka.server.KafkaBroker; import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.test.ClusterInstance; -import org.apache.kafka.common.test.KafkaClusterTestKit; -import org.apache.kafka.common.test.TestKitNodes; -import org.apache.kafka.common.test.TestUtils; Review Comment: It seems this class uses only one method of `TestUtils`. Maybe we can just eliminate the reference without changing the package? ########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java: ########## @@ -406,4 +407,42 @@ default int getLeaderBrokerId(TopicPartition topicPartition) throws ExecutionExc .orElseThrow(() -> new RuntimeException("Leader not found for tp " + topicPartition)); } } + + /** + * Wait for a leader to be elected or changed using the provided admin client. + */ + default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin, + String topic, + int partitionNumber, + long timeoutMs) throws Exception { + long startTime = System.currentTimeMillis(); + TopicPartition topicPartition = new TopicPartition(topic, partitionNumber); + + while (System.currentTimeMillis() < startTime + timeoutMs) { + try { + TopicDescription topicDescription = admin.describeTopics(List.of(topic)) + .allTopicNames().get().get(topic); + + Optional<Integer> leader = topicDescription.partitions().stream() + .filter(partitionInfo -> partitionInfo.partition() == partitionNumber) + .findFirst() + .map(partitionInfo -> { + int leaderId = partitionInfo.leader().id(); + return leaderId == Node.noNode().id() ? null : leaderId; + }); + + if (leader.isPresent()) { + return leader.get(); + } + + } catch (InterruptedException | ExecutionException ignored) { Review Comment: the "retry" mechanism is already in `Admin`, and hence this catch block seems to be redundant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org