KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140) Includes server-side code, protocol and AdminClient.
Author: Colin P. Mccabe <cmcc...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #2941 from cmccabe/KAFKA-3266 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9815e18f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9815e18f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9815e18f Branch: refs/heads/trunk Commit: 9815e18fefa27d8d901356ec1d994a41b4db4622 Parents: 2491520 Author: Colin P. Mccabe <cmcc...@confluent.io> Authored: Thu May 18 03:20:23 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu May 18 03:20:30 2017 +0100 ---------------------------------------------------------------------- checkstyle/import-control.xml | 2 + checkstyle/suppressions.xml | 9 +- .../kafka/clients/admin/AccessControlEntry.java | 86 ++++++++ .../clients/admin/AccessControlEntryData.java | 105 +++++++++ .../clients/admin/AccessControlEntryFilter.java | 117 ++++++++++ .../apache/kafka/clients/admin/AclBinding.java | 74 +++++++ .../kafka/clients/admin/AclBindingFilter.java | 89 ++++++++ .../kafka/clients/admin/AclOperation.java | 122 ++++++++++ .../kafka/clients/admin/AclPermissionType.java | 92 ++++++++ .../apache/kafka/clients/admin/AdminClient.java | 66 ++++++ .../kafka/clients/admin/CreateAclsOptions.java | 34 +++ .../kafka/clients/admin/CreateAclsResults.java | 48 ++++ .../kafka/clients/admin/DeleteAclsOptions.java | 34 +++ .../kafka/clients/admin/DeleteAclsResults.java | 107 +++++++++ .../clients/admin/DescribeAclsOptions.java | 34 +++ .../clients/admin/DescribeAclsResults.java | 37 ++++ .../kafka/clients/admin/KafkaAdminClient.java | 173 ++++++++++++++- .../apache/kafka/clients/admin/Resource.java | 74 +++++++ .../kafka/clients/admin/ResourceFilter.java | 90 ++++++++ .../kafka/clients/admin/ResourceType.java | 97 ++++++++ .../errors/SecurityDisabledException.java | 32 +++ .../apache/kafka/common/protocol/ApiKeys.java | 5 +- 
.../apache/kafka/common/protocol/Errors.java | 12 +- .../apache/kafka/common/protocol/Protocol.java | 90 ++++++++ .../kafka/common/requests/AbstractRequest.java | 9 + .../kafka/common/requests/AbstractResponse.java | 6 + .../common/requests/CreateAclsRequest.java | 133 +++++++++++ .../common/requests/CreateAclsResponse.java | 106 +++++++++ .../common/requests/DeleteAclsRequest.java | 111 ++++++++++ .../common/requests/DeleteAclsResponse.java | 182 +++++++++++++++ .../common/requests/DescribeAclsRequest.java | 90 ++++++++ .../common/requests/DescribeAclsResponse.java | 128 +++++++++++ .../kafka/common/requests/RequestUtils.java | 82 +++++++ .../kafka/clients/admin/AclBindingTest.java | 110 +++++++++ .../kafka/clients/admin/AclOperationTest.java | 86 ++++++++ .../clients/admin/AclPermissionTypeTest.java | 80 +++++++ .../clients/admin/KafkaAdminClientTest.java | 163 ++++++++++++++ .../kafka/clients/admin/ResourceTypeTest.java | 81 +++++++ .../common/requests/RequestResponseTest.java | 80 +++++++ .../scala/kafka/security/auth/Operation.scala | 56 ++++- .../kafka/security/auth/PermissionType.scala | 17 +- .../src/main/scala/kafka/server/KafkaApis.scala | 221 ++++++++++++++++++- .../api/KafkaAdminClientIntegrationTest.scala | 23 +- .../api/SaslSslAdminClientIntegrationTest.scala | 87 +++++++- .../unit/kafka/server/RequestQuotaTest.scala | 24 +- 45 files changed, 3472 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index d40c4d4..21d9d3c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -108,8 +108,10 @@ </subpackage> <subpackage name="requests"> + <allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="org.apache.kafka.common.protocol" /> <allow 
pkg="org.apache.kafka.common.network" /> + <allow pkg="org.apache.kafka.common.requests" /> <allow pkg="org.apache.kafka.common.record" /> <!-- for testing --> <allow pkg="org.apache.kafka.common.errors" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 9729ee5..66548d9 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -40,7 +40,7 @@ files=".*/protocol/Errors.java"/> <suppress checks="BooleanExpressionComplexity" - files="(Utils|KafkaLZ4BlockOutputStream).java"/> + files="(Utils|KafkaLZ4BlockOutputStream|AclData).java"/> <suppress checks="CyclomaticComplexity" files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/> @@ -48,12 +48,15 @@ <suppress checks="JavaNCSS" files="KerberosLogin.java"/> + <suppress checks="JavaNCSS" + files="AbstractRequest.java"/> + <suppress checks="NPathComplexity" files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender).java"/> <!-- clients tests --> <suppress checks="ClassDataAbstractionCoupling" - files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager)Test.java"/> + files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/> <suppress checks="ClassFanOutComplexity" files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/> @@ -66,7 +69,7 @@ files="DistributedHerder.java"/> <suppress checks="MethodLength" - files="KafkaConfigBackingStore.java"/> + files="(KafkaConfigBackingStore|RequestResponseTest).java"/> <suppress checks="ParameterNumber" files="WorkerSourceTask.java"/> 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java new file mode 100644 index 0000000..0c36a21 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents an access control entry. ACEs are a tuple of principal, host, + * operation, and permissionType. 
+ */ +public class AccessControlEntry { + final AccessControlEntryData data; + + public AccessControlEntry(String principal, String host, AclOperation operation, AclPermissionType permissionType) { + Objects.requireNonNull(principal); + Objects.requireNonNull(host); + Objects.requireNonNull(operation); + assert operation != AclOperation.ANY; + Objects.requireNonNull(permissionType); + assert permissionType != AclPermissionType.ANY; + this.data = new AccessControlEntryData(principal, host, operation, permissionType); + } + + public String principal() { + return data.principal(); + } + + public String host() { + return data.host(); + } + + public AclOperation operation() { + return data.operation(); + } + + public AclPermissionType permissionType() { + return data.permissionType(); + } + + /** + * Create a filter which matches only this AccessControlEntry. + */ + public AccessControlEntryFilter toFilter() { + return new AccessControlEntryFilter(data); + } + + @Override + public String toString() { + return data.toString(); + } + + /** + * Return true if this AccessControlEntry has any UNKNOWN components. 
+ */ + public boolean unknown() { + return data.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AccessControlEntry)) + return false; + AccessControlEntry other = (AccessControlEntry) o; + return data.equals(other.data); + } + + @Override + public int hashCode() { + return data.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java new file mode 100644 index 0000000..81f57ad --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * An internal, private class which contains the data stored in AccessControlEntry and + * AccessControlEntryFilter objects. 
+ */ +class AccessControlEntryData { + private final String principal; + private final String host; + private final AclOperation operation; + private final AclPermissionType permissionType; + + AccessControlEntryData(String principal, String host, AclOperation operation, AclPermissionType permissionType) { + this.principal = principal; + this.host = host; + this.operation = operation; + this.permissionType = permissionType; + } + + String principal() { + return principal; + } + + String host() { + return host; + } + + AclOperation operation() { + return operation; + } + + AclPermissionType permissionType() { + return permissionType; + } + + /** + * Returns a string describing an ANY or UNKNOWN field, or null if there is + * no such field. + */ + public String findIndefiniteField() { + if (principal() == null) + return "Principal is NULL"; + if (host() == null) + return "Host is NULL"; + if (operation() == AclOperation.ANY) + return "Operation is ANY"; + if (operation() == AclOperation.UNKNOWN) + return "Operation is UNKNOWN"; + if (permissionType() == AclPermissionType.ANY) + return "Permission type is ANY"; + if (permissionType() == AclPermissionType.UNKNOWN) + return "Permission type is UNKNOWN"; + return null; + } + + @Override + public String toString() { + return "(principal=" + (principal == null ? "<any>" : principal) + + ", host=" + (host == null ? "<any>" : host) + + ", operation=" + operation + + ", permissionType=" + permissionType + ")"; + } + + /** + * Return true if there are any UNKNOWN components. 
+ */ + boolean unknown() { + return operation.unknown() || permissionType.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AccessControlEntryData)) + return false; + AccessControlEntryData other = (AccessControlEntryData) o; + return Objects.equals(principal, other.principal) && + Objects.equals(host, other.host) && + Objects.equals(operation, other.operation) && + Objects.equals(permissionType, other.permissionType); + } + + @Override + public int hashCode() { + return Objects.hash(principal, host, operation, permissionType); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java new file mode 100644 index 0000000..0ec1027 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents a filter which matches access control entries. + */ +public class AccessControlEntryFilter { + private final AccessControlEntryData data; + + public static final AccessControlEntryFilter ANY = + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY); + + public AccessControlEntryFilter(String principal, String host, AclOperation operation, AclPermissionType permissionType) { + Objects.requireNonNull(operation); + Objects.requireNonNull(permissionType); + this.data = new AccessControlEntryData(principal, host, operation, permissionType); + } + + /** + * This is a non-public constructor used in AccessControlEntry#toFilter + * + * @param data The access control data. + */ + AccessControlEntryFilter(AccessControlEntryData data) { + this.data = data; + } + + public String principal() { + return data.principal(); + } + + public String host() { + return data.host(); + } + + public AclOperation operation() { + return data.operation(); + } + + public AclPermissionType permissionType() { + return data.permissionType(); + } + + @Override + public String toString() { + return data.toString(); + } + + /** + * Return true if there are any UNKNOWN components. + */ + public boolean unknown() { + return data.unknown(); + } + + /** + * Returns true if this filter matches the given AccessControlEntry. 
+ */ + public boolean matches(AccessControlEntry other) { + if ((principal() != null) && (!data.principal().equals(other.principal()))) + return false; + if ((host() != null) && (!host().equals(other.host()))) + return false; + if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation()))) + return false; + if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType()))) + return false; + return true; + } + + /** + * Returns true if this filter could only match one ACE-- in other words, if + * there are no ANY or UNKNOWN fields. + */ + public boolean matchesAtMostOne() { + return findIndefiniteField() == null; + } + + /** + * Returns a string describing an ANY or UNKNOWN field, or null if there is + * no such field. + */ + public String findIndefiniteField() { + return data.findIndefiniteField(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AccessControlEntryFilter)) + return false; + AccessControlEntryFilter other = (AccessControlEntryFilter) o; + return data.equals(other.data); + } + + @Override + public int hashCode() { + return data.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java new file mode 100644 index 0000000..45761b4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents a binding between a resource and an access control entry. + */ +public class AclBinding { + private final Resource resource; + private final AccessControlEntry entry; + + public AclBinding(Resource resource, AccessControlEntry entry) { + Objects.requireNonNull(resource); + this.resource = resource; + Objects.requireNonNull(entry); + this.entry = entry; + } + + /** + * Return true if this binding has any UNKNOWN components. + */ + public boolean unknown() { + return resource.unknown() || entry.unknown(); + } + + public Resource resource() { + return resource; + } + + public final AccessControlEntry entry() { + return entry; + } + + /** + * Create a filter which matches only this AclBinding. 
+ */ + public AclBindingFilter toFilter() { + return new AclBindingFilter(resource.toFilter(), entry.toFilter()); + } + + @Override + public String toString() { + return "(resource=" + resource + ", entry=" + entry + ")"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AclBinding)) + return false; + AclBinding other = (AclBinding) o; + return resource.equals(other.resource) && entry.equals(other.entry); + } + + @Override + public int hashCode() { + return Objects.hash(resource, entry); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java new file mode 100644 index 0000000..5e4142d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * A filter which can match AclBinding objects. 
+ */ +public class AclBindingFilter { + private final ResourceFilter resourceFilter; + private final AccessControlEntryFilter entryFilter; + + /** + * A filter which matches any ACL binding. + */ + public static final AclBindingFilter ANY = new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); + + public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) { + Objects.requireNonNull(resourceFilter); + this.resourceFilter = resourceFilter; + Objects.requireNonNull(entryFilter); + this.entryFilter = entryFilter; + } + + /** + * Return true if this filter has any UNKNOWN components. + */ + public boolean unknown() { + return resourceFilter.unknown() || entryFilter.unknown(); + } + + public ResourceFilter resourceFilter() { + return resourceFilter; + } + + public final AccessControlEntryFilter entryFilter() { + return entryFilter; + } + + @Override + public String toString() { + return "(resourceFilter=" + resourceFilter + ", entryFilter=" + entryFilter + ")"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AclBindingFilter)) + return false; + AclBindingFilter other = (AclBindingFilter) o; + return resourceFilter.equals(other.resourceFilter) && entryFilter.equals(other.entryFilter); + } + + public boolean matchesAtMostOne() { + return resourceFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne(); + } + + public String findIndefiniteField() { + String indefinite = resourceFilter.findIndefiniteField(); + if (indefinite != null) + return indefinite; + return entryFilter.findIndefiniteField(); + } + + public boolean matches(AclBinding binding) { + return resourceFilter.matches(binding.resource()) && entryFilter.matches(binding.entry()); + } + + @Override + public int hashCode() { + return Objects.hash(resourceFilter, entryFilter); + } +} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java new file mode 100644 index 0000000..14fb61b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Represents an operation which an ACL grants or denies permission to perform. + */ +public enum AclOperation { + /** + * Represents any AclOperation which this client cannot understand, perhaps because this + * client is too old. + */ + UNKNOWN((byte) 0), + + /** + * In a filter, matches any AclOperation. + */ + ANY((byte) 1), + + /** + * ALL operation. + */ + ALL((byte) 2), + + /** + * READ operation. + */ + READ((byte) 3), + + /** + * WRITE operation. + */ + WRITE((byte) 4), + + /** + * CREATE operation. + */ + CREATE((byte) 5), + + /** + * DELETE operation. 
+ */ + DELETE((byte) 6), + + /** + * ALTER operation. + */ + ALTER((byte) 7), + + /** + * DESCRIBE operation. + */ + DESCRIBE((byte) 8), + + /** + * CLUSTER_ACTION operation. + */ + CLUSTER_ACTION((byte) 9); + + private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>(); + + static { + for (AclOperation operation : AclOperation.values()) { + CODE_TO_VALUE.put(operation.code, operation); + } + } + + /** + * Parse the given string as an ACL operation. + * + * @param str The string to parse. + * + * @return The AclOperation, or UNKNOWN if the string could not be matched. + */ + public static AclOperation fromString(String str) throws IllegalArgumentException { + try { + return AclOperation.valueOf(str.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return UNKNOWN; + } + } + + public static AclOperation fromCode(byte code) { + AclOperation operation = CODE_TO_VALUE.get(code); + if (operation == null) { + return UNKNOWN; + } + return operation; + } + + private final byte code; + + AclOperation(byte code) { + this.code = code; + } + + public byte code() { + return code; + } + + public boolean unknown() { + return this == UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java new file mode 100644 index 0000000..9181c6b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Represents whether an ACL grants or denies permissions. + */ +public enum AclPermissionType { + /** + * Represents any AclPermissionType which this client cannot understand, + * perhaps because this client is too old. + */ + UNKNOWN((byte) 0), + + /** + * In a filter, matches any AclPermissionType. + */ + ANY((byte) 1), + + /** + * Disallows access. + */ + DENY((byte) 2), + + /** + * Grants access. + */ + ALLOW((byte) 3); + + private final static HashMap<Byte, AclPermissionType> CODE_TO_VALUE = new HashMap<>(); + + static { + for (AclPermissionType permissionType : AclPermissionType.values()) { + CODE_TO_VALUE.put(permissionType.code, permissionType); + } + } + + /** + * Parse the given string as an ACL permission. + * + * @param str The string to parse. + * + * @return The AclPermissionType, or UNKNOWN if the string could not be matched. 
+ */ + public static AclPermissionType fromString(String str) { + try { + return AclPermissionType.valueOf(str.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return UNKNOWN; + } + } + + public static AclPermissionType fromCode(byte code) { + AclPermissionType permissionType = CODE_TO_VALUE.get(code); + if (permissionType == null) { + return UNKNOWN; + } + return permissionType; + } + + private final byte code; + + AclPermissionType(byte code) { + this.code = code; + } + + public byte code() { + return code; + } + + public boolean unknown() { + return this == UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index a976ca4..8bb495c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -194,4 +194,70 @@ public abstract class AdminClient implements AutoCloseable { * @return The ApiVersionsResults. */ public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options); + + /** + * Similar to {@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions)}, + * but uses the default options. + * + * @param filter The filter to use. + * @return The DescribeAclsResults. + */ + public DescribeAclsResults describeAcls(AclBindingFilter filter) { + return describeAcls(filter, new DescribeAclsOptions()); + } + + /** + * Lists access control lists (ACLs) according to the supplied filter. + * + * Note: it may take some time for changes made by createAcls or deleteAcls to be reflected + * in the output of describeAcls. + * + * @param filter The filter to use. 
+ * @param options The options to use when listing the ACLs. + * @return The DescribeAclsResults. + */ + public abstract DescribeAclsResults describeAcls(AclBindingFilter filter, DescribeAclsOptions options); + + /** + * Similar to {@link AdminClient#createAcls(Collection, CreateAclsOptions)}, + * but uses the default options. + * + * @param acls The ACLs to create + * @return The CreateAclsResults. + */ + public CreateAclsResults createAcls(Collection<AclBinding> acls) { + return createAcls(acls, new CreateAclsOptions()); + } + + /** + * Creates access control lists (ACLs) which are bound to specific resources. + * + * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but + * no changes will be made. + * + * @param acls The ACLs to create + * @param options The options to use when creating the ACLs. + * @return The CreateAclsResults. + */ + public abstract CreateAclsResults createAcls(Collection<AclBinding> acls, CreateAclsOptions options); + + /** + * Similar to {@link AdminClient#deleteAcls(Collection, DeleteAclsOptions)}, + * but uses the default options. + * + * @param filters The filters to use. + * @return The DeleteAclsResults. + */ + public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters) { + return deleteAcls(filters, new DeleteAclsOptions()); + } + + /** + * Deletes access control lists (ACLs) according to the supplied filters. + * + * @param filters The filters to use. + * @param options The options to use when deleting the ACLs. + * @return The DeleteAclsResults. 
+ */ + public abstract DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java new file mode 100644 index 0000000..adaaf1a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * Options for the createAcls call. 
+ */ +public class CreateAclsOptions { + private Integer timeoutMs = null; + + public CreateAclsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; + return this; + } + + public Integer timeoutMs() { + return timeoutMs; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java new file mode 100644 index 0000000..6908037 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; + +import java.util.Map; + +/** + * The result of the createAcls call. 
+ */ +public class CreateAclsResults { + private final Map<AclBinding, KafkaFuture<Void>> futures; + + CreateAclsResults(Map<AclBinding, KafkaFuture<Void>> futures) { + this.futures = futures; + } + + /** + * Return a map from ACL bindings to futures which can be used to check the status of + * individual creations. + */ + public Map<AclBinding, KafkaFuture<Void>> results() { + return futures; + } + + /** + * Return a future which succeeds only if all the ACL creations succeed. + */ + public KafkaFuture<Void> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java new file mode 100644 index 0000000..4d06cef --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.clients.admin; + +/** + * Options for the deleteAcls call. + */ +public class DeleteAclsOptions { + private Integer timeoutMs = null; + + public DeleteAclsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; + return this; + } + + public Integer timeoutMs() { + return timeoutMs; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java new file mode 100644 index 0000000..dfb2e6b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.ApiException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * The result of the deleteAcls call. + */ +public class DeleteAclsResults { + public static class FilterResult { + private final AclBinding acl; + private final ApiException exception; + + FilterResult(AclBinding acl, ApiException exception) { + this.acl = acl; + this.exception = exception; + } + + public AclBinding acl() { + return acl; + } + + public ApiException exception() { + return exception; + } + } + + public static class FilterResults { + private final List<FilterResult> acls; + + FilterResults(List<FilterResult> acls) { + this.acls = acls; + } + + public List<FilterResult> acls() { + return acls; + } + } + + private final Map<AclBindingFilter, KafkaFuture<FilterResults>> futures; + + DeleteAclsResults(Map<AclBindingFilter, KafkaFuture<FilterResults>> futures) { + this.futures = futures; + } + + /** + * Return a map from ACL binding filters to futures which can be used to check the status of + * individual deletions. + */ + public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() { + return futures; + } + + /** + * Return a future which succeeds only if all the ACL deletions succeed, and which contains all the deleted ACLs. + * Note that if the filters don't match any ACLs, this is not considered an error.
+ */ + public KafkaFuture<Collection<AclBinding>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + new KafkaFuture.Function<Void, Collection<AclBinding>>() { + @Override + public Collection<AclBinding> apply(Void v) { + List<AclBinding> acls = new ArrayList<>(); + for (Map.Entry<AclBindingFilter, KafkaFuture<FilterResults>> entry : futures.entrySet()) { + FilterResults results; + try { + results = entry.getValue().get(); + } catch (Throwable e) { + // This should be unreachable, since the future returned by KafkaFuture#allOf should + // have failed if any Future failed. + throw new KafkaException("DeleteAclsResults#all: internal error", e); + } + for (FilterResult result : results.acls()) { + if (result.exception() != null) { + throw result.exception(); + } + acls.add(result.acl()); + } + } + return acls; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java new file mode 100644 index 0000000..3f98304 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * Options for the describeAcls call. + */ +public class DescribeAclsOptions { + private Integer timeoutMs = null; + + public DescribeAclsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; + return this; + } + + public Integer timeoutMs() { + return timeoutMs; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java new file mode 100644 index 0000000..dea98ab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; + +import java.util.Collection; + +/** + * The result of the describeAcls call. + */ +public class DescribeAclsResults { + private final KafkaFuture<Collection<AclBinding>> future; + + DescribeAclsResults(KafkaFuture<Collection<AclBinding>> future) { + this.future = future; + } + + public KafkaFuture<Collection<AclBinding>> all() { + return future; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7dde027..9f1b1b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -25,6 +25,8 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResult; +import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -34,9 +36,11 @@ import org.apache.kafka.common.annotation.InterfaceStability; import 
org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.JmxReporter; @@ -51,10 +55,20 @@ import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.CreateAclsRequest; +import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; +import org.apache.kafka.common.requests.CreateAclsResponse; +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; +import org.apache.kafka.common.requests.DeleteAclsRequest; +import org.apache.kafka.common.requests.DeleteAclsResponse; +import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; +import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; import org.apache.kafka.common.requests.DeleteTopicsRequest; import org.apache.kafka.common.requests.DeleteTopicsResponse; +import org.apache.kafka.common.requests.DescribeAclsRequest; +import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.KafkaThread; @@ 
-819,12 +833,12 @@ public class KafkaAdminClient extends AdminClient { public CreateTopicResults createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options) { final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size()); - for (NewTopic newTopic : newTopics) { - topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>()); - } final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size()); for (NewTopic newTopic : newTopics) { - topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails()); + if (topicFutures.get(newTopic.name()) == null) { + topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>()); + topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails()); + } } final long now = time.milliseconds(); runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()), @@ -875,7 +889,9 @@ public class KafkaAdminClient extends AdminClient { DeleteTopicsOptions options) { final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size()); for (String topicName : topicNames) { - topicFutures.put(topicName, new KafkaFutureImpl<Void>()); + if (topicFutures.get(topicName) == null) { + topicFutures.put(topicName, new KafkaFutureImpl<Void>()); + } } final long now = time.milliseconds(); runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()), @@ -957,8 +973,12 @@ public class KafkaAdminClient extends AdminClient { @Override public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) { final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { - topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>()); + if (topicFutures.get(topicName) == null) { + topicFutures.put(topicName, new 
KafkaFutureImpl<TopicDescription>()); + topicNamesList.add(topicName); + } } final long now = time.milliseconds(); runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), @@ -966,7 +986,7 @@ public class KafkaAdminClient extends AdminClient { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(new ArrayList<>(topicNames)); + return new MetadataRequest.Builder(topicNamesList); } @Override @@ -1047,6 +1067,8 @@ public class KafkaAdminClient extends AdminClient { final long deadlineMs = calcDeadlineMs(now, options.timeoutMs()); Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>(); for (final Node node : nodes) { + if (nodeFutures.get(node) != null) + continue; final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>(); nodeFutures.put(node, nodeFuture); runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) { @@ -1070,4 +1092,141 @@ public class KafkaAdminClient extends AdminClient { return new ApiVersionsResults(nodeFutures); } + + @Override + public DescribeAclsResults describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) { + final long now = time.milliseconds(); + final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>(); + runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DescribeAclsRequest.Builder(filter); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeAclsResponse response = (DescribeAclsResponse) abstractResponse; + if (response.throwable() != null) { + future.completeExceptionally(response.throwable()); + } else { + future.complete(response.acls()); + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }, now); + return new 
DescribeAclsResults(future); + } + + @Override + public CreateAclsResults createAcls(Collection<AclBinding> acls, CreateAclsOptions options) { + final long now = time.milliseconds(); + final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>(); + final List<AclCreation> aclCreations = new ArrayList<>(); + for (AclBinding acl : acls) { + if (futures.get(acl) == null) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + futures.put(acl, future); + String indefinite = acl.toFilter().findIndefiniteField(); + if (indefinite == null) { + aclCreations.add(new AclCreation(acl)); + } else { + future.completeExceptionally(new InvalidRequestException("Invalid ACL creation: " + + indefinite)); + } + } + } + runnable.call(new Call("createAcls", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new CreateAclsRequest.Builder(aclCreations); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + CreateAclsResponse response = (CreateAclsResponse) abstractResponse; + List<AclCreationResponse> responses = response.aclCreationResponses(); + Iterator<AclCreationResponse> iter = responses.iterator(); + for (AclCreation aclCreation : aclCreations) { + KafkaFutureImpl<Void> future = futures.get(aclCreation.acl()); + if (!iter.hasNext()) { + future.completeExceptionally(new UnknownServerException( + "The broker reported no creation result for the given ACL.")); + } else { + AclCreationResponse creation = iter.next(); + if (creation.throwable() != null) { + future.completeExceptionally(creation.throwable()); + } else { + future.complete(null); + } + } + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }, now); + return new CreateAclsResults(new HashMap<AclBinding, KafkaFuture<Void>>(futures)); + } + + @Override + public DeleteAclsResults 
deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) { + final long now = time.milliseconds(); + final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = new HashMap<>(); + final List<AclBindingFilter> filterList = new ArrayList<>(); + for (AclBindingFilter filter : filters) { + if (futures.get(filter) == null) { + filterList.add(filter); + futures.put(filter, new KafkaFutureImpl<FilterResults>()); + } + } + runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DeleteAclsRequest.Builder(filterList); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DeleteAclsResponse response = (DeleteAclsResponse) abstractResponse; + List<AclFilterResponse> responses = response.responses(); + Iterator<AclFilterResponse> iter = responses.iterator(); + for (AclBindingFilter filter : filterList) { + KafkaFutureImpl<FilterResults> future = futures.get(filter); + if (!iter.hasNext()) { + future.completeExceptionally(new UnknownServerException( + "The broker reported no deletion result for the given filter.")); + } else { + AclFilterResponse deletion = iter.next(); + if (deletion.throwable() != null) { + future.completeExceptionally(deletion.throwable()); + } else { + List<FilterResult> filterResults = new ArrayList<>(); + for (AclDeletionResult deletionResult : deletion.deletions()) { + filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.exception())); + } + future.complete(new FilterResults(filterResults)); + } + } + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }, now); + return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures)); + } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java new file mode 100644 index 0000000..9148aac --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents a cluster resource with a tuple of (type, name). + */ +public class Resource { + private final ResourceType resourceType; + private final String name; + + public Resource(ResourceType resourceType, String name) { + Objects.requireNonNull(resourceType); + this.resourceType = resourceType; + Objects.requireNonNull(name); + this.name = name; + } + + public ResourceType resourceType() { + return resourceType; + } + + public String name() { + return name; + } + + /** + * Create a filter which matches only this Resource. 
+ */ + public ResourceFilter toFilter() { + return new ResourceFilter(resourceType, name); + } + + @Override + public String toString() { + return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; + } + + /** + * Return true if this Resource has any UNKNOWN components. + */ + public boolean unknown() { + return resourceType.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Resource)) + return false; + Resource other = (Resource) o; + return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, name); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java new file mode 100644 index 0000000..6f453b6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * A filter which matches Resource objects. + */ +public class ResourceFilter { + private final ResourceType resourceType; + private final String name; + + public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null); + + public ResourceFilter(ResourceType resourceType, String name) { + Objects.requireNonNull(resourceType); + this.resourceType = resourceType; + this.name = name; + } + + public ResourceType resourceType() { + return resourceType; + } + + public String name() { + return name; + } + + @Override + public String toString() { + return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; + } + + /** + * Return true if this ResourceFilter has any UNKNOWN components. + */ + public boolean unknown() { + return resourceType.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ResourceFilter)) + return false; + ResourceFilter other = (ResourceFilter) o; + return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, name); + } + + public boolean matches(Resource other) { + if ((name != null) && (!name.equals(other.name()))) + return false; + if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType()))) + return false; + return true; + } + + public boolean matchesAtMostOne() { + return findIndefiniteField() == null; + } + + public String findIndefiniteField() { + if (resourceType == ResourceType.ANY) + return "Resource type is ANY."; + if (resourceType == ResourceType.UNKNOWN) + return "Resource type is UNKNOWN."; + if (name == null) + return "Resource name is NULL."; + return null; + } +} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java new file mode 100644 index 0000000..66a91e3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Represents a type of resource which an ACL can be applied to. + */ +public enum ResourceType { + /** + * Represents any ResourceType which this client cannot understand, + * perhaps because this client is too old. + */ + UNKNOWN((byte) 0), + + /** + * In a filter, matches any ResourceType. + */ + ANY((byte) 1), + + /** + * A Kafka topic. + */ + TOPIC((byte) 2), + + /** + * A consumer group. + */ + GROUP((byte) 3), + + /** + * The cluster as a whole. 
+ */ + CLUSTER((byte) 4); + + private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>(); + + static { + for (ResourceType resourceType : ResourceType.values()) { + CODE_TO_VALUE.put(resourceType.code, resourceType); + } + } + + /** + * Parse the given string as an ACL resource type. + * + * @param str The string to parse. + * + * @return The ResourceType, or UNKNOWN if the string could not be matched. + */ + public static ResourceType fromString(String str) throws IllegalArgumentException { + try { + return ResourceType.valueOf(str.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return UNKNOWN; + } + } + + public static ResourceType fromCode(byte code) { + ResourceType resourceType = CODE_TO_VALUE.get(code); + if (resourceType == null) { + return UNKNOWN; + } + return resourceType; + } + + private final byte code; + + ResourceType(byte code) { + this.code = code; + } + + public byte code() { + return code; + } + + public boolean unknown() { + return this == UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java b/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java new file mode 100644 index 0000000..25f3f35 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/SecurityDisabledException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * An error indicating that security is disabled on the broker. + */ +public class SecurityDisabledException extends ApiException { + private static final long serialVersionUID = 1L; + + public SecurityDisabledException(String message) { + super(message); + } + + public SecurityDisabledException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index b98a33e..709d927 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -62,7 +62,10 @@ public enum ApiKeys { ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false), END_TXN(26, "EndTxn", false), WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true), - TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false); + TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false), + DESCRIBE_ACLS(29, "DescribeAcls", false), + CREATE_ACLS(30, "CreateAcls", false), + DELETE_ACLS(31, "DeleteAcls", false); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 7780fbe..c15edc1 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; @@ -487,9 +488,14 @@ public enum Errors { public ApiException build(String message) { return new ProducerIdAuthorizationException(message); } - }); - - + }), + SECURITY_DISABLED(55, "Security features are disabled.", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new SecurityDisabledException(message); + } + }); private interface ApiExceptionBuilder { ApiException build(String message); http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 08aef4b..e970eb1 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1549,6 +1549,90 @@ public class Protocol { public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new 
Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0}; public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0}; + public static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema( + new Field("resource_type", INT8, "The filter resource type."), + new Field("resource_name", NULLABLE_STRING, "The filter resource name."), + new Field("principal", NULLABLE_STRING, "The filter principal name."), + new Field("host", NULLABLE_STRING, "The filter ip address."), + new Field("operation", INT8, "The filter operation type."), + new Field("permission_type", INT8, "The filter permission type.") + ); + + public static final Schema DESCRIBE_ACLS_RESOURCE = new Schema( + new Field("resource_type", INT8, "The resource type"), + new Field("resource_name", STRING, "The resource name"), + new Field("acls", new ArrayOf(new Schema( + new Field("principal", STRING, "The ACL principal"), + new Field("host", STRING, "The ACL host"), + new Field("operation", INT8, "The ACL operation"), + new Field("permission_type", INT8, "The ACL permission type"))))); + + public static final Schema DESCRIBE_ACLS_RESPONSE_V0 = new Schema( + newThrottleTimeField(), + new Field("error_code", INT16, "The error code."), + new Field("error_message", NULLABLE_STRING, "The error message."), + new Field("resources", + new ArrayOf(DESCRIBE_ACLS_RESOURCE), + "The resources and their associated ACLs.")); + + public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] {DESCRIBE_ACLS_REQUEST_V0}; + public static final Schema[] DESCRIBE_ACLS_RESPONSE = new Schema[] {DESCRIBE_ACLS_RESPONSE_V0}; + + public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema( + new Field("creations", + new ArrayOf(new Schema( + new Field("resource_type", INT8, "The resource type."), + new Field("resource_name", STRING, "The resource name."), + new Field("principal", STRING, "The principal."), + new Field("host", STRING, "The ip address."), + new Field("operation", INT8, "The ACL operation"), + new 
Field("permission_type", INT8, "The ACL permission type") + )))); + + public static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema( + newThrottleTimeField(), + new Field("creation_responses", + new ArrayOf(new Schema( + new Field("error_code", INT16, "The error code."), + new Field("error_message", NULLABLE_STRING, "The error message.") + )))); + + public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] {CREATE_ACLS_REQUEST_V0}; + public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] {CREATE_ACLS_RESPONSE_V0}; + + public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema( + new Field("filters", + new ArrayOf(new Schema( + new Field("resource_type", INT8, "The resource type filter."), + new Field("resource_name", NULLABLE_STRING, "The resource name filter."), + new Field("principal", NULLABLE_STRING, "The principal filter."), + new Field("host", NULLABLE_STRING, "The ip address filter."), + new Field("operation", INT8, "The ACL operation filter."), + new Field("permission_type", INT8, "The ACL permission type filter.") + )))); + + public static final Schema MATCHING_ACL = new Schema( + new Field("error_code", INT16, "The error code."), + new Field("error_message", NULLABLE_STRING, "The error message."), + new Field("resource_type", INT8, "The resource type."), + new Field("resource_name", STRING, "The resource name."), + new Field("principal", STRING, "The principal."), + new Field("host", STRING, "The ip address."), + new Field("operation", INT8, "The ACL operation"), + new Field("permission_type", INT8, "The ACL permission type") + ); + + public static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema( + newThrottleTimeField(), + new Field("filter_responses", + new ArrayOf(new Schema( + new Field("error_code", INT16, "The error code."), + new Field("error_message", NULLABLE_STRING, "The error message."), + new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The matching ACLs"))))); + + public static final Schema[] DELETE_ACLS_REQUEST 
= new Schema[] {DELETE_ACLS_REQUEST_V0}; + public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] {DELETE_ACLS_RESPONSE_V0}; + /* an array of all requests and responses with all schema versions; a null value in the inner array means that the * particular version is not supported */ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -1588,6 +1672,9 @@ public class Protocol { REQUESTS[ApiKeys.END_TXN.id] = END_TXN_REQUEST; REQUESTS[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_REQUEST; REQUESTS[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_REQUEST; + REQUESTS[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_REQUEST; + REQUESTS[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_REQUEST; + REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; @@ -1618,6 +1705,9 @@ public class Protocol { RESPONSES[ApiKeys.END_TXN.id] = END_TXN_RESPONSE; RESPONSES[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_RESPONSE; RESPONSES[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_RESPONSE; + RESPONSES[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_RESPONSE; + RESPONSES[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_RESPONSE; + RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE; /* set the minimum and maximum version of each api */ for (ApiKeys api : ApiKeys.values()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 3aeb879..16c0c21 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -199,6 +199,15 @@ public abstract class 
AbstractRequest extends AbstractRequestResponse { case TXN_OFFSET_COMMIT: request = new TxnOffsetCommitRequest(struct, version); break; + case DESCRIBE_ACLS: + request = new DescribeAclsRequest(struct, version); + break; + case CREATE_ACLS: + request = new CreateAclsRequest(struct, version); + break; + case DELETE_ACLS: + request = new DeleteAclsRequest(struct, version); + break; default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " + "code should be updated to do so.", apiKey)); http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 617934c..aee4f5e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -108,6 +108,12 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new WriteTxnMarkersResponse(struct); case TXN_OFFSET_COMMIT: return new TxnOffsetCommitResponse(struct); + case DESCRIBE_ACLS: + return new DescribeAclsResponse(struct); + case CREATE_ACLS: + return new CreateAclsResponse(struct); + case DELETE_ACLS: + return new DeleteAclsResponse(struct); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " + "code should be updated to do so.", apiKey));