This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d654bc1b15 MINOR: Support KRaft in GroupAuthorizerIntegrationTest
(#12336)
d654bc1b15 is described below
commit d654bc1b15740acf8f1647a0f4533f4cd7f71271
Author: Jason Gustafson
AuthorDate: Mon Jun 27 16:01:15 2022 -0700
MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)
Support KRaft in `GroupAuthorizerIntegrationTest`.
Reviewers: David Arthur
---
.../kafka/api/AuthorizerIntegrationTest.scala | 39 ++
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 60 +++---
.../kafka/server/QuorumTestHarness.scala | 19 ---
.../kafka/integration/KafkaServerTestHarness.scala | 10
4 files changed, 78 insertions(+), 50 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fdc18c..a109ae8ce4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,9 +85,8 @@ object AuthorizerIntegrationTest {
class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
-case BrokerListenerName => BrokerPrincipal
+case BrokerListenerName | ControllerListenerName => BrokerPrincipal
case ClientListenerName => ClientPrincipal
-case ControllerListenerName => BrokerPrincipal
case listenerName => throw new IllegalArgumentException(s"No principal
mapped to listener $listenerName")
}
}
@@ -152,32 +151,32 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
override def brokerPropertyOverrides(properties: Properties): Unit = {
+properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+addNodeProperties(properties)
+ }
+
+ override def kraftControllerConfigs(): collection.Seq[Properties] = {
+val controllerConfigs = super.kraftControllerConfigs()
+controllerConfigs.foreach(addNodeProperties)
+controllerConfigs
+ }
+
+ private def addNodeProperties(properties: Properties): Unit = {
if (isKRaftTest()) {
properties.put(KafkaConfig.AuthorizerClassNameProp,
classOf[StandardAuthorizer].getName)
- properties.put(StandardAuthorizer.SUPER_USERS_CONFIG,
BrokerPrincipal.toString())
+ properties.put(StandardAuthorizer.SUPER_USERS_CONFIG,
BrokerPrincipal.toString)
} else {
properties.put(KafkaConfig.AuthorizerClassNameProp,
classOf[AclAuthorizer].getName)
}
-properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
- classOf[PrincipalBuilder].getName)
+properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[PrincipalBuilder].getName)
}
- override def kraftControllerConfigs(): Seq[Properties] = {
-val controllerConfigs = Seq(new Properties())
-controllerConfigs.foreach { properties =>
- properties.put(KafkaConfig.AuthorizerClassNameProp,
classOf[StandardAuthorizer].getName())
- properties.put(StandardAuthorizer.SUPER_USERS_CONFIG,
BrokerPrincipal.toString())
- properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
-classOf[PrincipalBuilder].getName)
-}
-controllerConfigs
- }
val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) =>
Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) =>
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
@@ -2574,14 +2573,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
- private def addAndVerifyAcls(acls: Set[AccessControlEntry], resource:
ResourcePattern): Unit = {
-TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
- }
-
- private def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource:
ResourcePattern): Unit = {
-TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
- }
-
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,