lianetm commented on code in PR #15612:
URL: https://github.com/apache/kafka/pull/15612#discussion_r1541452355
##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.{StringDeserializer,
StringSerializer}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.test.MockConsumerInterceptor
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+import java.util.stream.Stream
+import scala.jdk.CollectionConverters._
+
+/**
+ * Integration tests for the consumer that cover the logic related to committing offsets.
+ */
+@Timeout(600)
+class PlaintextConsumerCommitTest extends AbstractConsumerTest {
+
+ /**
+  * Turns on auto-commit, subscribes to `topic`, waits until both `tp` and `tp2` are assigned,
+  * seeks each partition to a fixed offset and closes the consumer. A second consumer is then
+  * asked for the committed offsets, which must equal the positions that were sought.
+  */
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = {
+   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+   val closingConsumer = createConsumer()
+
+   val recordCount = 1
+   val recordProducer = createProducer()
+   sendRecords(recordProducer, recordCount, tp)
+
+   closingConsumer.subscribe(List(topic).asJava)
+   awaitAssignment(closingConsumer, Set(tp, tp2))
+
+   // positions set here are the ones close() is expected to auto-commit
+   val soughtOffsets = Seq(tp -> 300L, tp2 -> 500L)
+   soughtOffsets.foreach { case (partition, offset) => closingConsumer.seek(partition, offset) }
+   closingConsumer.close()
+
+   // a fresh consumer must observe exactly those offsets as committed
+   val verifyingConsumer = createConsumer()
+   soughtOffsets.foreach { case (partition, expectedOffset) =>
+     assertEquals(expectedOffset, verifyingConsumer.committed(Set(partition).asJava).get(partition).offset)
+   }
+ }
+
+ /**
+  * Same flow as the plain auto-commit-on-close check, except that `wakeup()` is called on the
+  * consumer right before `close()`. The committed offsets read back through a second consumer
+  * must still equal the positions that were sought.
+  */
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAutoCommitOnCloseAfterWakeup(quorum: String, groupProtocol: String): Unit = {
+   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+   val wokenConsumer = createConsumer()
+
+   val recordCount = 1
+   val recordProducer = createProducer()
+   sendRecords(recordProducer, recordCount, tp)
+
+   wokenConsumer.subscribe(List(topic).asJava)
+   awaitAssignment(wokenConsumer, Set(tp, tp2))
+
+   // positions set here are the ones close() is expected to auto-commit
+   val soughtOffsets = Seq(tp -> 300L, tp2 -> 500L)
+   soughtOffsets.foreach { case (partition, offset) => wokenConsumer.seek(partition, offset) }
+
+   // wake the consumer up first, mimicking another thread trying to break a poll loop,
+   // and only then close it
+   wokenConsumer.wakeup()
+   wokenConsumer.close()
+
+   // a fresh consumer must observe exactly those offsets as committed
+   val verifyingConsumer = createConsumer()
+   soughtOffsets.foreach { case (partition, expectedOffset) =>
+     assertEquals(expectedOffset, verifyingConsumer.committed(Set(partition).asJava).get(partition).offset)
+   }
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitMetadata(quorum: String, groupProtocol: String): Unit = {
+val consumer = createConsumer()
+consumer.assign(List(tp).asJava)
+
+// sync commit
+val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
+consumer.commitSync(Map((tp, syncMetadata)).asJava)
+assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+// async commit
+val asyncMetadata = new OffsetAndMetadata(10, "bar")
+sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
+assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+// handle null metadata
+val nullMetadata = new OffsetAndMetadata(5, null)
+consumer.commitSync(Map(tp -> nullMetadata).asJava)
+