[ 
https://issues.apache.org/jira/browse/KAFKA-7117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607821#comment-16607821
 ] 

ASF GitHub Bot commented on KAFKA-7117:
---------------------------------------

junrao closed pull request #5463: KAFKA-7117: Support AdminClient API in 
AclCommand (KIP-332)
URL: https://github.com/apache/kafka/pull/5463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
index 31e6c53dc11..c2dda33d5ab 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -17,17 +17,22 @@
 
 package kafka.admin
 
+import java.util.Properties
+
 import joptsimple._
 import joptsimple.util.EnumConverter
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => 
JAdminClient}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter, 
Resource => JResource, ResourceType => JResourceType}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 object AclCommand extends Logging {
 
@@ -52,13 +57,21 @@ object AclCommand extends Logging {
 
     opts.checkArgs()
 
+    val aclCommandService = {
+      if (opts.options.has(opts.bootstrapServerOpt)) {
+        new AdminClientService(opts)
+      } else {
+        new AuthorizerService(opts)
+      }
+    }
+
     try {
       if (opts.options.has(opts.addOpt))
-        addAcl(opts)
+        aclCommandService.addAcls()
       else if (opts.options.has(opts.removeOpt))
-        removeAcl(opts)
+        aclCommandService.removeAcls()
       else if (opts.options.has(opts.listOpt))
-        listAcl(opts)
+        aclCommandService.listAcls()
     } catch {
       case e: Throwable =>
         println(s"Error while executing ACL command: ${e.getMessage}")
@@ -67,91 +80,202 @@ object AclCommand extends Logging {
     }
   }
 
-  def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
-    val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> 
JaasUtils.isZkSecurityEnabled)
-    val authorizerProperties =
-      if (opts.options.has(opts.authorizerPropertiesOpt)) {
-        val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
-        defaultProps ++ 
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = 
false).asScala
-      } else {
-        defaultProps
+  sealed trait AclCommandService {
+    def addAcls(): Unit
+    def removeAcls(): Unit
+    def listAcls(): Unit
+  }
+
+  class AdminClientService(val opts: AclCommandOptions) extends 
AclCommandService with Logging {
+
+    private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient => 
Unit) {
+      val props = if (opts.options.has(opts.commandConfigOpt))
+        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+      else
+        new Properties()
+      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
+      val adminClient = JAdminClient.create(props)
+
+      try {
+        f(adminClient)
+      } finally {
+        adminClient.close()
       }
+    }
 
-    val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
-    val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
-    try {
-      authZ.configure(authorizerProperties.asJava)
-      f(authZ)
+    def addAcls(): Unit = {
+      val resourceToAcl = getResourceToAcls(opts)
+      withAdminClient(opts) { adminClient =>
+        for ((resource, acls) <- resourceToAcl) {
+          val resourcePattern = resource.toPattern
+          println(s"Adding ACLs for resource `$resourcePattern`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+          val aclBindings = acls.map(acl => new AclBinding(resourcePattern, 
getAccessControlEntry(acl))).asJavaCollection
+          adminClient.createAcls(aclBindings).all().get()
+        }
+
+        listAcls()
+      }
     }
-    finally CoreUtils.swallow(authZ.close(), this)
-  }
 
