KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)

Includes server-side code, protocol and AdminClient.

Author: Colin P. Mccabe <cmcc...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #2941 from cmccabe/KAFKA-3266


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9815e18f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9815e18f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9815e18f

Branch: refs/heads/trunk
Commit: 9815e18fefa27d8d901356ec1d994a41b4db4622
Parents: 2491520
Author: Colin P. Mccabe <cmcc...@confluent.io>
Authored: Thu May 18 03:20:23 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu May 18 03:20:30 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   2 +
 checkstyle/suppressions.xml                     |   9 +-
 .../kafka/clients/admin/AccessControlEntry.java |  86 ++++++++
 .../clients/admin/AccessControlEntryData.java   | 105 +++++++++
 .../clients/admin/AccessControlEntryFilter.java | 117 ++++++++++
 .../apache/kafka/clients/admin/AclBinding.java  |  74 +++++++
 .../kafka/clients/admin/AclBindingFilter.java   |  89 ++++++++
 .../kafka/clients/admin/AclOperation.java       | 122 ++++++++++
 .../kafka/clients/admin/AclPermissionType.java  |  92 ++++++++
 .../apache/kafka/clients/admin/AdminClient.java |  66 ++++++
 .../kafka/clients/admin/CreateAclsOptions.java  |  34 +++
 .../kafka/clients/admin/CreateAclsResults.java  |  48 ++++
 .../kafka/clients/admin/DeleteAclsOptions.java  |  34 +++
 .../kafka/clients/admin/DeleteAclsResults.java  | 107 +++++++++
 .../clients/admin/DescribeAclsOptions.java      |  34 +++
 .../clients/admin/DescribeAclsResults.java      |  37 ++++
 .../kafka/clients/admin/KafkaAdminClient.java   | 173 ++++++++++++++-
 .../apache/kafka/clients/admin/Resource.java    |  74 +++++++
 .../kafka/clients/admin/ResourceFilter.java     |  90 ++++++++
 .../kafka/clients/admin/ResourceType.java       |  97 ++++++++
 .../errors/SecurityDisabledException.java       |  32 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   5 +-
 .../apache/kafka/common/protocol/Errors.java    |  12 +-
 .../apache/kafka/common/protocol/Protocol.java  |  90 ++++++++
 .../kafka/common/requests/AbstractRequest.java  |   9 +
 .../kafka/common/requests/AbstractResponse.java |   6 +
 .../common/requests/CreateAclsRequest.java      | 133 +++++++++++
 .../common/requests/CreateAclsResponse.java     | 106 +++++++++
 .../common/requests/DeleteAclsRequest.java      | 111 ++++++++++
 .../common/requests/DeleteAclsResponse.java     | 182 +++++++++++++++
 .../common/requests/DescribeAclsRequest.java    |  90 ++++++++
 .../common/requests/DescribeAclsResponse.java   | 128 +++++++++++
 .../kafka/common/requests/RequestUtils.java     |  82 +++++++
 .../kafka/clients/admin/AclBindingTest.java     | 110 +++++++++
 .../kafka/clients/admin/AclOperationTest.java   |  86 ++++++++
 .../clients/admin/AclPermissionTypeTest.java    |  80 +++++++
 .../clients/admin/KafkaAdminClientTest.java     | 163 ++++++++++++++
 .../kafka/clients/admin/ResourceTypeTest.java   |  81 +++++++
 .../common/requests/RequestResponseTest.java    |  80 +++++++
 .../scala/kafka/security/auth/Operation.scala   |  56 ++++-
 .../kafka/security/auth/PermissionType.scala    |  17 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 221 ++++++++++++++++++-
 .../api/KafkaAdminClientIntegrationTest.scala   |  23 +-
 .../api/SaslSslAdminClientIntegrationTest.scala |  87 +++++++-
 .../unit/kafka/server/RequestQuotaTest.scala    |  24 +-
 45 files changed, 3472 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d40c4d4..21d9d3c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -108,8 +108,10 @@
     </subpackage>
 
     <subpackage name="requests">
+      <allow pkg="org.apache.kafka.clients.admin" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
+      <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.common.record" />
       <!-- for testing -->
       <allow pkg="org.apache.kafka.common.errors" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9729ee5..66548d9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,7 +40,7 @@
               files=".*/protocol/Errors.java"/>
 
     <suppress checks="BooleanExpressionComplexity"
-              files="(Utils|KafkaLZ4BlockOutputStream).java"/>
+              files="(Utils|KafkaLZ4BlockOutputStream|AclData).java"/>
 
     <suppress checks="CyclomaticComplexity"
               
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
@@ -48,12 +48,15 @@
     <suppress checks="JavaNCSS"
               files="KerberosLogin.java"/>
 
+    <suppress checks="JavaNCSS"
+              files="AbstractRequest.java"/>
+
     <suppress checks="NPathComplexity"
               
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
-              
files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager)Test.java"/>
+              
files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>
@@ -66,7 +69,7 @@
               files="DistributedHerder.java"/>
 
     <suppress checks="MethodLength"
-              files="KafkaConfigBackingStore.java"/>
+              files="(KafkaConfigBackingStore|RequestResponseTest).java"/>
 
     <suppress checks="ParameterNumber"
               files="WorkerSourceTask.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java
