mimaison commented on code in PR #15377:
URL: https://github.com/apache/kafka/pull/15377#discussion_r1534098042


##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -13,66 +13,94 @@
 package kafka.api
 
 import java.util
-import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.{AclAuthorizer}
 import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, 
CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
+import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, 
CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, 
TopicConfig}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
InvalidRequestException, TopicAuthorizationException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
 
-import java.util.Collections
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
+@Timeout(120)
 class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with 
SaslSetup {
   val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
 
-  val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], 
classOf[AclAuthorizer])
+  val aclAuthorizerClassName = classOf[AclAuthorizer].getName

Review Comment:
   Let's add an explicit type annotation, since this is a public field:
   ```
   val aclAuthorizerClassName: String = classOf[AclAuthorizer].getName
   ```



##########
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##########
@@ -230,6 +230,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
     config
   }
 
+  def createAdminClient: Admin = {

Review Comment:
   Could we update PlaintextAdminIntegrationTest to use this method as well?



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -13,66 +13,94 @@
 package kafka.api
 
 import java.util
-import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.{AclAuthorizer}

Review Comment:
   We don't need the braces around `AclAuthorizer`.



##########
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##########
@@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
     assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}")
     metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
+
+  override def createAdminClient: Admin = {

Review Comment:
   Could we get rid of this override? We don't have one in the SaslSsl test.



##########
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##########
@@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
     assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}")
     metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
+
+  override def createAdminClient: Admin = {
+    println("should be passing here")

Review Comment:
   This debugging `println` can be deleted now.



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -13,66 +13,94 @@
 package kafka.api
 
 import java.util
-import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.{AclAuthorizer}
 import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, 
CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
+import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, 
CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, 
TopicConfig}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
InvalidRequestException, TopicAuthorizationException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
 
-import java.util.Collections
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
+@Timeout(120)
 class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with 
SaslSetup {
   val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
 
-  val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], 
classOf[AclAuthorizer])
+  val aclAuthorizerClassName = classOf[AclAuthorizer].getName
+  def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
 
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  var superUserAdmin: Admin = _
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
-  override def generateConfigs: Seq[KafkaConfig] = {
-    this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
authorizationAdmin.authorizerClassName)
-    super.generateConfigs
-  }
-
-  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
-    authorizationAdmin.initializeAcls()
-  }
-
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+    this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
aclAuthorizerClassName)
+    this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+    this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, 
kafkaPrincipal.toString)
+
     setUpSasl()
     super.setUp(testInfo)
+    setInitialAcls()
   }
 
   def setUpSasl(): Unit = {
     startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, 
JaasTestUtils.KafkaServerContextName))
+
+    val loginContext = jaasAdminLoginModule("GSSAPI")
+    superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext)
+  }
+
+  private def setInitialAcls(): Unit = {
+    superUserAdmin = createSuperuserAdminClient()
+    val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, 
ALL, ALLOW)
+    superUserAdmin.createAcls(List(new AclBinding(new ResourcePattern(TOPIC, 
"*", LITERAL), ace)).asJava)
+    superUserAdmin.createAcls(List(new AclBinding(new ResourcePattern(GROUP, 
"*", LITERAL), ace)).asJava)
+
+    val clusterAcls = List(clusterAcl(ALLOW, CREATE),
+      clusterAcl(ALLOW, DELETE),
+      clusterAcl(ALLOW, CLUSTER_ACTION),
+      clusterAcl(ALLOW, ALTER_CONFIGS),
+      clusterAcl(ALLOW, ALTER),
+      clusterAcl(ALLOW, IDEMPOTENT_WRITE))
+
+    superUserAdmin.createAcls(clusterAcls.map(ace => new 
AclBinding(clusterResourcePattern, ace)).asJava)
+
+    brokers.foreach { b =>
+      TestUtils.waitAndVerifyAcls(Set(ace), 
b.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, "*", 
LITERAL))
+      TestUtils.waitAndVerifyAcls(Set(ace), 
b.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(GROUP, "*", 
LITERAL))
+      TestUtils.waitAndVerifyAcls(clusterAcls.toSet, 
b.dataPlaneRequestProcessor.authorizer.get, clusterResourcePattern)
+    }
+  }
+
+  private def clusterAcl(permissionType: AclPermissionType, operation: 
AclOperation): AccessControlEntry = {
+    new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"*").toString,
+      WildcardHost, operation, permissionType)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
     super.tearDown()
+    if (superUserAdmin != null)  Utils.closeQuietly(superUserAdmin, 
"superUserAdminClient")

Review Comment:
   `createSuperuserAdminClient()` adds the client to 
`IntegrationTestHarness.adminClients`, so I think it should already get closed 
automatically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to