jeffkbkim commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1720282760
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -140,12 +140,20 @@ class CoordinatorLoaderImpl[T](
batch.asScala.foreach { record =>
val controlRecord = ControlRecordType.parse(record.key)
if (controlRecord == ControlRecordType.COMMIT) {
+ if (isTraceEnabled) {
+ trace(s"Replaying end transaction marker at offset
${record.offset} to commit transaction " +
Review Comment:
Should all of the added logging include the topic partition id?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -848,26 +844,32 @@ ConsumerGroup consumerGroup(
ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
String groupId,
boolean createIfNotExists
- ) throws GroupIdNotFoundException {
+ ) throws IllegalStateException {
Group group = groups.get(groupId);
if (group == null && !createIfNotExists) {
- throw new IllegalStateException(String.format("Consumer group %s
not found.", groupId));
+ throw new IllegalStateException(String.format("Consumer group %s
not found", groupId));
}
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null,
consumerGroup.state());
return consumerGroup;
+ } else if (group.type() == CONSUMER) {
+ return (ConsumerGroup) group;
+ } else if (group.type() == CLASSIC && ((ClassicGroup)
group).isSimpleGroup()) {
+ // If the group is a simple classic group, it was automatically
created to hold committed
+ // offsets if no group existed. Simple classic groups are not
backed by any records
+ // in the __consumer_offsets topic hence we can safely replace it
here. Without this,
+ // replaying consumer group records after offset commit records
would not work.
Review Comment:
to understand issue#3:
* OMM#replay will create a simple classic group if no group exists yet
* we used to throw IllegalStateException because it would be of type
"classic"
* users can convert a simple group to a classic or a consumer group later.
We are handling the latter case here — is this correct?
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -156,17 +164,45 @@ class CoordinatorLoaderImpl[T](
} else {
batch.asScala.foreach { record =>
numRecords = numRecords + 1
- try {
- coordinator.replay(
- record.offset(),
- batch.producerId,
- batch.producerEpoch,
- deserializer.deserialize(record.key, record.value)
- )
- } catch {
- case ex: UnknownRecordTypeException =>
- warn(s"Unknown record type ${ex.unknownType} while
loading offsets and group metadata " +
- s"from $tp. Ignoring it. It could be a left over from
an aborted upgrade.")
+
+ val coordinatorRecordOpt = {
+ try {
+ Some(deserializer.deserialize(record.key, record.value))
+ } catch {
+ case ex: UnknownRecordTypeException =>
+ warn(s"Unknown record type ${ex.unknownType} while
loading offsets and group metadata " +
+ s"from $tp. Ignoring it. It could be a left over
from an aborted upgrade.")
+ None
+ case ex: RuntimeException =>
+ val msg = s"Deserializing record $record failed due
to: ${ex.getMessage}."
+ error(msg)
+ throw new RuntimeException(msg, ex)
+ }
+ }
+
+ coordinatorRecordOpt.foreach { coordinatorRecord =>
+ try {
+ if (isTraceEnabled) {
+ trace(s"Replaying record $coordinatorRecord at offset
${record.offset()} " +
+ s"with producer id ${batch.producerId} and producer
epoch ${batch.producerEpoch}.")
+ }
+ coordinator.replay(
+ record.offset(),
+ batch.producerId,
+ batch.producerEpoch,
+ coordinatorRecord
+ )
+ } catch {
+ case ex: UnknownRecordTypeException =>
+ warn(s"Unknown record type ${ex.unknownType} while
loading offsets and group metadata " +
+ s"from $tp. Ignoring it. It could be a left over
from an aborted upgrade.")
Review Comment:
Can this be thrown here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -848,26 +844,32 @@ ConsumerGroup consumerGroup(
ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
String groupId,
boolean createIfNotExists
- ) throws GroupIdNotFoundException {
+ ) throws IllegalStateException {
Group group = groups.get(groupId);
if (group == null && !createIfNotExists) {
- throw new IllegalStateException(String.format("Consumer group %s
not found.", groupId));
+ throw new IllegalStateException(String.format("Consumer group %s
not found", groupId));
Review Comment:
Why is it an `IllegalStateException` here?
##########
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##########
@@ -12,53 +12,212 @@
*/
package kafka.api
-import kafka.integration.KafkaServerTestHarness
import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol,
OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture,
TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
-import java.util.Properties
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
- val offsetsTopicCompressionCodec = CompressionType.GZIP
- val overridingProps = new Properties()
- overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
"1")
-
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG,
offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
- override def generateConfigs = TestUtils.createBrokerConfigs(1,
zkConnectOrNull, enableControlledShutdown = false).map {
- KafkaConfig.fromProps(_, overridingProps)
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
+
+ @ClusterTest(
+ types = Array(Type.KRAFT, Type.ZK),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.compression.codec", value
= "1"),
+ new ClusterConfigProperty(key = "controlled.shutdown.enable", value =
"false"),
+ )
+ )
+ def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+ val logManager = cluster.brokers().asScala.head._2.logManager
+ val consumer = TestUtils.createConsumer(cluster.bootstrapServers())
+
+ try {
+ consumer.commitSync(Map(
+ new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
+ ).asJava)
+
+ def getGroupMetadataLogOpt: Option[UnifiedLog] =
+ logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
0))
+
+ TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
+ "Commit message not appended in time")
+
+ val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
+ val incorrectCompressionCodecs = logSegments
+ .flatMap(_.log.batches.asScala.map(_.compressionType))
+ .filter(_ != CompressionType.GZIP)
+
+ assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect
compression codecs should be empty")
+ } finally {
+ consumer.close()
+ }
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum:
String): Unit = {
- val consumer = TestUtils.createConsumer(bootstrapServers())
- val offsetMap = Map(
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
- ).asJava
- consumer.commitSync(offsetMap)
- val logManager = brokers.head.logManager
- def getGroupMetadataLogOpt: Option[UnifiedLog] =
- logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
-
- TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
- "Commit message not appended in time")
-
- val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
- val incorrectCompressionCodecs = logSegments
- .flatMap(_.log.batches.asScala.map(_.compressionType))
- .filter(_ != offsetsTopicCompressionCodec)
- assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression
codecs should be empty")
-
- consumer.close()
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ )
+ def
testCoordinatorFailoverWithConsumerGroupRecordsAfterCompactingPartition(): Unit
= {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a consumer group grp1 with one member. The member subscribes
to foo and leaves. This creates
+ // a mix of group records with tombstones to delete the member.
+ withConsumer(groupId = "grp1", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment.asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Create a consumer group grp2 with one member. The member subscribes
to foo, manually commits offsets,
+ // unsubscribes and finally re-subscribes to foo. This creates a mix of
group records with tombstones
+ // and ensure that all the offset commit records are before the consumer
group records due to the
+ // rebalance after the commit sync.
+ withConsumer(groupId = "grp2", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ consumer.unsubscribe()
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Create a consumer group grp3 with one member. The member subscribes
to foo and leaves the group. Then
+ // the group is deleted. This creates tombstones to delete the member,
the group and the offsets.
+ withConsumer(groupId = "grp3", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ admin
+ .deleteConsumerGroups(List("grp3").asJava)
+ .deletedGroups()
+ .get("grp3")
+ .get(10, TimeUnit.SECONDS)
+
+ // Create a classic group grp4 with one member. Upgrades the group to
the consumer
+ // protocol.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
+
+ // Force a compaction.
+ val tp = new TopicPartition("__consumer_offsets", 0)
+ val broker = cluster.brokers().asScala.head._2
+ val log = broker.logManager.getLog(tp).get
+ log.roll()
+ assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0))
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(broker.config.brokerId)
+ cluster.startBroker(broker.config.brokerId)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeConsumerGroups(List("grp1", "grp2", "grp3", "grp4").asJava)
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedGroup(groups, "grp1", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ assertDescribedGroup(groups, "grp2", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ assertDescribedGroup(groups, "grp3", GroupType.CLASSIC,
ConsumerGroupState.DEAD)
+ assertDescribedGroup(groups, "grp4", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ }
+ }
+
+ private def withAdmin(f: Admin => Unit): Unit = {
+ val admin: Admin = cluster.createAdminClient()
+ try {
+ f(admin)
+ } finally {
+ admin.close()
+ }
+ }
+
+ private def withConsumer(
+ groupId: String,
+ groupProtocol: GroupProtocol,
+ enableAutoCommit: Boolean = true
+ )(f: Consumer[Array[Byte], Array[Byte]] => Unit): Unit = {
+ val consumer = TestUtils.createConsumer(
+ brokerList = cluster.bootstrapServers(),
+ groupId = groupId,
+ groupProtocol = groupProtocol,
+ enableAutoCommit = enableAutoCommit
+ )
+ try {
+ f(consumer)
+ } finally {
+ consumer.close()
+ }
+ }
+
+ private def assertDescribedGroup(
+ groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
+ groupId: String,
+ `type`: GroupType,
Review Comment:
Nit: I haven't seen this convention used elsewhere. Can we use `groupType` instead?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3215,7 +3211,14 @@ public void replay(
String groupId = key.groupId();
String memberId = key.memberId();
- ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
+ ConsumerGroup consumerGroup;
+ try {
+ consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId,
value != null);
+ } catch (IllegalStateException ex) {
+ // If the group does not exist and a tombstone is replayed, we can
ignore it.
Review Comment:
Piggybacking for issue #1.
To understand with an example: tombstone -> membership changes ->
compaction can result in tombstones replaying first — is this correct?
> if the __consumer_offsets partitions only contains tombstones for the
group or the member.
Can a partition really only contain tombstones?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3388,11 +3412,18 @@ public void replay(
) {
String groupId = key.groupId();
String memberId = key.memberId();
- ModernGroup<?> group = getOrMaybeCreatePersistedConsumerGroup(groupId,
false);
if (value != null) {
+ ConsumerGroup group =
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
Review Comment:
To trigger issue #2, we would need a new ConsumerGroupMemberMetadata record
for an existing member — is this correct?
##########
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##########
@@ -12,53 +12,212 @@
*/
package kafka.api
-import kafka.integration.KafkaServerTestHarness
import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol,
OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture,
TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
-import java.util.Properties
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
- val offsetsTopicCompressionCodec = CompressionType.GZIP
- val overridingProps = new Properties()
- overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
"1")
-
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG,
offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
- override def generateConfigs = TestUtils.createBrokerConfigs(1,
zkConnectOrNull, enableControlledShutdown = false).map {
- KafkaConfig.fromProps(_, overridingProps)
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
+
+ @ClusterTest(
+ types = Array(Type.KRAFT, Type.ZK),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.compression.codec", value
= "1"),
+ new ClusterConfigProperty(key = "controlled.shutdown.enable", value =
"false"),
+ )
+ )
+ def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+ val logManager = cluster.brokers().asScala.head._2.logManager
+ val consumer = TestUtils.createConsumer(cluster.bootstrapServers())
+
+ try {
+ consumer.commitSync(Map(
+ new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
+ ).asJava)
+
+ def getGroupMetadataLogOpt: Option[UnifiedLog] =
+ logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
0))
+
+ TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
+ "Commit message not appended in time")
+
+ val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
+ val incorrectCompressionCodecs = logSegments
+ .flatMap(_.log.batches.asScala.map(_.compressionType))
+ .filter(_ != CompressionType.GZIP)
+
+ assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect
compression codecs should be empty")
+ } finally {
+ consumer.close()
+ }
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum:
String): Unit = {
- val consumer = TestUtils.createConsumer(bootstrapServers())
- val offsetMap = Map(
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
- ).asJava
- consumer.commitSync(offsetMap)
- val logManager = brokers.head.logManager
- def getGroupMetadataLogOpt: Option[UnifiedLog] =
- logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
-
- TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
- "Commit message not appended in time")
-
- val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
- val incorrectCompressionCodecs = logSegments
- .flatMap(_.log.batches.asScala.map(_.compressionType))
- .filter(_ != offsetsTopicCompressionCodec)
- assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression
codecs should be empty")
-
- consumer.close()
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ )
+ def
testCoordinatorFailoverWithConsumerGroupRecordsAfterCompactingPartition(): Unit
= {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a consumer group grp1 with one member. The member subscribes
to foo and leaves. This creates
+ // a mix of group records with tombstones to delete the member.
+ withConsumer(groupId = "grp1", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment.asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Create a consumer group grp2 with one member. The member subscribes
to foo, manually commits offsets,
+ // unsubscribes and finally re-subscribes to foo. This creates a mix of
group records with tombstones
+ // and ensure that all the offset commit records are before the consumer
group records due to the
+ // rebalance after the commit sync.
+ withConsumer(groupId = "grp2", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ consumer.unsubscribe()
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Create a consumer group grp3 with one member. The member subscribes
to foo and leaves the group. Then
+ // the group is deleted. This creates tombstones to delete the member,
the group and the offsets.
+ withConsumer(groupId = "grp3", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ admin
+ .deleteConsumerGroups(List("grp3").asJava)
+ .deletedGroups()
+ .get("grp3")
+ .get(10, TimeUnit.SECONDS)
+
+ // Create a classic group grp4 with one member. Upgrades the group to
the consumer
+ // protocol.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
Review Comment:
was this accidentally repeated?
##########
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##########
@@ -12,53 +12,212 @@
*/
package kafka.api
-import kafka.integration.KafkaServerTestHarness
import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol,
OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture,
TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
-import java.util.Properties
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
- val offsetsTopicCompressionCodec = CompressionType.GZIP
- val overridingProps = new Properties()
- overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
"1")
-
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG,
offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
- override def generateConfigs = TestUtils.createBrokerConfigs(1,
zkConnectOrNull, enableControlledShutdown = false).map {
- KafkaConfig.fromProps(_, overridingProps)
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
+
+ @ClusterTest(
+ types = Array(Type.KRAFT, Type.ZK),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.compression.codec", value
= "1"),
+ new ClusterConfigProperty(key = "controlled.shutdown.enable", value =
"false"),
+ )
+ )
+ def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+ val logManager = cluster.brokers().asScala.head._2.logManager
+ val consumer = TestUtils.createConsumer(cluster.bootstrapServers())
+
+ try {
+ consumer.commitSync(Map(
+ new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
+ ).asJava)
+
+ def getGroupMetadataLogOpt: Option[UnifiedLog] =
+ logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
0))
+
+ TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
+ "Commit message not appended in time")
+
+ val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
+ val incorrectCompressionCodecs = logSegments
+ .flatMap(_.log.batches.asScala.map(_.compressionType))
+ .filter(_ != CompressionType.GZIP)
+
+ assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect
compression codecs should be empty")
+ } finally {
+ consumer.close()
+ }
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum:
String): Unit = {
- val consumer = TestUtils.createConsumer(bootstrapServers())
- val offsetMap = Map(
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
- ).asJava
- consumer.commitSync(offsetMap)
- val logManager = brokers.head.logManager
- def getGroupMetadataLogOpt: Option[UnifiedLog] =
- logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
-
- TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
- "Commit message not appended in time")
-
- val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
- val incorrectCompressionCodecs = logSegments
- .flatMap(_.log.batches.asScala.map(_.compressionType))
- .filter(_ != offsetsTopicCompressionCodec)
- assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression
codecs should be empty")
-
- consumer.close()
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ )
+ def
testCoordinatorFailoverWithConsumerGroupRecordsAfterCompactingPartition(): Unit
= {
Review Comment:
If possible, I think it makes more sense to split this test, which
would make it easier to debug in the future if any part of it fails.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]