jolshan commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1727460619


##########
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##########
@@ -12,53 +12,332 @@
  */
 package kafka.api
 
-import kafka.integration.KafkaServerTestHarness
 import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
-import java.util.Properties
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.server.config.ServerConfigs
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
 
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
-  val offsetsTopicCompressionCodec = CompressionType.GZIP
-  val overridingProps = new Properties()
-  overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
-  overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
 
-  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map {
-    KafkaConfig.fromProps(_, overridingProps)
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
+
+  @ClusterTest(
+    types = Array(Type.KRAFT, Type.ZK),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "false"),
+    )
+  )
+  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+    val logManager = cluster.brokers().asScala.head._2.logManager
+    val consumer = TestUtils.createConsumer(cluster.bootstrapServers())
+
+    try {
+      consumer.commitSync(Map(
+        new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new 
OffsetAndMetadata(10, "")
+      ).asJava)
+
+      def getGroupMetadataLogOpt: Option[UnifiedLog] =
+        logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
0))
+
+      TestUtils.waitUntilTrue(() => 
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
+        "Commit message not appended in time")
+
+      val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
+      val incorrectCompressionCodecs = logSegments
+        .flatMap(_.log.batches.asScala.map(_.compressionType))
+        .filter(_ != CompressionType.GZIP)
+
+      assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect 
compression codecs should be empty")
+    } finally {
+      consumer.close()
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverAfterCompactingPartitionWithConsumerGroupMemberJoiningAndLeaving():
 Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a consumer group grp1 with one member. The member subscribes 
to foo and leaves. This creates
+      // a mix of group records with tombstones to delete the member.
+      withConsumer(groupId = "grp1", groupProtocol = GroupProtocol.CONSUMER) { 
consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment.asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(List("grp1").asJava)
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp1", GroupType.CONSUMER, 
ConsumerGroupState.EMPTY)
+    }
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: 
String): Unit = {
-    val consumer = TestUtils.createConsumer(bootstrapServers())
-    val offsetMap = Map(
-      new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new 
OffsetAndMetadata(10, "")
-    ).asJava
-    consumer.commitSync(offsetMap)
-    val logManager = brokers.head.logManager
-    def getGroupMetadataLogOpt: Option[UnifiedLog] =
-      logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
-
-    TestUtils.waitUntilTrue(() => 
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
-                            "Commit message not appended in time")
-
-    val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
-    val incorrectCompressionCodecs = logSegments
-      .flatMap(_.log.batches.asScala.map(_.compressionType))
-      .filter(_ != offsetsTopicCompressionCodec)
-    assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression 
codecs should be empty")
-
-    consumer.close()
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverCompactingPartitionWithManualOffsetCommitsAndConsumerGroupMemberUnsubscribingAndResubscribing():
 Unit = {

Review Comment:
   this test name is quite long 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to