cadonna commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1401760510


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  @Test
-  def testAutoCommitOnCloseAfterWakeup(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // close() is not committing 
offsets in consumer group protocol

Review Comment:
   This is just a transient state, isn't it? At least 
https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close 
is planned. If KAFKA-15327 is still valid, can we formulate this comment 
accordingly like `close() is not committing offsets in consumer group protocol 
for now but it should when implementation is complete`.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   I guess the `ConsumerRebalanceListener` is also something that will be 
supported in the future by the consumer group protocol. 



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -537,33 +574,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     awaitAssignment(consumer, shrunkenAssignment)
   }
 
-  @Test
-  def testPartitionsFor(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol
+  def testPartitionsFor(groupProtocol: String): Unit = {
     val numParts = 2
     createTopic("part-test", numParts, 1)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
     val consumer = createConsumer()
     val parts = consumer.partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
   }
 
-  @Test
-  def testPartitionsForAutoCreate(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol
+  def testPartitionsForAutoCreate(groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
     val consumer = createConsumer()
     // First call would create the topic
     consumer.partitionsFor("non-exist-topic")
     val partitions = consumer.partitionsFor("non-exist-topic")
     assertFalse(partitions.isEmpty)
   }
 
-  @Test
-  def testPartitionsForInvalidTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol

Review Comment:
   See my comment above about the functionality being supported in future.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): 
Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
     runMultiConsumerSessionTimeoutTest(false)
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnClose(groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
     runMultiConsumerSessionTimeoutTest(true)
   }
 
-  @Test
-  def testInterceptors(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Consumer interceptors not 
implemented for consumer group protocol

Review Comment:
   Will interceptors also be supported?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): 
Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
     runMultiConsumerSessionTimeoutTest(false)
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   Same here



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1411,11 +1498,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  @Test
-  def testAutoCommitOnRebalance(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   See my other comments.



##########
core/src/test/scala/kafka/utils/TestInfoUtils.scala:
##########
@@ -39,6 +39,12 @@ object TestInfoUtils {
       } else {
         throw new RuntimeException(s"Unknown quorum value")
       }
+    } else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment:
   Could you explain why this is needed?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1919,8 +2038,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       "Expected consumer2 to consume one message from offset 1, which is the 
committed offset of consumer1")
   }
 
-  @Test
-  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("consumer")) // Empty group ID not supported 
with consumer group protocol
+  def testEmptyGroupIdNotSupported(groupProtocol: String): Unit = {
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    val consumer1 = createConsumer(configOverrides = consumer1Config)
+
+    consumer1.assign(List(tp).asJava)
+    assertThrows(classOf[InvalidGroupIdException], () => 
consumer1.commitSync())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Static membership is not 
supported in consumer group protocol

Review Comment:
   Static membership will be supported in future, right?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+    val numRecords = 10000
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    assertNotNull(committedOffset.get(tp))
+    assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    val offset = 1
+    consumer.seek(tp, offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords - offset, 
startingOffset = offset,
+      startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp 
+ offset)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not implemented in 
consumer group protocol

Review Comment:
   See my other comments about functionality that will be supported in the 
future.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
     val topic = "test_topic"
-    val partition = 0;
+    val partition = 0
     val tp = new TopicPartition(topic, partition)
     createTopic(topic, 1, 1)
 
-    TestUtils.waitUntilTrue(() => {
-      this.zkClient.topicExists(topic)
-    }, "Failed to create topic")
-

Review Comment:
   I agree with @AndrewJSchofield. The `createTopic()` method already verifies 
that the topic was created. 



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1347,8 +1428,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(2, topics.get(topic3).size)
   }
 
-  @Test
-  def testUnsubscribeTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   See my other comments



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1206,8 +1277,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     MockProducerInterceptor.resetCounters()
   }
 
-  @Test
-  def testAutoCommitIntercept(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   See my other comments.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1489,8 +1580,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertNull(consumer.metrics.get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags2)))
   }
 
-  @Test
-  def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   See my other comments



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1327,8 +1406,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       startingTimestamp = startTime, timestampType = 
TimestampType.LOG_APPEND_TIME)
   }
 
-  @Test
-  def testListTopics(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // listTopics not supported for 
consumer group protocol

Review Comment:
   See my other comments.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.

Review Comment:
   See my comment above



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1450,8 +1539,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  @Test
-  def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   See my other comments



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.

Review Comment:
   Could you make this clear in the name of the test instead of using an inline 
comment?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+    val numRecords = 10000
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    assertNotNull(committedOffset.get(tp))
+    assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    val offset = 1
+    consumer.seek(tp, offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords - offset, 
startingOffset = offset,
+      startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp 
+ offset)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not implemented in 
consumer group protocol
+  def testAssignAndConsumeWithLeaderChangeValidatingPositions(groupProtocol: 
String): Unit = {
+    val numRecords = 10
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    // Force leader epoch change to trigger position validation
+    var parts: mutable.Buffer[PartitionInfo] = null
+    while (parts == null)
+      parts = consumer.partitionsFor(tp.topic()).asScala
+    val leader = parts.head.leader().id()
+    this.servers(leader).shutdown()
+    this.servers(leader).startup()
+
+    // Consume after leader change
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
10,
+      startingTimestamp = startingTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndFetchCommittedOffsets(groupProtocol: String): Unit = {
+    val numRecords = 100
+    val startingTimestamp = System.currentTimeMillis()
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    // First consumer consumes and commits offsets
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0,
+      startingTimestamp = startingTimestamp)
+    consumer.commitSync()
+    assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
+    // We should see the committed offsets from another consumer
+    val anotherConsumer = createConsumer(configOverrides = props)
+    anotherConsumer.assign(List(tp).asJava)
+    assertEquals(numRecords, 
anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeFromCommittedOffsets(groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val numRecords = 100
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    // Commit offset with first consumer
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    val offset = 10
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(offset)))
+      .asJava)
+    assertEquals(offset, consumer.committed(Set(tp).asJava).get(tp).offset)
+    consumer.close()
+
+    // Consume from committed offsets with another consumer in same group
+    val anotherConsumer = createConsumer(configOverrides = props)
+    assertEquals(offset, 
anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+    anotherConsumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset,
+      startingOffset = offset, startingKeyAndValueIndex = offset,
+      startingTimestamp = startingTimestamp + offset)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndRetrievingCommittedOffsetsMultipleTimes(groupProtocol: 
String): Unit = {
+    val numRecords = 100
+    val startingTimestamp = System.currentTimeMillis()
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+
+    // Consume and commit offsets
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0,
+      startingTimestamp = startingTimestamp)
+    consumer.commitSync()
+
+    // Check committed offsets twice with same consumer
+    assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)

Review Comment:
   Wouldn't it be better to consume and commit some records before retrieving 
the committed offsets a second time?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1870,31 +1975,45 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1")
   }
 
-  @Test
-  def testConsumingWithEmptyGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("consumer")) // Null group ID not supported 
with consumer group protocol
+  def testNullGroupIdNotSupported(groupProtocol: String): Unit = {

Review Comment:
   This name suggests that a group ID of `null` is not supported in general 
with manual assignment. However, `null` is supported as group ID with manual 
assignment. Just commits are not supported with group ID = `null`. Could you 
please use a more descriptive name for this test like 
`testNullGroupIdNotSupportedIfCommitting`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to