new file mode 100644
index 0000000..0c36a21
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents an access control entry.  ACEs are a tuple of principal, host,
+ * operation, and permissionType.
+ */
+public class AccessControlEntry {
+    final AccessControlEntryData data;
+
+    public AccessControlEntry(String principal, String host, AclOperation 
operation, AclPermissionType permissionType) {
+        Objects.requireNonNull(principal);
+        Objects.requireNonNull(host);
+        Objects.requireNonNull(operation);
+        assert operation != AclOperation.ANY;
+        Objects.requireNonNull(permissionType);
+        assert permissionType != AclPermissionType.ANY;
+        this.data = new AccessControlEntryData(principal, host, operation, 
permissionType);
+    }
+
+    public String principal() {
+        return data.principal();
+    }
+
+    public String host() {
+        return data.host();
+    }
+
+    public AclOperation operation() {
+        return data.operation();
+    }
+
+    public AclPermissionType permissionType() {
+        return data.permissionType();
+    }
+
+    /**
+     * Create a filter which matches only this AccessControlEntry.
+     */
+    public AccessControlEntryFilter toFilter() {
+        return new AccessControlEntryFilter(data);
+    }
+
+    @Override
+    public String toString() {
+        return data.toString();
+    }
+
+    /**
+     * Return true if this AccessControlEntry has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return data.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AccessControlEntry))
+            return false;
+        AccessControlEntry other = (AccessControlEntry) o;
+        return data.equals(other.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return data.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
new file mode 100644
index 0000000..81f57ad
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * An internal, private class which contains the data stored in 
AccessControlEntry and
+ * AccessControlEntryFilter objects.
+ */
+class AccessControlEntryData {
+    private final String principal;
+    private final String host;
+    private final AclOperation operation;
+    private final AclPermissionType permissionType;
+
+    AccessControlEntryData(String principal, String host, AclOperation 
operation, AclPermissionType permissionType) {
+        this.principal = principal;
+        this.host = host;
+        this.operation = operation;
+        this.permissionType = permissionType;
+    }
+
+    String principal() {
+        return principal;
+    }
+
+    String host() {
+        return host;
+    }
+
+    AclOperation operation() {
+        return operation;
+    }
+
+    AclPermissionType permissionType() {
+        return permissionType;
+    }
+
+    /**
+     * Returns a string describing the first null, ANY or UNKNOWN field, or null if
+     * there is no such field.
+     */
+    public String findIndefiniteField() {
+        if (principal() == null)
+            return "Principal is NULL";
+        if (host() == null)
+            return "Host is NULL";
+        if (operation() == AclOperation.ANY)
+            return "Operation is ANY";
+        if (operation() == AclOperation.UNKNOWN)
+            return "Operation is UNKNOWN";
+        if (permissionType() == AclPermissionType.ANY)
+            return "Permission type is ANY";
+        if (permissionType() == AclPermissionType.UNKNOWN)
+            return "Permission type is UNKNOWN";
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return "(principal=" + (principal == null ? "<any>" : principal) +
+               ", host=" + (host == null ? "<any>" : host) +
+               ", operation=" + operation +
+               ", permissionType=" + permissionType + ")";
+    }
+
+    /**
+     * Return true if there are any UNKNOWN components.
+     */
+    boolean unknown() {
+        return operation.unknown() || permissionType.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AccessControlEntryData))
+            return false;
+        AccessControlEntryData other = (AccessControlEntryData) o;
+        return Objects.equals(principal, other.principal) &&
+            Objects.equals(host, other.host) &&
+            Objects.equals(operation, other.operation) &&
+            Objects.equals(permissionType, other.permissionType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(principal, host, operation, permissionType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
new file mode 100644
index 0000000..0ec1027
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a filter which matches access control entries.
+ */
+public class AccessControlEntryFilter {
+    private final AccessControlEntryData data;
+
+    public static final AccessControlEntryFilter ANY =
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, 
AclPermissionType.ANY);
+
+    public AccessControlEntryFilter(String principal, String host, 
AclOperation operation, AclPermissionType permissionType) {
+        Objects.requireNonNull(operation);
+        Objects.requireNonNull(permissionType);
+        this.data = new AccessControlEntryData(principal, host, operation, 
permissionType);
+    }
+
+    /**
+     * This is a non-public constructor used in AccessControlEntry#toFilter
+     *
+     * @param data     The access control data.
+     */
+    AccessControlEntryFilter(AccessControlEntryData data) {
+        this.data = data;
+    }
+
+    public String principal() {
+        return data.principal();
+    }
+
+    public String host() {
+        return data.host();
+    }
+
+    public AclOperation operation() {
+        return data.operation();
+    }
+
+    public AclPermissionType permissionType() {
+        return data.permissionType();
+    }
+
+    @Override
+    public String toString() {
+        return data.toString();
+    }
+
+    /**
+     * Return true if there are any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return data.unknown();
+    }
+
+    /**
+     * Returns true if this filter matches the given AccessControlEntry.
+     */
+    public boolean matches(AccessControlEntry other) {
+        if ((principal() != null) && 
(!data.principal().equals(other.principal())))
+            return false;
+        if ((host() != null) && (!host().equals(other.host())))
+            return false;
+        if ((operation() != AclOperation.ANY) && 
(!operation().equals(other.operation())))
+            return false;
+        if ((permissionType() != AclPermissionType.ANY) && 
(!permissionType().equals(other.permissionType())))
+            return false;
+        return true;
+    }
+
+    /**
+     * Returns true if this filter could only match one ACE-- in other words, 
if
+     * there are no null, ANY or UNKNOWN fields.
+     */
+    public boolean matchesAtMostOne() {
+        return findIndefiniteField() == null;
+    }
+
+    /**
+     * Returns a string describing the first null, ANY or UNKNOWN field, or null if
+     * there is no such field.
+     */
+    public String findIndefiniteField() {
+        return data.findIndefiniteField();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AccessControlEntryFilter))
+            return false;
+        AccessControlEntryFilter other = (AccessControlEntryFilter) o;
+        return data.equals(other.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return data.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java
new file mode 100644
index 0000000..45761b4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a binding between a resource and an access control entry.
+ */
+public class AclBinding {
+    private final Resource resource;
+    private final AccessControlEntry entry;
+
+    public AclBinding(Resource resource, AccessControlEntry entry) {
+        Objects.requireNonNull(resource);
+        this.resource = resource;
+        Objects.requireNonNull(entry);
+        this.entry = entry;
+    }
+
+    /**
+     * Return true if this binding has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resource.unknown() || entry.unknown();
+    }
+
+    public Resource resource() {
+        return resource;
+    }
+
+    public final AccessControlEntry entry() {
+        return entry;
+    }
+
+    /**
+     * Create a filter which matches only this AclBinding.
+     */
+    public AclBindingFilter toFilter() {
+        return new AclBindingFilter(resource.toFilter(), entry.toFilter());
+    }
+
+    @Override
+    public String toString() {
+        return "(resource=" + resource + ", entry=" + entry + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AclBinding))
+            return false;
+        AclBinding other = (AclBinding) o;
+        return resource.equals(other.resource) && entry.equals(other.entry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resource, entry);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java
new file mode 100644
index 0000000..5e4142d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * A filter which can match AclBinding objects.
+ */
+public class AclBindingFilter {
+    private final ResourceFilter resourceFilter;
+    private final AccessControlEntryFilter entryFilter;
+
+    /**
+     * A filter which matches any ACL binding.
+     */
+    public static final AclBindingFilter ANY = new AclBindingFilter(
+        new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, 
AclPermissionType.ANY));
+
+    public AclBindingFilter(ResourceFilter resourceFilter, 
AccessControlEntryFilter entryFilter) {
+        Objects.requireNonNull(resourceFilter);
+        this.resourceFilter = resourceFilter;
+        Objects.requireNonNull(entryFilter);
+        this.entryFilter = entryFilter;
+    }
+
+    /**
+     * Return true if this filter has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resourceFilter.unknown() || entryFilter.unknown();
+    }
+
+    public ResourceFilter resourceFilter() {
+        return resourceFilter;
+    }
+
+    public final AccessControlEntryFilter entryFilter() {
+        return entryFilter;
+    }
+
+    @Override
+    public String toString() {
+        return "(resourceFilter=" + resourceFilter + ", entryFilter=" + 
entryFilter + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AclBindingFilter))
+            return false;
+        AclBindingFilter other = (AclBindingFilter) o;
+        return resourceFilter.equals(other.resourceFilter) && 
entryFilter.equals(other.entryFilter);
+    }
+
+    public boolean matchesAtMostOne() {
+        return resourceFilter.matchesAtMostOne() && 
entryFilter.matchesAtMostOne();
+    }
+
+    public String findIndefiniteField() {
+        String indefinite = resourceFilter.findIndefiniteField();
+        if (indefinite != null)
+            return indefinite;
+        return entryFilter.findIndefiniteField();
+    }
+
+    public boolean matches(AclBinding binding) {
+        return resourceFilter.matches(binding.resource()) && 
entryFilter.matches(binding.entry());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceFilter, entryFilter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
new file mode 100644
index 0000000..14fb61b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents an operation which an ACL grants or denies permission to perform.
+ */
+public enum AclOperation {
+    /**
+     * Represents any AclOperation which this client cannot understand, 
perhaps because this
+     * client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any AclOperation.
+     */
+    ANY((byte) 1),
+
+    /**
+     * ALL operation.
+     */
+    ALL((byte) 2),
+
+    /**
+     * READ operation.
+     */
+    READ((byte) 3),
+
+    /**
+     * WRITE operation.
+     */
+    WRITE((byte) 4),
+
+    /**
+     * CREATE operation.
+     */
+    CREATE((byte) 5),
+
+    /**
+     * DELETE operation.
+     */
+    DELETE((byte) 6),
+
+    /**
+     * ALTER operation.
+     */
+    ALTER((byte) 7),
+
+    /**
+     * DESCRIBE operation.
+     */
+    DESCRIBE((byte) 8),
+
+    /**
+     * CLUSTER_ACTION operation.
+     */
+    CLUSTER_ACTION((byte) 9);
+
+    private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new 
HashMap<>();
+
+    static {
+        for (AclOperation operation : AclOperation.values()) {
+            CODE_TO_VALUE.put(operation.code, operation);
+        }
+    }
+
+    /**
+     * Parse the given string as an ACL operation.
+     *
+     * @param str    The string to parse.
+     *
+     * @return       The AclOperation, or UNKNOWN if the string could not be 
matched.
+     */
+    public static AclOperation fromString(String str) throws 
IllegalArgumentException {
+        try {
+            return AclOperation.valueOf(str.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            return UNKNOWN;
+        }
+    }
+
+    public static AclOperation fromCode(byte code) {
+        AclOperation operation = CODE_TO_VALUE.get(code);
+        if (operation == null) {
+            return UNKNOWN;
+        }
+        return operation;
+    }
+
+    private final byte code;
+
+    AclOperation(byte code) {
+        this.code = code;
+    }
+
+    public byte code() {
+        return code;
+    }
+
+    public boolean unknown() {
+        return this == UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java
new file mode 100644
index 0000000..9181c6b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents whether an ACL grants or denies permissions.
+ */
+public enum AclPermissionType {
+    /**
+     * Represents any AclPermissionType which this client cannot understand,
+     * perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any AclPermissionType.
+     */
+    ANY((byte) 1),
+
+    /**
+     * Disallows access.
+     */
+    DENY((byte) 2),
+
+    /**
+     * Grants access.
+     */
+    ALLOW((byte) 3);
+
+    private final static HashMap<Byte, AclPermissionType> CODE_TO_VALUE = new 
HashMap<>();
+
+    static {
+        for (AclPermissionType permissionType : AclPermissionType.values()) {
+            CODE_TO_VALUE.put(permissionType.code, permissionType);
+        }
+    }
+
+    /**
+    * Parse the given string as an ACL permission.
+    *
+    * @param str    The string to parse.
+    *
+    * @return       The AclPermissionType, or UNKNOWN if the string could not 
be matched.
+    */
+    public static AclPermissionType fromString(String str) {
+        try {
+            return AclPermissionType.valueOf(str.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            return UNKNOWN;
+        }
+    }
+
+    public static AclPermissionType fromCode(byte code) {
+        AclPermissionType permissionType = CODE_TO_VALUE.get(code);
+        if (permissionType == null) {
+            return UNKNOWN;
+        }
+        return permissionType;
+    }
+
+    private final byte code;
+
+    AclPermissionType(byte code) {
+        this.code = code;
+    }
+
+    public byte code() {
+        return code;
+    }
+
+    public boolean unknown() {
+        return this == UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index a976ca4..8bb495c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -194,4 +194,70 @@ public abstract class AdminClient implements AutoCloseable 
{
      * @return                  The ApiVersionsResults.
      */
     public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, 
ApiVersionsOptions options);
+
+    /**
+     * Similar to {@link AdminClient#describeAcls(AclBindingFilter, 
DescribeAclsOptions)},
+     * but uses the default options.
+     *
+     * @param filter            The filter to use.
+     * @return                  The DescribeAclsResults.
+     */
+    public DescribeAclsResults describeAcls(AclBindingFilter filter) {
+        return describeAcls(filter, new DescribeAclsOptions());
+    }
+
+    /**
+     * Lists access control lists (ACLs) according to the supplied filter.
+     *
+     * Note: it may take some time for changes made by createAcls or 
deleteAcls to be reflected
+     * in the output of describeAcls.
+     *
+     * @param filter            The filter to use.
+     * @param options           The options to use when listing the ACLs.
+     * @return                  The DescribeAclsResults.
+     */
+    public abstract DescribeAclsResults describeAcls(AclBindingFilter filter, 
DescribeAclsOptions options);
+
+    /**
+     * Similar to {@link AdminClient#createAcls(Collection, 
CreateAclsOptions)},
+     * but uses the default options.
+     *
+     * @param acls              The ACLs to create
+     * @return                  The CreateAclsResults.
+     */
+    public CreateAclsResults createAcls(Collection<AclBinding> acls) {
+        return createAcls(acls, new CreateAclsOptions());
+    }
+
+    /**
+     * Creates access control lists (ACLs) which are bound to specific 
resources.
+     *
+     * If you attempt to add an ACL that duplicates an existing ACL, no error 
will be raised, but
+     * no changes will be made.
+     *
+     * @param acls              The ACLs to create
+     * @param options           The options to use when creating the ACLs.
+     * @return                  The CreateAclsResults.
+     */
+    public abstract CreateAclsResults createAcls(Collection<AclBinding> acls, 
CreateAclsOptions options);
+
+    /**
+     * Similar to {@link AdminClient#deleteAcls(Collection, 
DeleteAclsOptions)},
+     * but uses the default options.
+     *
+     * @param filters           The filters to use.
+     * @return                  The DeleteAclsResults.
+     */
+    public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters) {
+        return deleteAcls(filters, new DeleteAclsOptions());
+    }
+
+    /**
+     * Deletes access control lists (ACLs) according to the supplied filters.
+     *
+     * @param filters           The filters to use.
+     * @param options           The options to use when deleting the ACLs.
+     * @return                  The DeleteAclsResults.
+     */
+    public abstract DeleteAclsResults deleteAcls(Collection<AclBindingFilter> 
filters, DeleteAclsOptions options);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
new file mode 100644
index 0000000..adaaf1a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for the createAcls call.
+ */
+public class CreateAclsOptions {
+    private Integer timeoutMs = null;
+
+    public CreateAclsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java
new file mode 100644
index 0000000..6908037
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+import java.util.Map;
+
+/**
+ * The result of the createAcls call.
+ */
+public class CreateAclsResults {
+    private final Map<AclBinding, KafkaFuture<Void>> futures;
+
+    CreateAclsResults(Map<AclBinding, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from ACL bindings to futures which can be used to check the 
status of
+     * individual ACL creations.
+     */
+    public Map<AclBinding, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the ACL creations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
new file mode 100644
index 0000000..4d06cef
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for the deleteAcls call.
+ */
+public class DeleteAclsOptions {
+    private Integer timeoutMs = null;
+
+    public DeleteAclsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java
new file mode 100644
index 0000000..dfb2e6b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The result of the deleteAcls call.
+ */
+public class DeleteAclsResults {
+    public static class FilterResult {
+        private final AclBinding acl;
+        private final ApiException exception;
+
+        FilterResult(AclBinding acl, ApiException exception) {
+            this.acl = acl;
+            this.exception = exception;
+        }
+
+        public AclBinding acl() {
+            return acl;
+        }
+
+        public ApiException exception() {
+            return exception;
+        }
+    }
+
+    public static class FilterResults {
+        private final List<FilterResult> acls;
+
+        FilterResults(List<FilterResult> acls) {
+            this.acls = acls;
+        }
+
+        public List<FilterResult> acls() {
+            return acls;
+        }
+    }
+
+    private final Map<AclBindingFilter, KafkaFuture<FilterResults>> futures;
+
+    DeleteAclsResults(Map<AclBindingFilter, KafkaFuture<FilterResults>> 
futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from ACL binding filters to futures which can be used to check the 
status of
+     * the deletions performed for each individual filter.
+     */
+    public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the ACL deletions succeed, 
and which contains all the deleted ACLs.
+     * Note that if the filters don't match any ACLs, this is not 
considered an error.
+     */
+    public KafkaFuture<Collection<AclBinding>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+            new KafkaFuture.Function<Void, Collection<AclBinding>>() {
+                @Override
+                public Collection<AclBinding> apply(Void v) {
+                    List<AclBinding> acls = new ArrayList<>();
+                    for (Map.Entry<AclBindingFilter, 
KafkaFuture<FilterResults>> entry : futures.entrySet()) {
+                        FilterResults results;
+                        try {
+                            results = entry.getValue().get();
+                        } catch (Throwable e) {
+                            // This should be unreachable, since the future 
returned by KafkaFuture#allOf should
+                            // have failed if any Future failed.
+                            throw new KafkaException("DeleteAclsResults#all: 
internal error", e);
+                        }
+                        for (FilterResult result : results.acls()) {
+                            if (result.exception() != null) {
+                                throw result.exception();
+                            }
+                            acls.add(result.acl());
+                        }
+                    }
+                    return acls;
+                }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
new file mode 100644
index 0000000..3f98304
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for the describeAcls call.
+ */
+public class DescribeAclsOptions {
+    private Integer timeoutMs = null;
+
+    public DescribeAclsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java
new file mode 100644
index 0000000..dea98ab
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+import java.util.Collection;
+
+/**
+ * The result of the describeAcls call.
+ */
+public class DescribeAclsResults {
+    private final KafkaFuture<Collection<AclBinding>> future;
+
+    DescribeAclsResults(KafkaFuture<Collection<AclBinding>> future) {
+        this.future = future;
+    }
+
+    public KafkaFuture<Collection<AclBinding>> all() {
+        return future;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7dde027..9f1b1b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResult;
+import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -34,9 +36,11 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -51,10 +55,20 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest;
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
+import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
+import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.DescribeAclsRequest;
+import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -819,12 +833,12 @@ public class KafkaAdminClient extends AdminClient {
     public CreateTopicResults createTopics(final Collection<NewTopic> 
newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new 
HashMap<>(newTopics.size());
-        for (NewTopic newTopic : newTopics) {
-            topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
-        }
         final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new 
HashMap<>(newTopics.size());
         for (NewTopic newTopic : newTopics) {
-            topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
+            if (topicFutures.get(newTopic.name()) == null) {
+                topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
+                topicsMap.put(newTopic.name(), 
newTopic.convertToTopicDetails());
+            }
         }
         final long now = time.milliseconds();
         runnable.call(new Call("createTopics", calcDeadlineMs(now, 
options.timeoutMs()),
@@ -875,7 +889,9 @@ public class KafkaAdminClient extends AdminClient {
                                            DeleteTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new 
HashMap<>(topicNames.size());
         for (String topicName : topicNames) {
-            topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+            if (topicFutures.get(topicName) == null) {
+                topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+            }
         }
         final long now = time.milliseconds();
         runnable.call(new Call("deleteTopics", calcDeadlineMs(now, 
options.timeoutMs()),
@@ -957,8 +973,12 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeTopicsResults describeTopics(final Collection<String> 
topicNames, DescribeTopicsOptions options) {
         final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
-            topicFutures.put(topicName, new 
KafkaFutureImpl<TopicDescription>());
+            if (topicFutures.get(topicName) == null) {
+                topicFutures.put(topicName, new 
KafkaFutureImpl<TopicDescription>());
+                topicNamesList.add(topicName);
+            }
         }
         final long now = time.milliseconds();
         runnable.call(new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
@@ -966,7 +986,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(new 
ArrayList<>(topicNames));
+                return new MetadataRequest.Builder(topicNamesList);
             }
 
             @Override
@@ -1047,6 +1067,8 @@ public class KafkaAdminClient extends AdminClient {
         final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
         Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
         for (final Node node : nodes) {
+            if (nodeFutures.get(node) != null)
+                continue;
             final KafkaFutureImpl<NodeApiVersions> nodeFuture = new 
KafkaFutureImpl<>();
             nodeFutures.put(node, nodeFuture);
             runnable.call(new Call("apiVersions", deadlineMs, new 
ConstantAdminNodeProvider(node)) {
@@ -1070,4 +1092,141 @@ public class KafkaAdminClient extends AdminClient {
         return new ApiVersionsResults(nodeFutures);
 
     }
+
+    @Override
+    public DescribeAclsResults describeAcls(final AclBindingFilter filter, 
DescribeAclsOptions options) {
+        final long now = time.milliseconds();
+        final KafkaFutureImpl<Collection<AclBinding>> future = new 
KafkaFutureImpl<>();
+        runnable.call(new Call("describeAcls", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeAclsRequest.Builder(filter);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeAclsResponse response = (DescribeAclsResponse) 
abstractResponse;
+                if (response.throwable() != null) {
+                    future.completeExceptionally(response.throwable());
+                } else {
+                    future.complete(response.acls());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        }, now);
+        return new DescribeAclsResults(future);
+    }
+
+    @Override
+    public CreateAclsResults createAcls(Collection<AclBinding> acls, 
CreateAclsOptions options) {
+        final long now = time.milliseconds();
+        final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        final List<AclCreation> aclCreations = new ArrayList<>();
+        for (AclBinding acl : acls) {
+            if (futures.get(acl) == null) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                futures.put(acl, future);
+                String indefinite = acl.toFilter().findIndefiniteField();
+                if (indefinite == null) {
+                    aclCreations.add(new AclCreation(acl));
+                } else {
+                    future.completeExceptionally(new 
InvalidRequestException("Invalid ACL creation: " +
+                        indefinite));
+                }
+            }
+        }
+        runnable.call(new Call("createAcls", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new CreateAclsRequest.Builder(aclCreations);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                CreateAclsResponse response = (CreateAclsResponse) 
abstractResponse;
+                List<AclCreationResponse> responses = 
response.aclCreationResponses();
+                Iterator<AclCreationResponse> iter = responses.iterator();
+                for (AclCreation aclCreation : aclCreations) {
+                    KafkaFutureImpl<Void> future = 
futures.get(aclCreation.acl());
+                    if (!iter.hasNext()) {
+                        future.completeExceptionally(new 
UnknownServerException(
+                            "The broker reported no creation result for the 
given ACL."));
+                    } else {
+                        AclCreationResponse creation = iter.next();
+                        if (creation.throwable() != null) {
+                            future.completeExceptionally(creation.throwable());
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, now);
+        return new CreateAclsResults(new HashMap<AclBinding, 
KafkaFuture<Void>>(futures));
+    }
+
+    @Override
+    public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, 
DeleteAclsOptions options) {
+        final long now = time.milliseconds();
+        final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = 
new HashMap<>();
+        final List<AclBindingFilter> filterList = new ArrayList<>();
+        for (AclBindingFilter filter : filters) {
+            if (futures.get(filter) == null) {
+                filterList.add(filter);
+                futures.put(filter, new KafkaFutureImpl<FilterResults>());
+            }
+        }
+        runnable.call(new Call("deleteAcls", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DeleteAclsRequest.Builder(filterList);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DeleteAclsResponse response = (DeleteAclsResponse) 
abstractResponse;
+                List<AclFilterResponse> responses = response.responses();
+                Iterator<AclFilterResponse> iter = responses.iterator();
+                for (AclBindingFilter filter : filterList) {
+                    KafkaFutureImpl<FilterResults> future = 
futures.get(filter);
+                    if (!iter.hasNext()) {
+                        future.completeExceptionally(new 
UnknownServerException(
+                            "The broker reported no deletion result for the 
given filter."));
+                    } else {
+                        AclFilterResponse deletion = iter.next();
+                        if (deletion.throwable() != null) {
+                            future.completeExceptionally(deletion.throwable());
+                        } else {
+                            List<FilterResult> filterResults = new 
ArrayList<>();
+                            for (AclDeletionResult deletionResult : 
deletion.deletions()) {
+                                filterResults.add(new 
FilterResult(deletionResult.acl(), deletionResult.exception()));
+                            }
+                            future.complete(new FilterResults(filterResults));
+                        }
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, now);
+        return new DeleteAclsResults(new HashMap<AclBindingFilter, 
KafkaFuture<FilterResults>>(futures));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
new file mode 100644
index 0000000..9148aac
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a cluster resource with a tuple of (type, name).
+ *
+ * Both components are mandatory and are fixed at construction time.
+ */
+public class Resource {
+    private final ResourceType resourceType;
+    private final String name;
+
+    /**
+     * Create a resource from its type and name.
+     *
+     * @param resourceType  The type of the resource; must not be null.
+     * @param name          The name of the resource; must not be null.
+     *
+     * @throws NullPointerException if either argument is null.
+     */
+    public Resource(ResourceType resourceType, String name) {
+        this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
+        this.name = Objects.requireNonNull(name, "name");
+    }
+
+    /**
+     * Return the resource type.
+     */
+    public ResourceType resourceType() {
+        return resourceType;
+    }
+
+    /**
+     * Return the resource name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Create a filter which matches only this Resource.
+     */
+    public ResourceFilter toFilter() {
+        return new ResourceFilter(resourceType, name);
+    }
+
+    @Override
+    public String toString() {
+        // The constructor rejects a null name, so no "<any>" placeholder is needed here.
+        return "(resourceType=" + resourceType + ", name=" + name + ")";
+    }
+
+    /**
+     * Return true if this Resource has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resourceType.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Resource))
+            return false;
+        Resource other = (Resource) o;
+        // Both fields are guaranteed non-null, so plain equals() is sufficient.
+        return resourceType.equals(other.resourceType) && name.equals(other.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceType, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
new file mode 100644
index 0000000..6f453b6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * A filter which matches Resource objects.
+ *
+ * A resource type of ANY or a null name acts as a wildcard for that component.
+ */
+public class ResourceFilter {
+    /**
+     * A filter with resource type ANY and no name constraint.
+     */
+    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null);
+
+    private final ResourceType resourceType;
+    private final String name;
+
+    /**
+     * Create a filter from a resource type and an optional name.
+     *
+     * @param resourceType  The resource type to match; must not be null.
+     * @param name          The resource name to match, or null to match any name.
+     */
+    public ResourceFilter(ResourceType resourceType, String name) {
+        this.resourceType = Objects.requireNonNull(resourceType);
+        this.name = name;
+    }
+
+    /**
+     * Return the resource type this filter was created with.
+     */
+    public ResourceType resourceType() {
+        return resourceType;
+    }
+
+    /**
+     * Return the resource name this filter was created with, or null.
+     */
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        String shownName = (name == null) ? "<any>" : name;
+        return "(resourceType=" + resourceType + ", name=" + shownName + ")";
+    }
+
+    /**
+     * Return true if this ResourceFilter has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resourceType.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof ResourceFilter) {
+            ResourceFilter that = (ResourceFilter) o;
+            return resourceType.equals(that.resourceType) && Objects.equals(name, that.name);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceType, name);
+    }
+
+    /**
+     * Return true if the given resource satisfies this filter.
+     *
+     * The name is checked first, then the resource type.
+     */
+    public boolean matches(Resource other) {
+        boolean nameAccepted = (name == null) || name.equals(other.name());
+        if (!nameAccepted)
+            return false;
+        return (resourceType == ResourceType.ANY) || resourceType.equals(other.resourceType());
+    }
+
+    /**
+     * Return true if no component of this filter is indefinite.
+     */
+    public boolean matchesAtMostOne() {
+        String indefinite = findIndefiniteField();
+        return indefinite == null;
+    }
+
+    /**
+     * Return a description of the first indefinite component of this filter,
+     * or null if the type is neither ANY nor UNKNOWN and the name is set.
+     */
+    public String findIndefiniteField() {
+        switch (resourceType) {
+            case ANY:
+                return "Resource type is ANY.";
+            case UNKNOWN:
+                return "Resource type is UNKNOWN.";
+            default:
+                return (name == null) ? "Resource name is NULL." : null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
new file mode 100644
index 0000000..66a91e3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents a type of resource which an ACL can be applied to.
+ *
+ * Each constant carries a one-byte code (see {@link #code()}) which
+ * {@link #fromCode(byte)} maps back to the constant.
+ */
+public enum ResourceType {
+    /**
+     * Represents any ResourceType which this client cannot understand,
+     * perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any ResourceType.
+     */
+    ANY((byte) 1),
+
+    /**
+     * A Kafka topic.
+     */
+    TOPIC((byte) 2),
+
+    /**
+     * A consumer group.
+     */
+    GROUP((byte) 3),
+
+    /**
+     * The cluster as a whole.
+     */
+    CLUSTER((byte) 4);
+
+    private static final HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
+
+    static {
+        for (ResourceType resourceType : ResourceType.values()) {
+            CODE_TO_VALUE.put(resourceType.code, resourceType);
+        }
+    }
+
+    /**
+     * Parse the given string as an ACL resource type.
+     *
+     * The comparison is case-insensitive. A string which names no constant
+     * yields UNKNOWN rather than an IllegalArgumentException.
+     *
+     * @param str    The string to parse; must not be null.
+     *
+     * @return       The ResourceType, or UNKNOWN if the string could not be matched.
+     *
+     * @throws NullPointerException if str is null.
+     */
+    public static ResourceType fromString(String str) {
+        try {
+            return ResourceType.valueOf(str.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            // valueOf() rejects names which match no constant; report those as UNKNOWN.
+            return UNKNOWN;
+        }
+    }
+
+    /**
+     * Return the ResourceType with the provided code, or UNKNOWN if no
+     * constant uses that code.
+     */
+    public static ResourceType fromCode(byte code) {
+        ResourceType resourceType = CODE_TO_VALUE.get(code);
+        return (resourceType == null) ? UNKNOWN : resourceType;
+    }
+
+    private final byte code;
+
+    ResourceType(byte code) {
+        this.code = code;
+    }
+
+    /**
+     * Return the code of this resource type.
+     */
+    public byte code() {
+        return code;
+    }
+
+    /**
+     * Return true if this resource type is UNKNOWN.
+     */
+    public boolean unknown() {
+        return this == UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java
new file mode 100644
index 0000000..25f3f35
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * An error indicating that security is disabled on the broker.
+ */
+public class SecurityDisabledException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message  The message handed to the ApiException constructor.
+     * @param cause    The cause handed to the ApiException constructor.
+     */
+    public SecurityDisabledException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message  The message handed to the ApiException constructor.
+     */
+    public SecurityDisabledException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b98a33e..709d927 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -62,7 +62,10 @@ public enum ApiKeys {
     ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false),
     END_TXN(26, "EndTxn", false),
     WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true),
-    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false);
+    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false),
+    DESCRIBE_ACLS(29, "DescribeAcls", false),
+    CREATE_ACLS(30, "CreateAcls", false),
+    DELETE_ACLS(31, "DeleteAcls", false);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 7780fbe..c15edc1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -62,6 +62,7 @@ import 
org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
@@ -487,9 +488,14 @@ public enum Errors {
         public ApiException build(String message) {
             return new ProducerIdAuthorizationException(message);
         }
-    });
-
-
+    }),
+    SECURITY_DISABLED(55, "Security features are disabled.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new SecurityDisabledException(message);
+            }
+        });
              
     private interface ApiExceptionBuilder {
         ApiException build(String message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 08aef4b..e970eb1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1549,6 +1549,90 @@ public class Protocol {
     public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] 
{TXN_OFFSET_COMMIT_REQUEST_V0};
     public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] 
{TXN_OFFSET_COMMIT_RESPONSE_V0};
 
+    /* DescribeAcls request, version 0: the components of a single ACL filter.
+     * The three string components are nullable; resource type, operation and
+     * permission type are each carried as an INT8 code. */
+    public static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema(
+        new Field("resource_type", INT8, "The filter resource type."),
+        new Field("resource_name", NULLABLE_STRING, "The filter resource 
name."),
+        new Field("principal", NULLABLE_STRING, "The filter principal name."),
+        new Field("host", NULLABLE_STRING, "The filter ip address."),
+        new Field("operation", INT8, "The filter operation type."),
+        new Field("permission_type", INT8, "The filter permission type.")
+    );
+
+    /* one resource (type and name, both non-nullable) together with the
+     * array of ACL entries reported for it */
+    public static final Schema DESCRIBE_ACLS_RESOURCE = new Schema(
+        new Field("resource_type", INT8, "The resource type"),
+        new Field("resource_name", STRING, "The resource name"),
+        new Field("acls", new ArrayOf(new Schema(
+            new Field("principal", STRING, "The ACL principal"),
+            new Field("host", STRING, "The ACL host"),
+            new Field("operation", INT8, "The ACL operation"),
+            new Field("permission_type", INT8, "The ACL permission type")))));
+
+    /* DescribeAcls response, version 0: the field from newThrottleTimeField(),
+     * a top-level error code with a nullable message, then the resources */
+    public static final Schema DESCRIBE_ACLS_RESPONSE_V0 = new Schema(
+        newThrottleTimeField(),
+        new Field("error_code", INT16, "The error code."),
+        new Field("error_message", NULLABLE_STRING, "The error message."),
+        new Field("resources",
+            new ArrayOf(DESCRIBE_ACLS_RESOURCE),
+            "The resources and their associated ACLs."));
+
+    /* schema arrays for DescribeAcls; each currently holds only the V0 schema */
+    public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] 
{DESCRIBE_ACLS_REQUEST_V0};
+    public static final Schema[] DESCRIBE_ACLS_RESPONSE  = new Schema[] 
{DESCRIBE_ACLS_RESPONSE_V0};
+
+    /* CreateAcls request, version 0: an array of ACLs to create. Unlike the
+     * filter schemas, every string component here is a non-nullable STRING. */
+    public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
+        new Field("creations",
+            new ArrayOf(new Schema(
+                new Field("resource_type", INT8, "The resource type."),
+                new Field("resource_name", STRING, "The resource name."),
+                new Field("principal", STRING, "The principal."),
+                new Field("host", STRING, "The ip address."),
+                new Field("operation", INT8, "The ACL operation"),
+                new Field("permission_type", INT8, "The ACL permission type")
+            ))));
+
+    /* CreateAcls response, version 0: the field from newThrottleTimeField()
+     * followed by an array of (error code, nullable error message) entries.
+     * NOTE(review): presumably one entry per requested creation, in request
+     * order - confirm against CreateAclsResponse. */
+    public static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema(
+        newThrottleTimeField(),
+        new Field("creation_responses",
+            new ArrayOf(new Schema(
+                new Field("error_code", INT16, "The error code."),
+                new Field("error_message", NULLABLE_STRING, "The error 
message.")
+            ))));
+
+    /* schema arrays for CreateAcls; each currently holds only the V0 schema */
+    public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] 
{CREATE_ACLS_REQUEST_V0};
+    public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] 
{CREATE_ACLS_RESPONSE_V0};
+
+    /* DeleteAcls request, version 0: an array of ACL filters. Each filter has
+     * the same six components as the DescribeAcls request, with nullable
+     * string components. */
+    public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
+        new Field("filters",
+            new ArrayOf(new Schema(
+                new Field("resource_type", INT8, "The resource type filter."),
+                new Field("resource_name", NULLABLE_STRING, "The resource name 
filter."),
+                new Field("principal", NULLABLE_STRING, "The principal 
filter."),
+                new Field("host", NULLABLE_STRING, "The ip address filter."),
+                new Field("operation", INT8, "The ACL operation filter."),
+                new Field("permission_type", INT8, "The ACL permission type 
filter.")
+            ))));
+
+    /* one ACL matched by a delete filter: its own error code and nullable
+     * message, followed by the fully specified ACL (non-nullable strings) */
+    public static final Schema MATCHING_ACL = new Schema(
+        new Field("error_code", INT16, "The error code."),
+        new Field("error_message", NULLABLE_STRING, "The error message."),
+        new Field("resource_type", INT8, "The resource type."),
+        new Field("resource_name", STRING, "The resource name."),
+        new Field("principal", STRING, "The principal."),
+        new Field("host", STRING, "The ip address."),
+        new Field("operation", INT8, "The ACL operation"),
+        new Field("permission_type", INT8, "The ACL permission type")
+    );
+
+    /* DeleteAcls response, version 0: the field from newThrottleTimeField()
+     * followed by an array of filter responses, each with an error code, a
+     * nullable message and the ACLs that matched.
+     * NOTE(review): presumably one filter response per request filter, in
+     * request order - confirm against DeleteAclsResponse. */
+    public static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
+        newThrottleTimeField(),
+        new Field("filter_responses",
+            new ArrayOf(new Schema(
+                new Field("error_code", INT16, "The error code."),
+                new Field("error_message", NULLABLE_STRING, "The error 
message."),
+                new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The 
matching ACLs")))));
+
+    /* schema arrays for DeleteAcls; each currently holds only the V0 schema */
+    public static final Schema[] DELETE_ACLS_REQUEST = new Schema[] 
{DELETE_ACLS_REQUEST_V0};
+    public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] 
{DELETE_ACLS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null 
value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 
1][];
@@ -1588,6 +1672,9 @@ public class Protocol {
         REQUESTS[ApiKeys.END_TXN.id] = END_TXN_REQUEST;
         REQUESTS[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_REQUEST;
         REQUESTS[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_REQUEST;
+        REQUESTS[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_REQUEST;
+        REQUESTS[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_REQUEST;
+        REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1618,6 +1705,9 @@ public class Protocol {
         RESPONSES[ApiKeys.END_TXN.id] = END_TXN_RESPONSE;
         RESPONSES[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_RESPONSE;
         RESPONSES[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_RESPONSE;
+        RESPONSES[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_RESPONSE;
+        RESPONSES[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_RESPONSE;
+        RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3aeb879..16c0c21 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -199,6 +199,15 @@ public abstract class AbstractRequest extends 
AbstractRequestResponse {
             case TXN_OFFSET_COMMIT:
                 request = new TxnOffsetCommitRequest(struct, version);
                 break;
+            case DESCRIBE_ACLS:
+                request = new DescribeAclsRequest(struct, version);
+                break;
+            case CREATE_ACLS:
+                request = new CreateAclsRequest(struct, version);
+                break;
+            case DELETE_ACLS:
+                request = new DeleteAclsRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 617934c..aee4f5e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -108,6 +108,12 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
                 return new WriteTxnMarkersResponse(struct);
             case TXN_OFFSET_COMMIT:
                 return new TxnOffsetCommitResponse(struct);
+            case DESCRIBE_ACLS:
+                return new DescribeAclsResponse(struct);
+            case CREATE_ACLS:
+                return new CreateAclsResponse(struct);
+            case DELETE_ACLS:
+                return new DeleteAclsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

Reply via email to