[ https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550903#comment-16550903 ]
ASF GitHub Bot commented on KAFKA-7141: --------------------------------------- hachikuji closed pull request #5356: KAFKA-7141: ConsumerGroupCommand should describe group assignment eve… URL: https://github.com/apache/kafka/pull/5356 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 48c2cffb5d3..1d61720bfa9 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -27,7 +27,6 @@ import kafka.utils._ import org.apache.kafka.clients.{CommonClientConfigs, admin} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} -import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{KafkaException, Node, TopicPartition} @@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{Seq, Set} -import scala.concurrent.ExecutionException import scala.util.{Failure, Success, Try} object ConsumerGroupCommand extends Logging { @@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging { val state = consumerGroup.state val committedOffsets = getCommittedOffsets(groupId).asScala.toMap var assignedTopicPartitions = ListBuffer[TopicPartition]() - val rowsWithConsumer = if (committedOffsets.isEmpty) List[PartitionAssignmentState]() else consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq - 
.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size) - .flatMap { consumerSummary => - val topicPartitions = consumerSummary.assignment.topicPartitions.asScala - assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions - val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala - .map { topicPartition => - topicPartition -> committedOffsets.get(topicPartition).map(_.offset) - }.toMap + val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq + .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary => + val topicPartitions = consumerSummary.assignment.topicPartitions.asScala + assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions + val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala + .map { topicPartition => + topicPartition -> committedOffsets.get(topicPartition).map(_.offset) + }.toMap collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList, partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"), Some(s"${consumerSummary.clientId}")) - } + } val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap { case (topicPartition, offset) => diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index cf00e93558c..51082effdbd 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { def addConsumerGroupExecutor(numConsumers: Int, topic: String = topic, group: String = group, - strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = { - val executor = new 
ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy) + strategy: String = classOf[RangeAssignor].getName, + customPropsOpt: Option[Properties] = None): ConsumerGroupExecutor = { + val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy, customPropsOpt) addExecutor(executor) executor } @@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { object ConsumerGroupCommandTest { - abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable { + abstract class AbstractConsumerRunnable(broker: String, groupId: String, customPropsOpt: Option[Properties] = None) extends Runnable { val props = new Properties configure(props) + customPropsOpt.foreach(props.asScala ++= _.asScala) val consumer = new KafkaConsumer(props) def configure(props: Properties): Unit = { @@ -145,8 +147,8 @@ object ConsumerGroupCommandTest { } } - class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String) - extends AbstractConsumerRunnable(broker, groupId) { + class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String, customPropsOpt: Option[Properties] = None) + extends AbstractConsumerRunnable(broker, groupId, customPropsOpt) { override def configure(props: Properties): Unit = { super.configure(props) @@ -182,11 +184,12 @@ object ConsumerGroupCommandTest { } } - class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String) + class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String, + customPropsOpt: Option[Properties] = None) extends AbstractConsumerGroupExecutor(numConsumers) { for (_ <- 1 to numConsumers) { - submit(new ConsumerRunnable(broker, groupId, topic, strategy)) + submit(new ConsumerRunnable(broker, groupId, topic, strategy, customPropsOpt)) } } diff --git 
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index fac34a71ee1..950676b9222 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -16,9 +16,11 @@ */ package kafka.admin +import java.util.Properties + import joptsimple.OptionException import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.RoundRobinAssignor +import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{TimeoutException} import org.junit.Assert._ @@ -605,5 +607,29 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { fail("Expected an error due to presence of unrecognized --new-consumer option") } + @Test + def testDescribeNonOffsetCommitGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + + val customProps = new Properties + // create a consumer group that never commits offsets + customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps)) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + val (state, assignments) = service.collectGroupOffsets() + state.contains("Stable") && + assignments.isDefined && + assignments.get.count(_.group == group) == 1 && + assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.host.exists(_.trim != 
ConsumerGroupCommand.MISSING_COLUMN_VALUE) + }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.") + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > kafka-consumer-group doesn't describe existing group > ---------------------------------------------------- > > Key: KAFKA-7141 > URL: https://issues.apache.org/jira/browse/KAFKA-7141 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 0.11.0.0, 1.0.1 > Reporter: Bohdana Panchenko > Assignee: huxihx > Priority: Major > Fix For: 2.1.0 > > > I am running two consumers: akka-stream-kafka consumer with standard config > section as described in the > [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and > kafka-console-consumer. 
> akka-stream-kafka consumer configuration looks like this: > {color:#333333}_akka.kafka.consumer{_{color} > {color:#333333} _kafka-clients{_{color} > {color:#333333} _group.id = "myakkastreamkafka-1"_{color} > {color:#333333} _enable.auto.commit = false_{color} > } > {color:#333333} }{color} > > I am able to see both groups with the command: > > *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list* > _Note: This will not show information about old Zookeeper-based consumers._ > > _myakkastreamkafka-1_ > _console-consumer-57171_ > {color:#333333}I am able to view details about the console consumer > group{color} > *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group > console-consumer-57171* > _{color:#205081}Note: This will not show information about old > Zookeeper-based consumers.{color}_ > _{color:#205081}TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID > HOST CLIENT-ID{color}_ > _{color:#205081}STREAM-TEST 0 0 0 0 > consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1{color}_ > {color:#333333}But the command to describe my akka stream consumer gives me > empty output:{color} > *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group > myakkastreamkafka-1* > {color:#205081}_Note: This will not show information about old > Zookeeper-based consumers._{color} > {color:#205081}_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID > HOST CLIENT-ID_{color} > > {color:#333333}That is strange. Can you please check the issue?{color} -- This message was sent by Atlassian JIRA (v7.6.3#76005)