cadonna commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1401760510
########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) } - @Test - def testAutoCommitOnCloseAfterWakeup(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // close() is not committing offsets in consumer group protocol Review Comment: This is just a transient state, isn't it? At least https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close is planned. If KAFKA-15327 is still valid, can we formulate this comment accordingly like `close() is not committing offsets in consumer group protocol for now but it should when implementation is complete`. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } - @Test - def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: I guess the `ConsumerRebalanceListener` is also something that will be supported in the future by the consumer group protocol. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -537,33 +574,41 @@ class PlaintextConsumerTest extends BaseConsumerTest { awaitAssignment(consumer, shrunkenAssignment) } - @Test - def testPartitionsFor(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol + def testPartitionsFor(groupProtocol: String): Unit = { val numParts = 2 createTopic("part-test", numParts, 1) + this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) val consumer = createConsumer() val parts = consumer.partitionsFor("part-test") assertNotNull(parts) assertEquals(2, parts.size) } - @Test - def testPartitionsForAutoCreate(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol + def testPartitionsForAutoCreate(groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) val consumer = createConsumer() // First call would create the topic consumer.partitionsFor("non-exist-topic") val partitions = consumer.partitionsFor("non-exist-topic") assertFalse(partitions.isEmpty) } - @Test - def testPartitionsForInvalidTopic(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol Review Comment: See my comment above about the functionality being supported in future. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } - @Test - def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol + def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) runMultiConsumerSessionTimeoutTest(false) } - @Test - def testMultiConsumerSessionTimeoutOnClose(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol + def testMultiConsumerSessionTimeoutOnClose(groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) runMultiConsumerSessionTimeoutTest(true) } - @Test - def testInterceptors(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Consumer interceptors not implemented for consumer group protocol Review Comment: Will also interceptors be supported? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } - @Test - def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol + def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) runMultiConsumerSessionTimeoutTest(false) } - @Test - def testMultiConsumerSessionTimeoutOnClose(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: Same here ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1411,11 +1498,13 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } - @Test - def testAutoCommitOnRebalance(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: See my other comments. ########## core/src/test/scala/kafka/utils/TestInfoUtils.scala: ########## @@ -39,6 +39,12 @@ object TestInfoUtils { } else { throw new RuntimeException(s"Unknown quorum value") } + } else if (testInfo.getDisplayName().contains("groupProtocol=")) { Review Comment: Could explain why this is needed? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1919,8 +2038,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1") } - @Test - def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("consumer")) // Empty group ID not supported with consumer group protocol + def testEmptyGroupIdNotSupported(groupProtocol: String): Unit = { + val consumer1Config = new Properties(consumerConfig) + consumer1Config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "") + consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1") + val consumer1 = createConsumer(configOverrides = consumer1Config) + + consumer1.assign(List(tp).asJava) + assertThrows(classOf[InvalidGroupIdException], () => consumer1.commitSync()) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Static membership is not supported in consumer group protocol Review Comment: Static membership will be supported in future, right? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitAsync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + val cb = new CountConsumerCommitCallback + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitAsync(cb) + TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, + "Failed to observe commit callback before timeout", waitTimeMs = 10000) + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)) + assertTrue(consumer.assignment.contains(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitSync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitSync() + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)) + assertTrue(consumer.assignment.contains(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = { + val numRecords = 10000 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + consumer.assign(List(tp).asJava) + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + consumer.commitSync() + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + assertNotNull(committedOffset.get(tp)) + assertEquals(numRecords, committedOffset.get(tp).offset()) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndConsume(groupProtocol: String): Unit = { + val numRecords = 10 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + assertEquals(numRecords, consumer.position(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = { + val numRecords = 10 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + val offset = 1 + consumer.seek(tp, offset) + consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = offset, + startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp + offset) + + assertEquals(numRecords, consumer.position(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not implemented in consumer group protocol Review Comment: See my other comments about functionality that will be supported in the future. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { s"The current assignment is ${consumer.assignment()}") } - @Test - def testConsumingWithNullGroupId(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol + def testConsumingWithNullGroupId(groupProtocol: String): Unit = { val topic = "test_topic" - val partition = 0; + val partition = 0 val tp = new TopicPartition(topic, partition) createTopic(topic, 1, 1) - TestUtils.waitUntilTrue(() => { - this.zkClient.topicExists(topic) - }, "Failed to create topic") - Review Comment: I agree with @AndrewJSchofield . Method `createTopics()` already verifies that the topic was created. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1347,8 +1428,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(2, topics.get(topic3).size) } - @Test - def testUnsubscribeTopic(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: See my other comments ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1206,8 +1277,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { MockProducerInterceptor.resetCounters() } - @Test - def testAutoCommitIntercept(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: See my other comments. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1489,8 +1580,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2))) } - @Test - def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: See my other comments ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1327,8 +1406,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME) } - @Test - def testListTopics(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // listTopics not supported for consumer group protocol Review Comment: See my other comments. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitAsync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + val cb = new CountConsumerCommitCallback + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitAsync(cb) + TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, + "Failed to observe commit callback before timeout", waitTimeMs = 10000) + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)) + assertTrue(consumer.assignment.contains(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitSync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitSync() + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. Review Comment: See my comment above ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1450,8 +1539,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset) } - @Test - def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: See my other comments ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitAsync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + val cb = new CountConsumerCommitCallback + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitAsync(cb) + TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, + "Failed to observe commit callback before timeout", waitTimeMs = 10000) + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. Review Comment: Could you make this clear in the name of the test instead of using a inline comment? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitAsync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + val cb = new CountConsumerCommitCallback + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitAsync(cb) + TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, + "Failed to observe commit callback before timeout", waitTimeMs = 10000) + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)) + assertTrue(consumer.assignment.contains(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitSync(groupProtocol: String): Unit = { + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + val producer = createProducer() + val numRecords = 10000 + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumer.assign(List(tp).asJava) + consumer.commitSync() + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)) + assertTrue(consumer.assignment.contains(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = { + val numRecords = 10000 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + consumer.assign(List(tp).asJava) + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + consumer.commitSync() + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + assertNotNull(committedOffset.get(tp)) + assertEquals(numRecords, committedOffset.get(tp).offset()) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndConsume(groupProtocol: String): Unit = { + val numRecords = 10 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + assertEquals(numRecords, consumer.position(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = { + val numRecords = 10 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + val offset = 1 + consumer.seek(tp, offset) + consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = offset, + startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp + offset) + + assertEquals(numRecords, consumer.position(tp)) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not implemented in consumer group protocol + def testAssignAndConsumeWithLeaderChangeValidatingPositions(groupProtocol: String): Unit = { + val numRecords = 10 + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + // Force leader epoch change to trigger position validation + var parts: mutable.Buffer[PartitionInfo] = null + while (parts == null) + parts = consumer.partitionsFor(tp.topic()).asScala + val leader = parts.head.leader().id() + this.servers(leader).shutdown() + this.servers(leader).startup() + + // Consume after leader change + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 10, + startingTimestamp = startingTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndFetchCommittedOffsets(groupProtocol: String): Unit = { + val numRecords = 100 + val startingTimestamp = System.currentTimeMillis() + val producer = createProducer() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + consumer.assign(List(tp).asJava) + // First consumer consumes and commits offsets + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, + startingTimestamp = startingTimestamp) + consumer.commitSync() + assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) + // We should see the committed offsets from another consumer + val anotherConsumer = createConsumer(configOverrides = props) + anotherConsumer.assign(List(tp).asJava) + assertEquals(numRecords, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndConsumeFromCommittedOffsets(groupProtocol: String): Unit = { + val producer = createProducer() + val numRecords = 100 + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = startingTimestamp) + + // Commit offset with first consumer + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1") + val consumer = createConsumer(configOverrides = props) + consumer.assign(List(tp).asJava) + val offset = 10 + consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(offset))) + .asJava) + assertEquals(offset, consumer.committed(Set(tp).asJava).get(tp).offset) + consumer.close() + + // Consume from committed offsets with another consumer in same group + val anotherConsumer = createConsumer(configOverrides = props) + assertEquals(offset, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + anotherConsumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset, + startingOffset = offset, startingKeyAndValueIndex = offset, + startingTimestamp = startingTimestamp + offset) + } + + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testAssignAndRetrievingCommittedOffsetsMultipleTimes(groupProtocol: String): Unit = { + val numRecords = 100 + val startingTimestamp = System.currentTimeMillis() + val producer = createProducer() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val props = new Properties() + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val consumer = createConsumer(configOverrides = props) + consumer.assign(List(tp).asJava) + + // Consume and commit offsets + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, + startingTimestamp = startingTimestamp) + consumer.commitSync() + + // Check committed offsets twice with same consumer + assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) Review Comment: Wouldn't it be better to consumer and commit some records before retrieving the committed offsets a second time? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -1870,31 +1975,45 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1") } - @Test - def testConsumingWithEmptyGroupId(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("consumer")) // Null group ID not supported with consumer group protocol + def testNullGroupIdNotSupported(groupProtocol: String): Unit = { Review Comment: This name suggests that a group ID of `null` is not supported in general with manual assignment. However, `null` is supported as group ID with manual assignment. Just commits are not supported with group ID = `null`. Could you please use a more descriptive name for this test like `testNullGroupIdNotSupportedIfCommitting`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org