-  private def addAcl(opts: AclCommandOptions) {
-    val patternType: PatternType = 
opts.options.valueOf(opts.resourcePatternType)
-    if (!patternType.isSpecific)
-      CommandLineUtils.printUsageAndDie(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
+    def removeAcls(): Unit = {
+      withAdminClient(opts) { adminClient =>
+        val filterToAcl = getResourceFilterToAcls(opts)
+
+        for ((filter, acls) <- filterToAcl) {
+          if (acls.isEmpty) {
+            if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
+              removeAcls(adminClient, acls, filter)
+          } else {
+            if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
+              removeAcls(adminClient, acls, filter)
+          }
+        }
+
+        listAcls()
+      }
+    }
 
-    withAuthorizer(opts) { authorizer =>
-      val resourceToAcl = getResourceFilterToAcls(opts).map {
-        case (filter, acls) =>
-          Resource(ResourceType.fromJava(filter.resourceType()), 
filter.name(), filter.patternType()) -> acls
+    def listAcls(): Unit = {
+      withAdminClient(opts) { adminClient =>
+        val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val resourceToAcls = getAcls(adminClient, filters)
+
+        for ((resource, acls) <- resourceToAcls)
+          println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
       }
+    }
 
-      if (resourceToAcl.values.exists(_.isEmpty))
-        CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one 
of: --allow-principal, --deny-principal when trying to add ACLs.")
+    private def getAccessControlEntry(acl: Acl): AccessControlEntry = {
+      new AccessControlEntry(acl.principal.toString, acl.host, 
acl.operation.toJava, acl.permissionType.toJava)
+    }
 
-      for ((resource, acls) <- resourceToAcl) {
-        println(s"Adding ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
-        authorizer.addAcls(acls, resource)
+    private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter: 
ResourcePatternFilter): Unit = {
+      if (acls.isEmpty)
+        adminClient.deleteAcls(List(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY)).asJava).all().get()
+      else {
+        val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, 
getAccessControlEntryFilter(acl))).toList.asJava
+        adminClient.deleteAcls(aclBindingFilters).all().get()
       }
+    }
+
+    private def getAccessControlEntryFilter(acl: Acl): 
AccessControlEntryFilter = {
+      new AccessControlEntryFilter(acl.principal.toString, acl.host, 
acl.operation.toJava, acl.permissionType.toJava)
+    }
 
-      listAcl(opts)
+    private def getAcls(adminClient: JAdminClient, filters: 
Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
+      val aclBindings =
+        if (filters.isEmpty) 
adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
+        else {
+          val results = for (filter <- filters) yield {
+            adminClient.describeAcls(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY)).values().get().asScala.toList
+          }
+          results.reduceLeft(_ ++ _)
+        }
+
+      val resourceToAcls = mutable.Map[ResourcePattern, 
Set[AccessControlEntry]]().withDefaultValue(Set())
+
+      aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = 
resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
+      resourceToAcls.toMap
     }
   }
 
-  private def removeAcl(opts: AclCommandOptions) {
-    withAuthorizer(opts) { authorizer =>
-      val filterToAcl = getResourceFilterToAcls(opts)
+  class AuthorizerService(val opts: AclCommandOptions) extends 
AclCommandService with Logging {
 
-      for ((filter, acls) <- filterToAcl) {
-        if (acls.isEmpty) {
-          if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
-            removeAcls(authorizer, acls, filter)
+    private def withAuthorizer()(f: Authorizer => Unit) {
+      val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> 
JaasUtils.isZkSecurityEnabled)
+      val authorizerProperties =
+        if (opts.options.has(opts.authorizerPropertiesOpt)) {
+          val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
+          defaultProps ++ 
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = 
false).asScala
         } else {
-          if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
-            removeAcls(authorizer, acls, filter)
+          defaultProps
         }
+
+      val authorizerClass = if (opts.options.has(opts.authorizerOpt))
+        opts.options.valueOf(opts.authorizerOpt)
+      else
+        classOf[SimpleAclAuthorizer].getName
+
+      val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
+      try {
+        authZ.configure(authorizerProperties.asJava)
+        f(authZ)
       }
+      finally CoreUtils.swallow(authZ.close(), this)
+    }
+
+    def addAcls(): Unit = {
+      val resourceToAcl = getResourceToAcls(opts)
+      withAuthorizer() { authorizer =>
+        for ((resource, acls) <- resourceToAcl) {
+          println(s"Adding ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+          authorizer.addAcls(acls, resource)
+        }
 
-      listAcl(opts)
+        listAcls()
+      }
     }
-  }
 
-  private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: 
ResourcePatternFilter) {
-    getAcls(authorizer, filter)
-      .keys
-      .foreach(resource =>
-        if (acls.isEmpty) authorizer.removeAcls(resource)
-        else authorizer.removeAcls(acls, resource)
-      )
-  }
+    def removeAcls(): Unit = {
+      withAuthorizer() { authorizer =>
+        val filterToAcl = getResourceFilterToAcls(opts)
+
+        for ((filter, acls) <- filterToAcl) {
+          if (acls.isEmpty) {
+            if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
+              removeAcls(authorizer, acls, filter)
+          } else {
+            if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
+              removeAcls(authorizer, acls, filter)
+          }
+        }
 
-  private def listAcl(opts: AclCommandOptions) {
-    withAuthorizer(opts) { authorizer =>
-      val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        listAcls()
+      }
+    }
+
+    def listAcls(): Unit = {
+      withAuthorizer() { authorizer =>
+        val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+
+        val resourceToAcls: Iterable[(Resource, Set[Acl])] =
+          if (filters.isEmpty) authorizer.getAcls()
+          else filters.flatMap(filter => getAcls(authorizer, filter))
 
-      val resourceToAcls: Iterable[(Resource, Set[Acl])] =
-        if (filters.isEmpty) authorizer.getAcls()
-        else filters.flatMap(filter => getAcls(authorizer, filter))
+        for ((resource, acls) <- resourceToAcls)
+          println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+      }
+    }
 
-      for ((resource, acls) <- resourceToAcls)
-        println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+    private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: 
ResourcePatternFilter) {
+      getAcls(authorizer, filter)
+        .keys
+        .foreach(resource =>
+          if (acls.isEmpty) authorizer.removeAcls(resource)
+          else authorizer.removeAcls(acls, resource)
+        )
     }
+
+    private def getAcls(authorizer: Authorizer, filter: 
ResourcePatternFilter): Map[Resource, Set[Acl]] =
+      authorizer.getAcls()
+        .filter { case (resource, acl) => filter.matches(resource.toPattern) }
   }
 
-  private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): 
Map[Resource, Set[Acl]] =
-    authorizer.getAcls()
-      .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+  private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, 
Set[Acl]] = {
+    val patternType: PatternType = 
opts.options.valueOf(opts.resourcePatternType)
+    if (!patternType.isSpecific)
+      CommandLineUtils.printUsageAndDie(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
+
+    val resourceToAcl = getResourceFilterToAcls(opts).map {
+      case (filter, acls) =>
+        Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), 
filter.patternType()) -> acls
+    }
+
+    if (resourceToAcl.values.exists(_.isEmpty))
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: 
--allow-principal, --deny-principal when trying to add ACLs.")
+
+    resourceToAcl
+  }
 
   private def getResourceFilterToAcls(opts: AclCommandOptions): 
