kirktrue commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393305110
##########
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##########
@@ -34,13 +35,15 @@ import scala.collection.Seq
*/
abstract class BaseConsumerTest extends AbstractConsumerTest {
- @Test
- def testSimpleConsumption(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("generic", "consumer"))
Review Comment:
Is it possible to use a different type of 'source' that can be defined once
rather than on each test? More of a nit, but curious.
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
s"The current assignment is ${consumer.assignment()}")
}
- @Test
- def testConsumingWithNullGroupId(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("generic")) // Null group ID only supported for
generic group protocol
+ def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
val topic = "test_topic"
- val partition = 0;
+ val partition = 0
val tp = new TopicPartition(topic, partition)
createTopic(topic, 1, 1)
- TestUtils.waitUntilTrue(() => {
- this.zkClient.topicExists(topic)
- }, "Failed to create topic")
-
Review Comment:
Why don't we need this now?
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(numMessages - records.count,
lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be
${numMessages - records.count}")
}
- @Test
- def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("generic", "consumer"))
+ def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String):
Unit = {
val numRecords = 1000
val producer = createProducer()
val startingTimestamp = System.currentTimeMillis()
sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol)
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
consumer.seek(tp, 0)
consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
- def assertNoMetric(broker: KafkaServer, name: String, quotaType:
QuotaType, clientId: String): Unit = {
- val metricName = broker.metrics.metricName("throttle-time",
- quotaType.toString,
- "",
- "user", "",
- "client-id", clientId)
- assertNull(broker.metrics.metric(metricName), "Metric should not have
been created " + metricName)
+ def assertNoMetric(broker: KafkaBroker, name: String, quotaType:
QuotaType, clientId: String): Unit = {
+ val metricName = broker.metrics.metricName("throttle-time",
+ quotaType.toString,
+ "",
+ "user", "",
+ "client-id", clientId)
+ assertNull(broker.metrics.metric(metricName), "Metric should not have
been created " + metricName)
}
- servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce,
producerClientId))
- servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce,
producerClientId))
- servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch,
consumerClientId))
- servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch,
consumerClientId))
- servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request,
producerClientId))
- servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request,
producerClientId))
- servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request,
consumerClientId))
- servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request,
consumerClientId))
Review Comment:
So this is incorrect in `trunk`, right?
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
s"The current assignment is ${consumer.assignment()}")
}
- @Test
- def testConsumingWithNullGroupId(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("generic")) // Null group ID only supported for
generic group protocol
+ def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
val topic = "test_topic"
- val partition = 0;
+ val partition = 0
Review Comment:
This ain't Java, come on!
##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import kafka.utils.TestUtils.waitUntilTrue
-import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.time.Duration
-import scala.jdk.CollectionConverters._
-
-class BaseAsyncConsumerTest extends AbstractConsumerTest {
- val defaultBlockingAPITimeoutMs = 1000
-
- @Test
- def testCommitAsync(): Unit = {
- val consumer = createAsyncConsumer()
- val producer = createProducer()
- val numRecords = 10000
- val startingTimestamp = System.currentTimeMillis()
- val cb = new CountConsumerCommitCallback
- sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
- consumer.assign(List(tp).asJava)
- consumer.commitAsync(cb)
- waitUntilTrue(() => {
- cb.successCount == 1
- }, "wait until commit is completed successfully",
defaultBlockingAPITimeoutMs)
- val committedOffset = consumer.committed(Set(tp).asJava,
Duration.ofMillis(defaultBlockingAPITimeoutMs))
- assertNotNull(committedOffset)
- // No valid fetch position due to the absence of consumer.poll; and
therefore no offset was committed to
- // tp. The committed offset should be null. This is intentional.
- assertNull(committedOffset.get(tp))
- assertTrue(consumer.assignment.contains(tp))
- }
-
- @Test
- def testCommitSync(): Unit = {
- val consumer = createAsyncConsumer()
- val producer = createProducer()
- val numRecords = 10000
- val startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
- consumer.assign(List(tp).asJava)
- consumer.commitSync()
- val committedOffset = consumer.committed(Set(tp).asJava,
Duration.ofMillis(defaultBlockingAPITimeoutMs))
- assertNotNull(committedOffset)
- // No valid fetch position due to the absence of consumer.poll; and
therefore no offset was committed to
- // tp. The committed offset should be null. This is intentional.
- assertNull(committedOffset.get(tp))
- assertTrue(consumer.assignment.contains(tp))
- }
-}
Review Comment:
Good riddance!
##########
core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala:
##########
@@ -66,6 +66,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
override protected def brokerPropertyOverrides(properties: Properties): Unit
= {
+ properties.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true")
// enable KIP-848 group coordinator
Review Comment:
At some point we need to test that, if the broker's configuration has this
set to `false`, the client won't up and explode. That's a future task...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]