[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550903#comment-16550903
 ] 

ASF GitHub Bot commented on KAFKA-7141:
---------------------------------------

hachikuji closed pull request #5356: KAFKA-7141: ConsumerGroupCommand should 
describe group assignment eve…
URL: https://github.com/apache/kafka/pull/5356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 48c2cffb5d3..1d61720bfa9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -27,7 +27,6 @@ import kafka.utils._
 import org.apache.kafka.clients.{CommonClientConfigs, admin}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, 
OffsetAndMetadata}
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
@@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, 
TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, Set}
-import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
 object ConsumerGroupCommand extends Logging {
@@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging {
       val state = consumerGroup.state
       val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
       var assignedTopicPartitions = ListBuffer[TopicPartition]()
-      val rowsWithConsumer = if (committedOffsets.isEmpty) 
List[PartitionAssignmentState]() else 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
-        .sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size)
-        .flatMap { consumerSummary =>
-          val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
-          assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
-          val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
-            .map { topicPartition =>
-              topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
-            }.toMap
+      val rowsWithConsumer = 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
+        .sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
+        val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
+        assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
+        val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
+          .map { topicPartition =>
+            topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
+          }.toMap
 
         collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), 
topicPartitions.toList,
           partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
           Some(s"${consumerSummary.clientId}"))
-        }
+      }
 
       val rowsWithoutConsumer = 
committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
         case (topicPartition, offset) =>
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index cf00e93558c..51082effdbd 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness 
{
   def addConsumerGroupExecutor(numConsumers: Int,
                                topic: String = topic,
                                group: String = group,
-                               strategy: String = 
classOf[RangeAssignor].getName): ConsumerGroupExecutor = {
-    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, 
topic, strategy)
+                               strategy: String = 
classOf[RangeAssignor].getName,
+                               customPropsOpt: Option[Properties] = None): 
ConsumerGroupExecutor = {
+    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, 
topic, strategy, customPropsOpt)
     addExecutor(executor)
     executor
   }
@@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
 
 object ConsumerGroupCommandTest {
 
-  abstract class AbstractConsumerRunnable(broker: String, groupId: String) 
extends Runnable {
+  abstract class AbstractConsumerRunnable(broker: String, groupId: String, 
customPropsOpt: Option[Properties] = None) extends Runnable {
     val props = new Properties
     configure(props)
+    customPropsOpt.foreach(props.asScala ++= _.asScala)
     val consumer = new KafkaConsumer(props)
 
     def configure(props: Properties): Unit = {
@@ -145,8 +147,8 @@ object ConsumerGroupCommandTest {
     }
   }
 
-  class ConsumerRunnable(broker: String, groupId: String, topic: String, 
strategy: String)
-    extends AbstractConsumerRunnable(broker, groupId) {
+  class ConsumerRunnable(broker: String, groupId: String, topic: String, 
strategy: String, customPropsOpt: Option[Properties] = None)
+    extends AbstractConsumerRunnable(broker, groupId, customPropsOpt) {
 
     override def configure(props: Properties): Unit = {
       super.configure(props)
@@ -182,11 +184,12 @@ object ConsumerGroupCommandTest {
     }
   }
 
-  class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: 
String, topic: String, strategy: String)
+  class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: 
String, topic: String, strategy: String,
+                              customPropsOpt: Option[Properties] = None)
     extends AbstractConsumerGroupExecutor(numConsumers) {
 
     for (_ <- 1 to numConsumers) {
-      submit(new ConsumerRunnable(broker, groupId, topic, strategy))
+      submit(new ConsumerRunnable(broker, groupId, topic, strategy, 
customPropsOpt))
     }
 
   }
diff --git 
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala 
b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index fac34a71ee1..950676b9222 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -16,9 +16,11 @@
  */
 package kafka.admin
 
+import java.util.Properties
+
 import joptsimple.OptionException
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.RoundRobinAssignor
+import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{TimeoutException}
 import org.junit.Assert._
@@ -605,5 +607,29 @@ class DescribeConsumerGroupTest extends 
ConsumerGroupCommandTest {
     fail("Expected an error due to presence of unrecognized --new-consumer 
option")
   }
 
+  @Test
+  def testDescribeNonOffsetCommitGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    val customProps = new Properties
+    // create a consumer group that never commits offsets
+    customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = 
Some(customProps))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupOffsets()
+      state.contains("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim 
!= ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.clientId.exists(_.trim 
!= ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.host.exists(_.trim != 
ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    }, s"Expected a 'Stable' group status, rows and valid values for consumer 
id / client id / host columns in describe results for non-offset-committing 
group $group.")
+  }
+
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-consumer-group doesn't describe existing group
> ----------------------------------------------------
>
>                 Key: KAFKA-7141
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7141
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 0.11.0.0, 1.0.1
>            Reporter: Bohdana Panchenko
>            Assignee: huxihx
>            Priority: Major
>             Fix For: 2.1.0
>
>
> I am running two consumers: akka-stream-kafka consumer with standard config 
> section as described in the 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and  
> kafka-console-consumer.
> akka-stream-kafka consumer configuration looks like this
> {color:#333333}_akka.kafka.consumer{_{color}
> {color:#333333}  _kafka-clients{_{color}
> {color:#333333}    _group.id = "myakkastreamkafka-1"_{color}
> {color:#333333}   _enable.auto.commit = false_{color}
> }
> {color:#333333} }{color}
>  
>  I am able to see the both groups with the command
>  
>  *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> {color:#333333}I am able to view details about the console consumer 
> group{color}
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
>  _{color:#205081}Note: This will not show information about old 
> Zookeeper-based consumers.{color}_
> _{color:#205081}TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID 
> HOST CLIENT-ID{color}_
>  _{color:#205081}STREAM-TEST 0 0 0 0 
> consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1{color}_
> {color:#333333}But the command to describe my akka stream consumer gives me 
> empty output:{color}
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
>  {color:#205081}_Note: This will not show information about old 
> Zookeeper-based consumers._{color}
> {color:#205081}_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID 
> HOST CLIENT-ID_{color}
>  
> {color:#333333}That is strange. Can you please check the issue?{color}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to