[ https://issues.apache.org/jira/browse/KAFKA-7117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607821#comment-16607821 ]
ASF GitHub Bot commented on KAFKA-7117: --------------------------------------- junrao closed pull request #5463: KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) URL: https://github.com/apache/kafka/pull/5463 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 31e6c53dc11..c2dda33d5ab 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -17,17 +17,22 @@ package kafka.admin +import java.util.Properties + import joptsimple._ import joptsimple.util.EnumConverter import kafka.security.auth._ import kafka.server.KafkaConfig import kafka.utils._ +import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => JAdminClient} +import org.apache.kafka.common.acl._ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType} +import org.apache.kafka.common.utils.{SecurityUtils, Utils} +import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType} import scala.collection.JavaConverters._ +import scala.collection.mutable object AclCommand extends Logging { @@ -52,13 +57,21 @@ object AclCommand extends Logging { opts.checkArgs() + val aclCommandService = { + if (opts.options.has(opts.bootstrapServerOpt)) { + new AdminClientService(opts) + } else { + new AuthorizerService(opts) + } + } + try { if (opts.options.has(opts.addOpt)) - addAcl(opts) + 
aclCommandService.addAcls() else if (opts.options.has(opts.removeOpt)) - removeAcl(opts) + aclCommandService.removeAcls() else if (opts.options.has(opts.listOpt)) - listAcl(opts) + aclCommandService.listAcls() } catch { case e: Throwable => println(s"Error while executing ACL command: ${e.getMessage}") @@ -67,91 +80,202 @@ object AclCommand extends Logging { } } - def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) { - val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled) - val authorizerProperties = - if (opts.options.has(opts.authorizerPropertiesOpt)) { - val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala - defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala - } else { - defaultProps + sealed trait AclCommandService { + def addAcls(): Unit + def removeAcls(): Unit + def listAcls(): Unit + } + + class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging { + + private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient => Unit) { + val props = if (opts.options.has(opts.commandConfigOpt)) + Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) + else + new Properties() + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) + val adminClient = JAdminClient.create(props) + + try { + f(adminClient) + } finally { + adminClient.close() } + } - val authorizerClass = opts.options.valueOf(opts.authorizerOpt) - val authZ = CoreUtils.createObject[Authorizer](authorizerClass) - try { - authZ.configure(authorizerProperties.asJava) - f(authZ) + def addAcls(): Unit = { + val resourceToAcl = getResourceToAcls(opts) + withAdminClient(opts) { adminClient => + for ((resource, acls) <- resourceToAcl) { + val resourcePattern = resource.toPattern + println(s"Adding ACLs for resource `$resourcePattern`: $Newline ${acls.map("\t" + 
_).mkString(Newline)} $Newline") + val aclBindings = acls.map(acl => new AclBinding(resourcePattern, getAccessControlEntry(acl))).asJavaCollection + adminClient.createAcls(aclBindings).all().get() + } + + listAcls() + } } - finally CoreUtils.swallow(authZ.close(), this) - } - private def addAcl(opts: AclCommandOptions) { - val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType) - if (!patternType.isSpecific) - CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.") + def removeAcls(): Unit = { + withAdminClient(opts) { adminClient => + val filterToAcl = getResourceFilterToAcls(opts) + + for ((filter, acls) <- filterToAcl) { + if (acls.isEmpty) { + if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)")) + removeAcls(adminClient, acls, filter) + } else { + if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? 
(y/n)")) + removeAcls(adminClient, acls, filter) + } + } + + listAcls() + } + } - withAuthorizer(opts) { authorizer => - val resourceToAcl = getResourceFilterToAcls(opts).map { - case (filter, acls) => - Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType()) -> acls + def listAcls(): Unit = { + withAdminClient(opts) { adminClient => + val filters = getResourceFilter(opts, dieIfNoResourceFound = false) + val resourceToAcls = getAcls(adminClient, filters) + + for ((resource, acls) <- resourceToAcls) + println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") } + } - if (resourceToAcl.values.exists(_.isEmpty)) - CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.") + private def getAccessControlEntry(acl: Acl): AccessControlEntry = { + new AccessControlEntry(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava) + } - for ((resource, acls) <- resourceToAcl) { - println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") - authorizer.addAcls(acls, resource) + private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter: ResourcePatternFilter): Unit = { + if (acls.isEmpty) + adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get() + else { + val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, getAccessControlEntryFilter(acl))).toList.asJava + adminClient.deleteAcls(aclBindingFilters).all().get() } + } + + private def getAccessControlEntryFilter(acl: Acl): AccessControlEntryFilter = { + new AccessControlEntryFilter(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava) + } - listAcl(opts) + private def getAcls(adminClient: JAdminClient, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = { 
+ val aclBindings = + if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList + else { + val results = for (filter <- filters) yield { + adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get().asScala.toList + } + results.reduceLeft(_ ++ _) + } + + val resourceToAcls = mutable.Map[ResourcePattern, Set[AccessControlEntry]]().withDefaultValue(Set()) + + aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = resourceToAcls(aclBinding.pattern()) + aclBinding.entry()) + resourceToAcls.toMap } } - private def removeAcl(opts: AclCommandOptions) { - withAuthorizer(opts) { authorizer => - val filterToAcl = getResourceFilterToAcls(opts) + class AuthorizerService(val opts: AclCommandOptions) extends AclCommandService with Logging { - for ((filter, acls) <- filterToAcl) { - if (acls.isEmpty) { - if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)")) - removeAcls(authorizer, acls, filter) + private def withAuthorizer()(f: Authorizer => Unit) { + val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled) + val authorizerProperties = + if (opts.options.has(opts.authorizerPropertiesOpt)) { + val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala + defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala } else { - if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? 
(y/n)")) - removeAcls(authorizer, acls, filter) + defaultProps } + + val authorizerClass = if (opts.options.has(opts.authorizerOpt)) + opts.options.valueOf(opts.authorizerOpt) + else + classOf[SimpleAclAuthorizer].getName + + val authZ = CoreUtils.createObject[Authorizer](authorizerClass) + try { + authZ.configure(authorizerProperties.asJava) + f(authZ) } + finally CoreUtils.swallow(authZ.close(), this) + } + + def addAcls(): Unit = { + val resourceToAcl = getResourceToAcls(opts) + withAuthorizer() { authorizer => + for ((resource, acls) <- resourceToAcl) { + println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + authorizer.addAcls(acls, resource) + } - listAcl(opts) + listAcls() + } } - } - private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter) { - getAcls(authorizer, filter) - .keys - .foreach(resource => - if (acls.isEmpty) authorizer.removeAcls(resource) - else authorizer.removeAcls(acls, resource) - ) - } + def removeAcls(): Unit = { + withAuthorizer() { authorizer => + val filterToAcl = getResourceFilterToAcls(opts) + + for ((filter, acls) <- filterToAcl) { + if (acls.isEmpty) { + if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)")) + removeAcls(authorizer, acls, filter) + } else { + if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? 
(y/n)")) + removeAcls(authorizer, acls, filter) + } + } - private def listAcl(opts: AclCommandOptions) { - withAuthorizer(opts) { authorizer => - val filters = getResourceFilter(opts, dieIfNoResourceFound = false) + listAcls() + } + } + + def listAcls(): Unit = { + withAuthorizer() { authorizer => + val filters = getResourceFilter(opts, dieIfNoResourceFound = false) + + val resourceToAcls: Iterable[(Resource, Set[Acl])] = + if (filters.isEmpty) authorizer.getAcls() + else filters.flatMap(filter => getAcls(authorizer, filter)) - val resourceToAcls: Iterable[(Resource, Set[Acl])] = - if (filters.isEmpty) authorizer.getAcls() - else filters.flatMap(filter => getAcls(authorizer, filter)) + for ((resource, acls) <- resourceToAcls) + println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + } + } - for ((resource, acls) <- resourceToAcls) - println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter) { + getAcls(authorizer, filter) + .keys + .foreach(resource => + if (acls.isEmpty) authorizer.removeAcls(resource) + else authorizer.removeAcls(acls, resource) + ) } + + private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] = + authorizer.getAcls() + .filter { case (resource, acl) => filter.matches(resource.toPattern) } } - private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] = - authorizer.getAcls() - .filter { case (resource, acl) => filter.matches(resource.toPattern) } + private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { + val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType) + if (!patternType.isSpecific) + CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.") + + val resourceToAcl = getResourceFilterToAcls(opts).map { + case (filter, acls) => + Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType()) -> acls + } + + if (resourceToAcl.values.exists(_.isEmpty)) + CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.") + + resourceToAcl + } private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = { var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]] @@ -257,7 +381,7 @@ object AclCommand extends Logging { private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = { if (opts.options.has(principalOptionSpec)) - opts.options.valuesOf(principalOptionSpec).asScala.map(s => KafkaPrincipal.fromString(s.trim)).toSet + opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet else Set.empty[KafkaPrincipal] } @@ -305,11 +429,23 @@ object AclCommand extends Logging { class AclCommandOptions(args: Array[String]) { val parser = new OptionParser(false) + val CommandConfigDoc = "A property file containing configs to be passed to Admin Client." + + val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." + + " This list should be in the form host1:port1,host2:port2,... 
This config is required for acl management using admin client API.") + .withRequiredArg + .describedAs("server to connect to") + .ofType(classOf[String]) + + val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) + .withOptionalArg() + .describedAs("command-config") + .ofType(classOf[String]) + val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.") .withRequiredArg .describedAs("authorizer") .ofType(classOf[String]) - .defaultsTo(classOf[SimpleAclAuthorizer].getName) val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "REQUIRED: properties required to configure an instance of Authorizer. " + "These are key=val pairs. For the default authorizer the example values are: zookeeper.connect=localhost:2181") @@ -410,7 +546,17 @@ object AclCommand extends Logging { val options = parser.parse(args: _*) def checkArgs() { - CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt) + if (options.has(bootstrapServerOpt) && options.has(authorizerOpt)) + CommandLineUtils.printUsageAndDie(parser, "Only one of --bootstrap-server or --authorizer must be specified") + + if (!options.has(bootstrapServerOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt) + + if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt)) + CommandLineUtils.printUsageAndDie(parser, "The --command-config option can only be used with --bootstrap-server option") + + if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt)) + CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties option can only be used with --authorizer option") val actions = Seq(addOpt, removeOpt, listOpt).count(options.has) if (actions != 1) diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala index 5d42871f66f..311e195795d 100644 --- 
a/core/src/main/scala/kafka/security/SecurityUtils.scala +++ b/core/src/main/scala/kafka/security/SecurityUtils.scala @@ -22,8 +22,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFi import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ApiError import org.apache.kafka.common.resource.ResourcePattern -import org.apache.kafka.common.security.auth.KafkaPrincipal - +import org.apache.kafka.common.utils.SecurityUtils._ import scala.util.{Failure, Success, Try} @@ -32,7 +31,7 @@ object SecurityUtils { def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = { (for { resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType)) - principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal)) + principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal)) operation <- Try(Operation.fromJava(filter.entryFilter.operation)) permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType)) resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType) diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 05f61897e6a..d5535a50ab1 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -20,20 +20,24 @@ import java.util.Properties import kafka.admin.AclCommand.AclCommandOptions import kafka.security.auth._ -import kafka.server.KafkaConfig +import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{Exit, Logging, TestUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.resource.PatternType +import org.apache.kafka.common.network.ListenerName + import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.junit.{Before, Test} 
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.SecurityUtils +import org.junit.{After, Before, Test} class AclCommandTest extends ZooKeeperTestHarness with Logging { - private val principal: KafkaPrincipal = KafkaPrincipal.fromString("User:test2") - private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"), - principal, - KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')""")) + var servers: Seq[KafkaServer] = Seq() + + private val principal: KafkaPrincipal = SecurityUtils.parseKafkaPrincipal("User:test2") + private val Users = Set(SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"), + principal, SecurityUtils.parseKafkaPrincipal("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')""")) private val Hosts = Set("host1", "host2") private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2") private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2") @@ -87,6 +91,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private var brokerProps: Properties = _ private var zkArgs: Array[String] = _ + private var adminArgs: Array[String] = _ @Before override def setUp(): Unit = { @@ -94,33 +99,66 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { brokerProps = TestUtils.createBrokerConfig(0, zkConnect) brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer") + brokerProps.put(SimpleAclAuthorizer.SuperUsersProp, "User:ANONYMOUS") zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect) } + @After + override def tearDown() { + TestUtils.shutdownServers(servers) + super.tearDown() + } + @Test - def testAclCli() { + def testAclCliWithAuthorizer(): Unit = { + testAclCli(zkArgs) + } + + @Test + 
def testAclCliWithAdminAPI(): Unit = { + createServer() + testAclCli(adminArgs) + } + + private def createServer(): Unit = { + servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps))) + val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName)) + } + + private def testAclCli(cmdArgs: Array[String]) { for ((resources, resourceCmd) <- ResourceToCommand) { for (permissionType <- PermissionType.values) { val operationToCmd = ResourceToOperations(resources) val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1) - AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add") + AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add") for (resource <- resources) { withAuthorizer() { authorizer => TestUtils.waitAndVerifyAcls(acls, authorizer, resource) } } - testRemove(resources, resourceCmd, brokerProps) + testRemove(cmdArgs, resources, resourceCmd) } } } @Test - def testProducerConsumerCli() { + def testProducerConsumerCliWithAuthorizer(): Unit = { + testProducerConsumerCli(zkArgs) + } + + @Test + def testProducerConsumerCliWithAdminAPI(): Unit = { + createServer() + testProducerConsumerCli(adminArgs) + } + + private def testProducerConsumerCli(cmdArgs: Array[String]) { for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) { val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _) - AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add") + AclCommand.main(cmdArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add") for ((resources, acls) <- resourcesToAcls) { for (resource <- resources) { withAuthorizer() { authorizer => @@ -128,15 +166,25 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { } } } - testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, brokerProps) + 
testRemove(cmdArgs, resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd) } } @Test - def testAclsOnPrefixedResources(): Unit = { + def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = { + testAclsOnPrefixedResources(zkArgs) + } + + @Test + def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = { + createServer() + testAclsOnPrefixedResources(adminArgs) + } + + private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = { val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-pattern-type", "Prefixed") - AclCommand.main(zkArgs ++ cmd :+ "--add") + AclCommand.main(cmdArgs ++ cmd :+ "--add") withAuthorizer() { authorizer => val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write) @@ -145,7 +193,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", PREFIXED)) } - AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force") + AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force") withAuthorizer() { authorizer => TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", LITERAL)) @@ -156,7 +204,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @Test(expected = classOf[IllegalArgumentException]) def testInvalidAuthorizerProperty() { val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect) - AclCommand.withAuthorizer(new AclCommandOptions(args))(null) + val aclCommandService = new AclCommand.AuthorizerService(new AclCommandOptions(args)) + aclCommandService.listAcls() } @Test @@ -188,9 +237,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { } } - private def testRemove(resources: Set[Resource], resourceCmd: Array[String], brokerProps: Properties) { + private def testRemove(cmdArgs: Array[String], resources: Set[Resource], resourceCmd: Array[String]) { for (resource <- resources) { - 
AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force") + AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force") withAuthorizer() { authorizer => TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource) } @@ -208,7 +257,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString)) } - def withAuthorizer()(f: Authorizer => Unit) { + private def withAuthorizer()(f: Authorizer => Unit) { val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false) val authZ = new SimpleAclAuthorizer try { diff --git a/docs/security.html b/docs/security.html index d7859e08d72..e856a7e1686 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1075,6 +1075,18 @@ <h4><a id="security_authz_cli" href="#security_authz_cli">Command Line Interface <td></td> <td>Configuration</td> </tr> + <tr> + <td>--bootstrap-server</td> + <td>A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified.</td> + <td></td> + <td>Configuration</td> + </tr> + <tr> + <td>--command-config</td> + <td>A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option.</td> + <td></td> + <td>Configuration</td> + </tr> <tr> <td>--cluster</td> <td>Indicates to the script that the user is trying to interact with acls on the singular cluster resource.</td> @@ -1199,7 +1211,17 @@ <h4><a id="security_authz_examples" href="#security_authz_examples">Examples</a> <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre> Note that for consumer option we must also specify the consumer group. In order to remove a principal from producer or consumer role we just need to pass --remove option. 
</li> - </ul> + + <li><b>AdminClient API based acl management</b><br> + Users having Alter permission on ClusterResource can use AdminClient API for ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly. + All the above examples can be executed by using <b>--bootstrap-server</b> option. For example: + + <pre class="brush: bash;"> + bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic + bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 + bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic</pre></li> + + </ul> <h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 Incorporating Security Features in a Running Cluster</a></h3> You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases: ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow AclCommand to use AdminClient API > --------------------------------------- > > Key: KAFKA-7117 > URL: https://issues.apache.org/jira/browse/KAFKA-7117 > Project: Kafka > Issue Type: Improvement > Reporter: Manikumar > Assignee: Manikumar > Priority: Major > Labels: needs-kip > Fix For: 2.1.0 > > > Currently AclCommand (kafka-acls.sh) uses authorizer class (default > SimpleAclAuthorizer) to manage acls. > We should also allow AclCommand to support AdminClient API based acl > management. 
This will allow kafka-acls.sh script users to manage acls without > interacting with zookeeper/authorizer directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)