Map[ResourcePatternFilter, Set[Acl]] = {
     var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]]
@@ -257,7 +381,7 @@ object AclCommand extends Logging {
 
   private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: 
ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
     if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
KafkaPrincipal.fromString(s.trim)).toSet
+      opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
     else
       Set.empty[KafkaPrincipal]
   }
@@ -305,11 +429,23 @@ object AclCommand extends Logging {
 
   class AclCommandOptions(args: Array[String]) {
     val parser = new OptionParser(false)
+    val CommandConfigDoc = "A property file containing configs to be passed to 
Admin Client."
+
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of 
host/port pairs to use for establishing the connection to the Kafka cluster." +
+      " This list should be in the form host1:port1,host2:port2,... This 
config is required for acl management using admin client API.")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+      .withOptionalArg()
+      .describedAs("command-config")
+      .ofType(classOf[String])
+
     val authorizerOpt = parser.accepts("authorizer", "Fully qualified class 
name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
       .withRequiredArg
       .describedAs("authorizer")
       .ofType(classOf[String])
-      .defaultsTo(classOf[SimpleAclAuthorizer].getName)
 
     val authorizerPropertiesOpt = parser.accepts("authorizer-properties", 
"REQUIRED: properties required to configure an instance of Authorizer. " +
       "These are key=val pairs. For the default authorizer the example values 
are: zookeeper.connect=localhost:2181")
@@ -410,7 +546,17 @@ object AclCommand extends Logging {
     val options = parser.parse(args: _*)
 
     def checkArgs() {
-      CommandLineUtils.checkRequiredArgs(parser, options, 
authorizerPropertiesOpt)
+      if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Only one of 
--bootstrap-server or --authorizer must be specified")
+
+      if (!options.has(bootstrapServerOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, 
authorizerPropertiesOpt)
+
+      if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --command-config option 
can only be used with --bootstrap-server option")
+
+      if (options.has(authorizerPropertiesOpt) && 
options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties 
option can only be used with --authorizer option")
 
       val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
       if (actions != 1)
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala 
b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 5d42871f66f..311e195795d 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -22,8 +22,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, 
AclBinding, AclBindingFi
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
+import org.apache.kafka.common.utils.SecurityUtils._
 import scala.util.{Failure, Success, Try}
 
 
@@ -32,7 +31,7 @@ object SecurityUtils {
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, 
(Resource, Acl)] = {
     (for {
       resourceType <- 
Try(ResourceType.fromJava(filter.patternFilter.resourceType))
-      principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+      principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- 
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
       resource = Resource(resourceType, filter.patternFilter.name, 
filter.patternFilter.patternType)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 05f61897e6a..d5535a50ab1 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -20,20 +20,24 @@ import java.util.Properties
 
 import kafka.admin.AclCommand.AclCommandOptions
 import kafka.security.auth._
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.network.ListenerName
+
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{Before, Test}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.{After, Before, Test}
 
 class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
-  private val principal: KafkaPrincipal = 
KafkaPrincipal.fromString("User:test2")
-  private val Users = 
Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
-    principal,
-    KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, 
\+ \" \\ \< \> \; ')"""))
+  var servers: Seq[KafkaServer] = Seq()
+
+  private val principal: KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal("User:test2")
+  private val Users = 
Set(SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+    principal, SecurityUtils.parseKafkaPrincipal("""User:CN=\#User with 
special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
   private val Hosts = Set("host1", "host2")
   private val AllowHostCommand = Array("--allow-host", "host1", 
"--allow-host", "host2")
   private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", 
"host2")
@@ -87,6 +91,7 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 
   private var brokerProps: Properties = _
   private var zkArgs: Array[String] = _
+  private var adminArgs: Array[String] = _
 
   @Before
   override def setUp(): Unit = {
@@ -94,33 +99,66 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 
     brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
     brokerProps.put(KafkaConfig.AuthorizerClassNameProp, 
"kafka.security.auth.SimpleAclAuthorizer")
+    brokerProps.put(SimpleAclAuthorizer.SuperUsersProp, "User:ANONYMOUS")
 
     zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
   }
 
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
-  def testAclCli() {
+  def testAclCliWithAuthorizer(): Unit = {
+    testAclCli(zkArgs)
+  }
+
+  @Test
+  def testAclCliWithAdminAPI(): Unit = {
+    createServer()
+    testAclCli(adminArgs)
+  }
+
+  private def createServer(): Unit = {
+    servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
+    val listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    adminArgs = Array("--bootstrap-server", 
TestUtils.bootstrapServers(servers, listenerName))
+  }
+
+  private def testAclCli(cmdArgs: Array[String]) {
     for ((resources, resourceCmd) <- ResourceToCommand) {
       for (permissionType <- PermissionType.values) {
         val operationToCmd = ResourceToOperations(resources)
         val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-          AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ 
"--add")
+          AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 
:+ "--add")
           for (resource <- resources) {
             withAuthorizer() { authorizer =>
               TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
             }
           }
 
-          testRemove(resources, resourceCmd, brokerProps)
+          testRemove(cmdArgs, resources, resourceCmd)
       }
     }
   }
 
   @Test
-  def testProducerConsumerCli() {
+  def testProducerConsumerCliWithAuthorizer(): Unit = {
+    testProducerConsumerCli(zkArgs)
+  }
+
+  @Test
+  def testProducerConsumerCliWithAdminAPI(): Unit = {
+    createServer()
+    testProducerConsumerCli(adminArgs)
+  }
+
+  private def testProducerConsumerCli(cmdArgs: Array[String]) {
     for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
       val resourceCommand: Array[String] = 
resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
-      AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ 
"--add")
+      AclCommand.main(cmdArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ 
"--add")
       for ((resources, acls) <- resourcesToAcls) {
         for (resource <- resources) {
           withAuthorizer() { authorizer =>
@@ -128,15 +166,25 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
           }
         }
       }
-      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, 
brokerProps)
+      testRemove(cmdArgs, resourcesToAcls.keys.flatten.toSet, resourceCommand 
++ cmd)
     }
   }
 
   @Test
-  def testAclsOnPrefixedResources(): Unit = {
+  def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
+    testAclsOnPrefixedResources(zkArgs)
+  }
+
+  @Test
+  def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
+    createServer()
+    testAclsOnPrefixedResources(adminArgs)
+  }
+
+  private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = {
     val cmd = Array("--allow-principal", principal.toString, "--producer", 
"--topic", "Test-", "--resource-pattern-type", "Prefixed")
 
-    AclCommand.main(zkArgs ++ cmd :+ "--add")
+    AclCommand.main(cmdArgs ++ cmd :+ "--add")
 
     withAuthorizer() { authorizer =>
       val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
@@ -145,7 +193,7 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
       TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), 
authorizer, Resource(Topic, "Test-", PREFIXED))
     }
 
-    AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
+    AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force")
 
     withAuthorizer() { authorizer =>
       TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, 
Resource(Cluster, "kafka-cluster", LITERAL))
@@ -156,7 +204,8 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
   @Test(expected = classOf[IllegalArgumentException])
   def testInvalidAuthorizerProperty() {
     val args = Array("--authorizer-properties", "zookeeper.connect " + 
zkConnect)
-    AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
+    val aclCommandService = new AclCommand.AuthorizerService(new 
AclCommandOptions(args))
+    aclCommandService.listAcls()
   }
 
   @Test
@@ -188,9 +237,9 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
     }
   }
 
-  private def testRemove(resources: Set[Resource], resourceCmd: Array[String], 
brokerProps: Properties) {
+  private def testRemove(cmdArgs: Array[String], resources: Set[Resource], 
resourceCmd: Array[String]) {
     for (resource <- resources) {
-      AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
+      AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
       withAuthorizer() { authorizer =>
         TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
       }
@@ -208,7 +257,7 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
     Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, 
user.toString))
   }
 
-  def withAuthorizer()(f: Authorizer => Unit) {
+  private def withAuthorizer()(f: Authorizer => Unit) {
     val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
     val authZ = new SimpleAclAuthorizer
     try {
diff --git a/docs/security.html b/docs/security.html
index d7859e08d72..e856a7e1686 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1075,6 +1075,18 @@ <h4><a id="security_authz_cli" 
href="#security_authz_cli">Command Line Interface
             <td></td>
             <td>Configuration</td>
         </tr>
+        <tr>
+            <td>--bootstrap-server</td>
+            <td>A list of host/port pairs to use for establishing the 
connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer 
option must be specified.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
+            <td>--command-config</td>
+            <td>A property file containing configs to be passed to Admin 
Client. This option can only be used with --bootstrap-server option.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
         <tr>
             <td>--cluster</td>
             <td>Indicates to the script that the user is trying to interact 
with acls on the singular cluster resource.</td>
@@ -1199,7 +1211,17 @@ <h4><a id="security_authz_examples" 
href="#security_authz_examples">Examples</a>
             <pre class="brush: bash;"> bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
                 Note that for consumer option we must also specify the 
consumer group.
                 In order to remove a principal from producer or consumer role 
we just need to pass --remove option. </li>
-        </ul>
+
+        <li><b>AdminClient API based acl management</b><br>
+            Users having Alter permission on ClusterResource can use 
AdminClient API for ACL management. kafka-acls.sh script supports AdminClient 
API to manage ACLs without interacting with zookeeper/authorizer directly.
+            All the above examples can be executed by using 
<b>--bootstrap-server</b> option. For example:
+
+            <pre class="brush: bash;">
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 
--command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob 
--producer --topic Test-topic
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 
--command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob 
--consumer --topic Test-topic --group Group-1
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 
--command-config /tmp/adminclient-configs.conf --list --topic 
Test-topic</pre></li>
+
+    </ul>
 
     <h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 
Incorporating Security Features in a Running Cluster</a></h3>
         You can secure a running cluster via one or more of the supported 
protocols discussed previously. This is done in phases:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow AclCommand to use AdminClient API
> ---------------------------------------
>
>                 Key: KAFKA-7117
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7117
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Manikumar
>            Assignee: Manikumar
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 2.1.0
>
>
> Currently AclCommand (kafka-acls.sh) uses an authorizer class (default 
> SimpleAclAuthorizer) to manage acls.
> We should also allow AclCommand to support AdminClient API based acl 
> management. This will allow kafka-acls.sh script users to manage acls without 
> interacting with zookeeper/authorizer directly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to