[
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438777#comment-16438777
]
ASF GitHub Bot commented on KAFKA-6058:
---
guozhangwang closed pull request #4856: KAFKA-6058: Refactor consumer API
result return types
URL: https://github.com/apache/kafka/pull/4856
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index b4bce264405..dd6835cf10c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -29,13 +29,24 @@
*/
@InterfaceStability.Evolving
public class DeleteConsumerGroupsResult {
-final KafkaFuture>> futures;
+private final Map> futures;
-DeleteConsumerGroupsResult(KafkaFuture>>
futures) {
+DeleteConsumerGroupsResult(final Map> futures) {
this.futures = futures;
}
-public KafkaFuture>> deletedGroups() {
+/**
+ * Return a map from group id to futures which can be used to check the
status of
+ * individual deletions.
+ */
+public Map> deletedGroups() {
return futures;
}
+
+/**
+ * Return a future which succeeds only if all the consumer group deletions
succeed.
+ */
+public KafkaFuture all() {
+return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index adde031b678..ac2189cc6dc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -32,16 +32,23 @@
@InterfaceStability.Evolving
public class DescribeConsumerGroupsResult {
-private final KafkaFuture>> futures;
+private final Map> futures;
-public DescribeConsumerGroupsResult(KafkaFuture>> futures) {
+public DescribeConsumerGroupsResult(final Map> futures) {
this.futures = futures;
}
/**
- * Return a map from group name to futures which can be used to check the
description of a consumer group.
+ * Return a map from group id to futures which can be used to check the
description of a consumer group.
*/
-public KafkaFuture>>
describedGroups() {
+public Map>
describedGroups() {
return futures;
}
+
+/**
+ * Return a future which succeeds only if all the consumer group
description succeed.
+ */
+public KafkaFuture all() {
+return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 50bcfd38856..fa3f943555b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -46,6 +46,7 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
@@ -53,6 +54,7 @@
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -916,8 +918,11 @@ private void failCalls(long now, List calls,
AuthenticationException authe
* @param correlationIdToCall A map of correlation IDs to calls.
* @param callsInFlight A map of nodes to the calls they have
in flight.
**/
-private void handleResponses(long now, List responses,
Map> callsInFlight,
-Map correlationIdToCall) {
+private void handleResponses(long now,
+ List responses,
+