mimaison commented on code in PR #15377: URL: https://github.com/apache/kafka/pull/15377#discussion_r1534098042
########## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ########## @@ -13,66 +13,94 @@ package kafka.api import java.util -import kafka.security.authorizer.AclAuthorizer +import kafka.security.authorizer.{AclAuthorizer} import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} import kafka.server.KafkaConfig -import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} +import kafka.utils.{JaasTestUtils, TestUtils} import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin._ import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ -import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} +import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} -import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig} import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException} import org.apache.kafka.common.resource.PatternType.LITERAL import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.common.utils.Utils import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} -import java.util.Collections import scala.jdk.CollectionConverters._ import scala.collection.Seq import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionException import scala.util.{Failure, Success, Try} +@Timeout(120) class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) - val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer]) + val aclAuthorizerClassName = classOf[AclAuthorizer].getName Review Comment: Let's add the type for a public field: ``` val aclAuthorizerClassName: String = classOf[AclAuthorizer].getName ``` ########## core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala: ########## @@ -230,6 +230,13 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg config } + def createAdminClient: Admin = { Review Comment: Could we update PlaintextAdminIntegrationTest to use this method as well? ########## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ########## @@ -13,66 +13,94 @@ package kafka.api import java.util -import kafka.security.authorizer.AclAuthorizer +import kafka.security.authorizer.{AclAuthorizer} Review Comment: We don't need the brackets around `AclAuthorizer` ########## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ########## @@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}") metrics.map(_.asInstanceOf[Gauge[Int]].value).sum } + + override def createAdminClient: Admin = { Review Comment: Could we get rid of this override? We don't have one in the SaslSsl test. ########## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ########## @@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}") metrics.map(_.asInstanceOf[Gauge[Int]].value).sum } + + override def createAdminClient: Admin = { + println("should be passing here") Review Comment: This can be deleted now ########## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ########## @@ -13,66 +13,94 @@ package kafka.api import java.util -import kafka.security.authorizer.AclAuthorizer +import kafka.security.authorizer.{AclAuthorizer} import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} import kafka.server.KafkaConfig -import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} +import kafka.utils.{JaasTestUtils, TestUtils} import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin._ import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ -import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} +import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} -import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig} import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException} import org.apache.kafka.common.resource.PatternType.LITERAL import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.common.utils.Utils import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} -import java.util.Collections import scala.jdk.CollectionConverters._ import scala.collection.Seq import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionException import scala.util.{Failure, Success, Try} +@Timeout(120) class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) - val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer]) + val aclAuthorizerClassName = classOf[AclAuthorizer].getName + def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName) - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + var superUserAdmin: Admin = _ override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) - override def generateConfigs: Seq[KafkaConfig] = { - this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizationAdmin.authorizerClassName) - super.generateConfigs - } - - override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { - authorizationAdmin.initializeAcls() - } - @BeforeEach override def setUp(testInfo: TestInfo): Unit = { + this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, aclAuthorizerClassName) + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) + setUpSasl() super.setUp(testInfo) + setInitialAcls() } def setUpSasl(): Unit = { startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName)) + + val loginContext = jaasAdminLoginModule("GSSAPI") + superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext) + } + + private def setInitialAcls(): Unit = { + superUserAdmin = createSuperuserAdminClient() + val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW) + superUserAdmin.createAcls(List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava) + superUserAdmin.createAcls(List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava) + + val clusterAcls = List(clusterAcl(ALLOW, CREATE), + clusterAcl(ALLOW, DELETE), + clusterAcl(ALLOW, CLUSTER_ACTION), + clusterAcl(ALLOW, ALTER_CONFIGS), + clusterAcl(ALLOW, ALTER), + clusterAcl(ALLOW, IDEMPOTENT_WRITE)) + + superUserAdmin.createAcls(clusterAcls.map(ace => new AclBinding(clusterResourcePattern, ace)).asJava) + + brokers.foreach { b => + TestUtils.waitAndVerifyAcls(Set(ace), b.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, "*", LITERAL)) + TestUtils.waitAndVerifyAcls(Set(ace), b.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(GROUP, "*", LITERAL)) + TestUtils.waitAndVerifyAcls(clusterAcls.toSet, b.dataPlaneRequestProcessor.authorizer.get, clusterResourcePattern) + } + } + + private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = { + new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString, + WildcardHost, operation, permissionType) } @AfterEach override def tearDown(): Unit = { super.tearDown() + if (superUserAdmin != null) Utils.closeQuietly(superUserAdmin, "superUserAdminClient") Review Comment: `createSuperuserAdminClient()` adds the client to `IntegrationTestHarness.adminClients` so I think it should already get closed automatically. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org