apoorvmittal10 commented on code in PR #16916: URL: https://github.com/apache/kafka/pull/16916#discussion_r1723903760
########## core/src/test/scala/unit/kafka/server/RequestUtilitiesTest.scala: ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import kafka.network.SocketServer +import kafka.server.{ControllerServer, IntegrationTestUtils, KafkaBroker} +import kafka.test.ClusterInstance +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse} +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT +import org.junit.jupiter.api.AfterEach + +import java.util.Properties +import java.util.stream.Collectors +import scala.collection.Seq +import scala.collection.convert.ImplicitConversions.{`collection AsScalaIterable`, `map AsScala`} +import scala.reflect.ClassTag + +class RequestUtilitiesTest(cluster: ClusterInstance) { + + protected var producer: KafkaProducer[String, 
String] = _ + + protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT + protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol) + + private def brokers(): Seq[KafkaBroker] = cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).toSeq + + private def controllerServers(): Seq[ControllerServer] = cluster.controllers().values().toSeq + + @AfterEach + def tearDown(): Unit = { + if (producer != null) + producer.close() + } + + protected def createOffsetsTopic(): Unit = { + TestUtils.createOffsetsTopicWithAdmin( + admin = cluster.createAdminClient(), + brokers = brokers(), + controllers = controllerServers() + ) + } + + protected def createTopic(topic: String, Review Comment: This is good and potentially we should re-use this topic at other places in the test files. It will be good to create a Jira at least, outside KIP-932, for the cleanup. ########## core/src/test/scala/unit/kafka/server/RequestUtilitiesTest.scala: ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unit.kafka.server + +import kafka.network.SocketServer +import kafka.server.{ControllerServer, IntegrationTestUtils, KafkaBroker} +import kafka.test.ClusterInstance +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse} +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT +import org.junit.jupiter.api.AfterEach + +import java.util.Properties +import java.util.stream.Collectors +import scala.collection.Seq +import scala.collection.convert.ImplicitConversions.{`collection AsScalaIterable`, `map AsScala`} +import scala.reflect.ClassTag + +class RequestUtilitiesTest(cluster: ClusterInstance) { Review Comment: Is it a utility class or a test class? It seems to be a utility class, so should it be suffixed with `Test`, or would just `RequestUtilities` be the right name? 
########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -25,48 +25,14 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse} +import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, 
DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse} import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import unit.kafka.server.RequestUtilitiesTest import java.util.Comparator -import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import scala.reflect.ClassTag -class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { - private def brokers(): Seq[KafkaBroker] = cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).asScala.toSeq - - private def controllerServers(): Seq[ControllerServer] = cluster.controllers().values().asScala.toSeq - - protected def createOffsetsTopic(): Unit = { - TestUtils.createOffsetsTopicWithAdmin( - admin = cluster.createAdminClient(), - brokers = brokers(), - controllers = controllerServers() - ) - } - - protected def createTopic( - topic: String, - numPartitions: Int - ): Unit = { - TestUtils.createTopicWithAdmin( - admin = cluster.createAdminClient(), - brokers = brokers(), - controllers = controllerServers(), - topic = topic, - numPartitions = numPartitions - ) - } - - protected def isUnstableApiEnabled: Boolean = { - cluster.config.serverProperties.get("unstable.api.versions.enable") == "true" - } - - protected def isNewGroupCoordinatorEnabled: Boolean = { - cluster.config.serverProperties.get("group.coordinator.new.enable") == "true" || - cluster.config.serverProperties.get("group.coordinator.rebalance.protocols").contains("consumer") - } +class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) extends RequestUtilitiesTest(cluster) 
{ Review Comment: Why should the test classes extend a utility class? That does not seem right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org