KAFKA-5783; Add KafkaPrincipalBuilder with support for SASL (KIP-189) Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <ism...@juma.me.uk>, Rajini Sivaram <rajinisiva...@googlemail.com>, Manikumar Reddy <manikumar.re...@gmail.com> Closes #3795 from hachikuji/KAFKA-5783 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b5d88fe Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b5d88fe Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b5d88fe Branch: refs/heads/trunk Commit: 3b5d88febb186c4b92cc514fa2338de7bc338f67 Parents: f7b1add Author: Jason Gustafson <ja...@confluent.io> Authored: Thu Sep 14 10:16:00 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu Sep 14 10:16:00 2017 +0100 ---------------------------------------------------------------------- checkstyle/import-control.xml | 5 +- .../apache/kafka/common/config/SaslConfigs.java | 44 +++-- .../apache/kafka/common/config/SslConfigs.java | 42 +++-- .../config/internals/BrokerSecurityConfigs.java | 70 ++++++++ .../kafka/common/network/Authenticator.java | 19 +-- .../kafka/common/network/ChannelBuilder.java | 3 +- .../kafka/common/network/ChannelBuilders.java | 48 ++++-- .../common/network/DefaultAuthenticator.java | 63 ------- .../kafka/common/network/KafkaChannel.java | 12 +- .../common/network/PlaintextChannelBuilder.java | 64 +++++-- .../common/network/SaslChannelBuilder.java | 43 ++--- .../kafka/common/network/SslChannelBuilder.java | 81 +++++++-- .../security/auth/AuthenticationContext.java | 36 ++++ .../security/auth/DefaultPrincipalBuilder.java | 6 +- .../common/security/auth/KafkaPrincipal.java | 70 +++++--- .../security/auth/KafkaPrincipalBuilder.java | 36 ++++ .../auth/PlaintextAuthenticationContext.java | 40 +++++ .../common/security/auth/PrincipalBuilder.java | 5 +- .../auth/SaslAuthenticationContext.java | 48 ++++++ .../security/auth/SslAuthenticationContext.java | 46 +++++ .../DefaultKafkaPrincipalBuilder.java | 149 ++++++++++++++++ .../authenticator/SaslClientAuthenticator.java | 
33 ++-- .../authenticator/SaslServerAuthenticator.java | 67 +++++--- .../SaslServerCallbackHandler.java | 21 +-- .../kafka/common/security/ssl/SslFactory.java | 3 +- .../kafka/common/utils/SecurityUtils.java | 37 ++++ .../org/apache/kafka/common/utils/Utils.java | 4 +- .../apache/kafka/common/network/CertStores.java | 8 +- .../common/network/ChannelBuildersTest.java | 107 ++++++++++++ .../kafka/common/network/SslSelectorTest.java | 37 ++-- .../common/network/SslTransportLayerTest.java | 62 +++---- .../common/security/TestSecurityConfig.java | 16 +- .../auth/DefaultKafkaPrincipalBuilderTest.java | 170 +++++++++++++++++++ .../security/auth/KafkaPrincipalTest.java | 9 - .../authenticator/SaslAuthenticatorTest.java | 9 +- .../SaslServerAuthenticatorTest.java | 22 +-- .../kafka/common/utils/SecurityUtilsTest.java | 43 +++++ .../main/scala/kafka/network/SocketServer.scala | 6 +- .../main/scala/kafka/server/KafkaConfig.scala | 24 +-- .../kafka/api/EndToEndAuthorizationTest.scala | 18 +- .../kafka/api/IntegrationTestHarness.scala | 14 +- .../PlaintextEndToEndAuthorizationTest.scala | 71 ++++++++ .../SaslPlainSslEndToEndAuthorizationTest.scala | 29 +++- .../scala/integration/kafka/api/SaslSetup.scala | 3 +- .../api/SslEndToEndAuthorizationTest.scala | 35 +++- .../integration/KafkaServerTestHarness.scala | 2 +- .../security/auth/SimpleAclAuthorizerTest.scala | 20 +-- .../unit/kafka/server/RequestQuotaTest.scala | 11 +- .../scala/unit/kafka/utils/JaasTestUtils.scala | 8 +- 49 files changed, 1398 insertions(+), 421 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 489c004..bea7e20 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -89,16 +89,15 @@ <allow 
pkg="org.apache.kafka.common.annotation" /> <allow pkg="org.apache.kafka.common.network" /> <allow pkg="org.apache.kafka.common.config" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.errors" /> <subpackage name="authenticator"> - <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.protocol.types" /> <allow pkg="org.apache.kafka.common.requests" /> - <allow pkg="org.apache.kafka.common.errors" /> <allow pkg="org.apache.kafka.clients" /> </subpackage> <subpackage name="scram"> <allow pkg="javax.crypto" /> - <allow pkg="org.apache.kafka.common.errors" /> </subpackage> </subpackage> http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java index 7dc5012..f61b7dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java @@ -16,7 +16,8 @@ */ package org.apache.kafka.common.config; -import java.util.Collections; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + import java.util.List; public class SaslConfigs { @@ -29,11 +30,21 @@ public class SaslConfigs { public static final String GSSAPI_MECHANISM = "GSSAPI"; public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM; - public static final String SASL_ENABLED_MECHANISMS = "sasl.enabled.mechanisms"; - public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. " - + "The list may contain any mechanism for which a security provider is available. 
" - + "Only GSSAPI is enabled by default."; - public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(GSSAPI_MECHANISM); + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String SASL_ENABLED_MECHANISMS = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String SASL_ENABLED_MECHANISMS_DOC = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS; public static final String SASL_JAAS_CONFIG = "sasl.jaas.config"; public static final String SASL_JAAS_CONFIG_DOC = "JAAS login context parameters for SASL connections in the format used by JAAS configuration files. " @@ -61,12 +72,21 @@ public class SaslConfigs { public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "Login thread sleep time between refresh attempts."; public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L; - public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = "sasl.kerberos.principal.to.local.rules"; - public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal names to short names (typically operating system usernames). " + - "The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. " + - "By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}. 
" + - "For more details on the format please see <a href=\"#security_authz\"> security authorization and acls</a>."; - public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT"); + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES; public static void addClientSaslSupport(ConfigDef config) { config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 8cae00d..5e8d304 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.common.config; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder; + import javax.net.ssl.KeyManagerFactory; import 
javax.net.ssl.TrustManagerFactory; @@ -24,10 +27,22 @@ public class SslConfigs { * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. */ - public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class"; - public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface, " + - "which is currently used to build the Principal for connections with the SSL SecurityProtocol."; - public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder"; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String PRINCIPAL_BUILDER_CLASS_DOC = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. In recent versions, + * the config is optional and there is no default. + */ + @Deprecated + public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = DefaultPrincipalBuilder.class.getName(); public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. " @@ -91,15 +106,16 @@ public class SslConfigs { public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = "ssl.secure.random.implementation"; public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_DOC = "The SecureRandom PRNG implementation to use for SSL cryptography operations. 
"; - public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth"; - public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication." - + " The following settings are common: " - + " <ul>" - + " <li><code>ssl.client.auth=required</code> If set to required" - + " client authentication is required." - + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional." - + " unlike requested , if this option is set client can choose not to provide authentication information about itself" - + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed."; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String SSL_CLIENT_AUTH_CONFIG = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG; + /** + * @deprecated As of 1.0.0. This field will be removed in a future major release. + */ + @Deprecated + public static final String SSL_CLIENT_AUTH_DOC = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC; public static void addClientSslSupport(ConfigDef config) { config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java new file mode 100644 index 0000000..18616ec --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.internals; + +import org.apache.kafka.common.config.SaslConfigs; + +import java.util.Collections; +import java.util.List; + +/** + * Common home for broker-side security configs which need to be accessible from the libraries shared + * between the broker and the client. + * + * Note this is an internal API and subject to change without notice. + */ +public class BrokerSecurityConfigs { + + public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class"; + public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG = "sasl.kerberos.principal.to.local.rules"; + public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth"; + public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms"; + + public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " + + "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " + + "authorization. This config also supports the deprecated PrincipalBuilder interface which was previously " + + "used for client authentication over SSL. If no principal builder is defined, the default behavior depends " + + "on the security protocol in use. 
For SSL authentication, the principal name will be the distinguished " + + "name from the client certificate if one is provided; otherwise, if client authentication is not required, " + + "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " + + "rules defined by <code>" + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + "</code> if GSSAPI is in use, " + + "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS."; + + public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " + + "names to short names (typically operating system usernames). The rules are evaluated in order and the " + + "first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " + + "ignored. By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}. " + + "For more details on the format please see <a href=\"#security_authz\"> security authorization and acls</a>. " + + "Note that this configuration is ignored if an extension of KafkaPrincipalBuilder is provided by the " + + "<code>" + PRINCIPAL_BUILDER_CLASS_CONFIG + "</code> configuration."; + public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT"); + + public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication." + + " The following settings are common: " + + " <ul>" + + " <li><code>ssl.client.auth=required</code> If set to required" + + " client authentication is required." + + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional." 
+ + " unlike requested , if this option is set client can choose not to provide authentication information about itself" + + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed."; + + public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. " + + "The list may contain any mechanism for which a security provider is available. " + + "Only GSSAPI is enabled by default."; + public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index b242967..1fe3beb 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -16,28 +16,15 @@ */ package org.apache.kafka.common.network; +import org.apache.kafka.common.security.auth.KafkaPrincipal; + import java.io.Closeable; import java.io.IOException; -import java.util.Map; -import java.security.Principal; - -import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.KafkaException; /** * Authentication for Channel */ public interface Authenticator extends Closeable { - - /** - * Configures Authenticator using the provided parameters. 
- * - * @param transportLayer The transport layer used to read or write tokens - * @param principalBuilder The builder used to construct `Principal` - * @param configs Additional configuration parameters as key/value pairs - */ - void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs); - /** * Implements any authentication mechanism. Use transportLayer to read or write tokens. * If no further authentication needs to be done returns. @@ -47,7 +34,7 @@ public interface Authenticator extends Closeable { /** * Returns Principal using PrincipalBuilder */ - Principal principal() throws KafkaException; + KafkaPrincipal principal(); /** * returns true if authentication is complete otherwise returns false; http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java index 6c8890a..54689f3 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java @@ -33,7 +33,6 @@ public interface ChannelBuilder extends AutoCloseable { */ void configure(Map<String, ?> configs) throws KafkaException; - /** * returns a Channel with TransportLayer and Authenticator configured. 
* @param id channel id @@ -44,10 +43,10 @@ public interface ChannelBuilder extends AutoCloseable { */ KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException; - /** * Closes ChannelBuilder */ + @Override void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 225f5a8..6dd3ddd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -16,13 +16,18 @@ */ package org.apache.kafka.common.network; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.authenticator.CredentialCache; +import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.utils.Utils; import java.util.Map; @@ -113,12 +118,13 @@ public class ChannelBuilders { return channelBuilder; } - /** - * Returns a configured `PrincipalBuilder`. 
- */ - static PrincipalBuilder createPrincipalBuilder(Map<String, ?> configs) { - // this is a server-only config so it will always be null on the client - Class<?> principalBuilderClass = (Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); + private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) { + if (mode == null) + throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); + } + + @SuppressWarnings("deprecation") + private static PrincipalBuilder createPrincipalBuilder(Class<?> principalBuilderClass, Map<String, ?> configs) { PrincipalBuilder principalBuilder; if (principalBuilderClass == null) principalBuilder = new DefaultPrincipalBuilder(); @@ -128,9 +134,31 @@ public class ChannelBuilders { return principalBuilder; } - private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) { - if (mode == null) - throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); + @SuppressWarnings("deprecation") + public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs, + TransportLayer transportLayer, + Authenticator authenticator, + KerberosShortNamer kerberosShortNamer) { + Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); + final KafkaPrincipalBuilder builder; + + if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { + builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); + } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { + builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); + } else if (PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { + PrincipalBuilder oldPrincipalBuilder = createPrincipalBuilder(principalBuilderClass, configs); + builder = 
DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer, + oldPrincipalBuilder, kerberosShortNamer); + } else { + throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " + + "an instance of " + PrincipalBuilder.class.getName() + " or " + KafkaPrincipalBuilder.class.getName()); + } + + if (builder instanceof Configurable) + ((Configurable) builder).configure(configs); + + return builder; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java deleted file mode 100644 index 77d94a6..0000000 --- a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.common.network; - -import java.security.Principal; -import java.io.IOException; -import java.util.Map; - -import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.KafkaException; - -public class DefaultAuthenticator implements Authenticator { - - private TransportLayer transportLayer; - private PrincipalBuilder principalBuilder; - private Principal principal; - - public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) { - this.transportLayer = transportLayer; - this.principalBuilder = principalBuilder; - } - - /** - * No-Op for default authenticator - */ - public void authenticate() throws IOException {} - - /** - * Constructs Principal using configured principalBuilder. - * @return Principal - * @throws KafkaException - */ - public Principal principal() throws KafkaException { - if (principal == null) - principal = principalBuilder.buildPrincipal(transportLayer, this); - return principal; - } - - public void close() throws IOException {} - - /** - * DefaultAuthenticator doesn't implement any additional authentication mechanism. 
- * @return true - */ - public boolean complete() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 1e76c43..c759f00 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -17,17 +17,15 @@ package org.apache.kafka.common.network; -import java.io.IOException; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Utils; +import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.nio.channels.SelectionKey; - -import java.security.Principal; - import java.util.Objects; -import org.apache.kafka.common.memory.MemoryPool; -import org.apache.kafka.common.utils.Utils; public class KafkaChannel { private final String id; @@ -66,7 +64,7 @@ public class KafkaChannel { /** * Returns the principal returned by `authenticator.principal()`. 
*/ - public Principal principal() throws IOException { + public KafkaPrincipal principal() { return authenticator.principal(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index ad63671..c0d1059 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -16,45 +16,73 @@ */ package org.apache.kafka.common.network; -import java.nio.channels.SelectionKey; -import java.util.Map; - -import org.apache.kafka.common.memory.MemoryPool; -import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.KafkaException; - +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.channels.SelectionKey; +import java.util.Map; + public class PlaintextChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class); - private PrincipalBuilder principalBuilder; private Map<String, ?> configs; public void configure(Map<String, ?> configs) throws KafkaException { - try { - this.configs = configs; - principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); - } catch (Exception e) { - throw new KafkaException(e); 
- } + this.configs = configs; } @Override public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { try { PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); - Authenticator authenticator = new DefaultAuthenticator(); - authenticator.configure(transportLayer, this.principalBuilder, this.configs); - return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); + PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer); + return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, + memoryPool != null ? memoryPool : MemoryPool.NONE); } catch (Exception e) { log.warn("Failed to create channel due to ", e); throw new KafkaException(e); } } - public void close() { - this.principalBuilder.close(); + @Override + public void close() {} + + private static class PlaintextAuthenticator implements Authenticator { + private final PlaintextTransportLayer transportLayer; + private final KafkaPrincipalBuilder principalBuilder; + + private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer) { + this.transportLayer = transportLayer; + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); + } + + @Override + public void authenticate() throws IOException {} + + @Override + public KafkaPrincipal principal() { + InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress(); + return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress)); + } + + @Override + public boolean complete() { + return true; + } + + @Override + public void close() { + if (principalBuilder instanceof Closeable) + Utils.closeQuietly((Closeable) principalBuilder, "principal builder"); + } } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 0f98463..9d7eac0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -16,29 +16,30 @@ */ package org.apache.kafka.common.network; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.Map; - +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.JaasContext; -import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.authenticator.LoginManager; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator; +import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.Java; -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import 
java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.List; +import java.util.Map; + public class SaslChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class); @@ -71,12 +72,13 @@ public class SaslChannelBuilder implements ChannelBuilder { this.credentialCache = credentialCache; } + @Override public void configure(Map<String, ?> configs) throws KafkaException { try { this.configs = configs; boolean hasKerberos; if (mode == Mode.SERVER) { - List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS); + List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM); } else { hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM); @@ -90,7 +92,7 @@ public class SaslChannelBuilder implements ChannelBuilder { defaultRealm = ""; } @SuppressWarnings("unchecked") - List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES); + List<String> principalToLocalRules = (List<String>) configs.get(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG); if (principalToLocalRules != null) kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); } @@ -114,14 +116,12 @@ public class SaslChannelBuilder implements ChannelBuilder { TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); Authenticator authenticator; if (mode == Mode.SERVER) - authenticator = new SaslServerAuthenticator(id, jaasContext, loginManager.subject(), - kerberosShortNamer, socketChannel.socket().getLocalAddress(), credentialCache, - listenerName, securityProtocol); + authenticator = 
new SaslServerAuthenticator(configs, id, jaasContext, loginManager.subject(), + kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer); else - authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), - socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable); - // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes. - authenticator.configure(transportLayer, null, this.configs); + authenticator = new SaslClientAuthenticator(configs, id, loginManager.subject(), loginManager.serviceName(), + socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, + handshakeRequestEnable, transportLayer); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); } catch (Exception e) { log.info("Failed to create channel due to ", e); @@ -129,6 +129,7 @@ public class SaslChannelBuilder implements ChannelBuilder { } } + @Override public void close() { if (loginManager != null) { loginManager.release(); @@ -136,7 +137,7 @@ public class SaslChannelBuilder implements ChannelBuilder { } } - protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { + private TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { if (this.securityProtocol == SecurityProtocol.SASL_SSL) { return SslTransportLayer.create(id, key, sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort())); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 01b72fe..b6ef625 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -16,23 +16,27 @@ */ package org.apache.kafka.common.network; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.SslAuthenticationContext; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Map; -import org.apache.kafka.common.memory.MemoryPool; -import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.security.ssl.SslFactory; -import org.apache.kafka.common.KafkaException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class SslChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class); private SslFactory sslFactory; - private PrincipalBuilder principalBuilder; private Mode mode; private Map<String, ?> configs; @@ -45,7 +49,6 @@ public class SslChannelBuilder implements ChannelBuilder { this.configs = configs; this.sslFactory = new SslFactory(mode); this.sslFactory.configure(this.configs); - this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); } catch (Exception e) { throw new KafkaException(e); } @@ -55,23 +58,21 @@ public class SslChannelBuilder implements ChannelBuilder { public KafkaChannel 
buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { try { SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key)); - Authenticator authenticator = new DefaultAuthenticator(); - authenticator.configure(transportLayer, this.principalBuilder, this.configs); - return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); + Authenticator authenticator = new SslAuthenticator(configs, transportLayer); + return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, + memoryPool != null ? memoryPool : MemoryPool.NONE); } catch (Exception e) { log.info("Failed to create channel due to ", e); throw new KafkaException(e); } } - public void close() { - this.principalBuilder.close(); - } + @Override + public void close() {} protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - return SslTransportLayer.create(id, key, - sslFactory.createSslEngine(host, socketChannel.socket().getPort())); + return SslTransportLayer.create(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort())); } /** @@ -112,4 +113,50 @@ public class SslChannelBuilder implements ChannelBuilder { SocketChannel socketChannel = (SocketChannel) key.channel(); return new InetSocketAddress(socketChannel.socket().getInetAddress(), 0).getHostString(); } + + /** + * Note that client SSL authentication is handled in {@link SslTransportLayer}. This class is only used + * to transform the derived principal using a {@link KafkaPrincipalBuilder} configured by the user. 
+ */ + private static class SslAuthenticator implements Authenticator { + private final SslTransportLayer transportLayer; + private final KafkaPrincipalBuilder principalBuilder; + + private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer) { + this.transportLayer = transportLayer; + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); + } + /** + * No-Op for SSL authenticator + */ + @Override + public void authenticate() throws IOException {} + + /** + * Constructs Principal using configured principalBuilder. + * @return the built principal + */ + @Override + public KafkaPrincipal principal() { + InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress(); + SslAuthenticationContext context = new SslAuthenticationContext(transportLayer.sslSession(), clientAddress); + return principalBuilder.build(context); + } + + @Override + public void close() throws IOException { + if (principalBuilder instanceof Closeable) + Utils.closeQuietly((Closeable) principalBuilder, "principal builder"); + } + + /** + * SslAuthenticator doesn't implement any additional authentication mechanism. + * @return true + */ + @Override + public boolean complete() { + return true; + } + + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java new file mode 100644 index 0000000..8c82954 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.auth; + +import java.net.InetAddress; + +/** + * An object representing contextual information from the authentication session. See + * {@link SaslAuthenticationContext} and {@link SslAuthenticationContext}. + */ +public interface AuthenticationContext { + /** + * Underlying security protocol of the authentication session. + * @return The name of the security protocol (i.e. 
PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL) + */ + String securityProtocolName(); + + /** + * Address of the authenticated client + */ + InetAddress clientAddress(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java index 04e4c90..424cfaa 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java @@ -23,7 +23,11 @@ import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.KafkaException; -/** DefaultPrincipalBuilder which return transportLayer's peer Principal **/ +/** + * DefaultPrincipalBuilder which returns transportLayer's peer Principal + * @deprecated As of Kafka 1.0.0. This will be removed in a future major release. 
+ **/ +@Deprecated public class DefaultPrincipalBuilder implements PrincipalBuilder { public void configure(Map<String, ?> configs) {} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java index 4bacdcb..ed3c956 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java @@ -16,53 +16,69 @@ */ package org.apache.kafka.common.security.auth; +import org.apache.kafka.common.utils.SecurityUtils; + import java.security.Principal; +import static java.util.Objects.requireNonNull; + +/** + * Principals in Kafka are defined by a type and a name. The principal type will always be "User" + * for the simple authorizer that is enabled by default, but custom authorizers can leverage different + * principal types (such as to enable group or role-based ACLs). The {@link KafkaPrincipalBuilder} interface + * is used when you need to derive a different principal type from the authentication context, or when + * you need to represent relations between different principals. For example, you could extend + * {@link KafkaPrincipal} in order to link a user principal to one or more role principals. + * + * For custom extensions of {@link KafkaPrincipal}, there are two key points to keep in mind: + * + * 1. To be compatible with the ACL APIs provided by Kafka (including the command line tool), each ACL + * can only represent a permission granted to a single principal (consisting of a principal type and name). + * It is possible to use richer ACL semantics, but you must implement your own mechanisms for adding + * and removing ACLs. + * 2. 
In general, {@link KafkaPrincipal} extensions are only useful when the corresponding Authorizer + * is also aware of the extension. If you have a {@link KafkaPrincipalBuilder} which derives user groups + * from the authentication context (e.g. from an SSL client certificate), then you need a custom + * authorizer which is capable of using the additional group information. + */ public class KafkaPrincipal implements Principal { - public static final String SEPARATOR = ":"; public static final String USER_TYPE = "User"; public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS"); - private String principalType; - private String name; + private final String principalType; + private final String name; public KafkaPrincipal(String principalType, String name) { - if (principalType == null || name == null) { - throw new IllegalArgumentException("principalType and name can not be null"); - } - this.principalType = principalType; - this.name = name; + this.principalType = requireNonNull(principalType, "Principal type cannot be null"); + this.name = requireNonNull(name, "Principal name cannot be null"); } + /** + * Parse a {@link KafkaPrincipal} instance from a string. This method cannot be used for {@link KafkaPrincipal} + * extensions. + * + * @param str The input string formatted as "{principalType}:{principalName}" + * @return The parsed {@link KafkaPrincipal} instance + * @deprecated As of 1.0.0. This method will be removed in a future major release. 
+ */ + @Deprecated public static KafkaPrincipal fromString(String str) { - if (str == null || str.isEmpty()) { - throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); - } - - String[] split = str.split(SEPARATOR, 2); - - if (split == null || split.length != 2) { - throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); - } - - return new KafkaPrincipal(split[0], split[1]); + return SecurityUtils.parseKafkaPrincipal(str); } @Override public String toString() { - return principalType + SEPARATOR + name; + return principalType + ":" + name; } @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof KafkaPrincipal)) return false; + if (o == null) return false; + if (getClass() != o.getClass()) return false; KafkaPrincipal that = (KafkaPrincipal) o; - - if (!principalType.equals(that.principalType)) return false; - return name.equals(that.name); - + return principalType.equals(that.principalType) && name.equals(that.name); } @Override @@ -80,7 +96,5 @@ public class KafkaPrincipal implements Principal { public String getPrincipalType() { return principalType; } -} - - +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java new file mode 100644 index 0000000..941d3b1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.auth; + +/** + * Pluggable principal builder interface which supports both SSL authentication through + * {@link SslAuthenticationContext} and SASL through {@link SaslAuthenticationContext}. + * + * Note that the {@link org.apache.kafka.common.Configurable} and {@link java.io.Closeable} + * interfaces are respected if implemented. Additionally, implementations must provide a + * default no-arg constructor. + */ +public interface KafkaPrincipalBuilder { + /** + * Build a kafka principal from the authentication context. + * @param context The authentication context (either {@link SslAuthenticationContext} or + * {@link SaslAuthenticationContext}) + * @return The built principal which may provide additional enrichment through a subclass of + * {@link KafkaPrincipal}. 
+ */ + KafkaPrincipal build(AuthenticationContext context); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java new file mode 100644 index 0000000..96b8376 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.security.auth; + +import org.apache.kafka.common.protocol.SecurityProtocol; + +import java.net.InetAddress; + +public class PlaintextAuthenticationContext implements AuthenticationContext { + private final InetAddress clientAddress; + + public PlaintextAuthenticationContext(InetAddress clientAddress) { + this.clientAddress = clientAddress; + } + + @Override + public String securityProtocolName() { + return SecurityProtocol.PLAINTEXT.name; + } + + @Override + public InetAddress clientAddress() { + return clientAddress; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java index fc2a930..d58170a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java @@ -25,10 +25,13 @@ import org.apache.kafka.common.Configurable; import java.util.Map; import java.security.Principal; -/* +/** * PrincipalBuilder for Authenticator + * @deprecated As of Kafka 1.0.0, use {@link KafkaPrincipalBuilder} instead. This will be removed in + * a future major release. 
*/ @InterfaceStability.Unstable +@Deprecated public interface PrincipalBuilder extends Configurable { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java new file mode 100644 index 0000000..f98164b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.security.auth; + +import org.apache.kafka.common.protocol.SecurityProtocol; + +import javax.security.sasl.SaslServer; +import java.net.InetAddress; + +public class SaslAuthenticationContext implements AuthenticationContext { + private final SaslServer server; + private final SecurityProtocol securityProtocol; + private final InetAddress clientAddress; + + public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol, InetAddress clientAddress) { + this.server = server; + this.securityProtocol = securityProtocol; + this.clientAddress = clientAddress; + } + + public SaslServer server() { + return server; + } + + @Override + public String securityProtocolName() { + return securityProtocol.name; + } + + @Override + public InetAddress clientAddress() { + return clientAddress; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java new file mode 100644 index 0000000..325c282 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.auth; + +import org.apache.kafka.common.protocol.SecurityProtocol; + +import javax.net.ssl.SSLSession; +import java.net.InetAddress; + +public class SslAuthenticationContext implements AuthenticationContext { + private final SSLSession session; + private final InetAddress clientAddress; + + public SslAuthenticationContext(SSLSession session, InetAddress clientAddress) { + this.session = session; + this.clientAddress = clientAddress; + } + + public SSLSession session() { + return session; + } + + @Override + public String securityProtocolName() { + return SecurityProtocol.SSL.name; + } + + @Override + public InetAddress clientAddress() { + return clientAddress; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java new file mode 100644 index 0000000..1d0295a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.security.auth.AuthenticationContext; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.security.auth.SaslAuthenticationContext; +import org.apache.kafka.common.security.auth.SslAuthenticationContext; +import org.apache.kafka.common.security.kerberos.KerberosName; +import org.apache.kafka.common.security.kerberos.KerberosShortNamer; + +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.security.sasl.SaslServer; +import java.io.Closeable; +import java.io.IOException; +import java.security.Principal; + +import static java.util.Objects.requireNonNull; + +/** + * Default implementation of {@link KafkaPrincipalBuilder} which provides basic support for + * SSL authentication and SASL 
authentication. In the latter case, when GSSAPI is used, this + * class applies {@link org.apache.kafka.common.security.kerberos.KerberosShortNamer} to transform + * the name. + * + * NOTE: This is an internal class and can change without notice. Unlike normal implementations + * of {@link KafkaPrincipalBuilder}, there is no default no-arg constructor since this class + * must adapt implementations of the older {@link PrincipalBuilder} interface. + */ +public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Closeable { + @SuppressWarnings("deprecation") + private final PrincipalBuilder oldPrincipalBuilder; + private final Authenticator authenticator; + private final TransportLayer transportLayer; + private final KerberosShortNamer kerberosShortNamer; + + /** + * Construct a new instance which wraps an instance of the older {@link PrincipalBuilder}. + * + * @param authenticator The authenticator in use + * @param transportLayer The underlying transport layer + * @param oldPrincipalBuilder Instance of {@link PrincipalBuilder} + * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured + */ + @SuppressWarnings("deprecation") + public static DefaultKafkaPrincipalBuilder fromOldPrincipalBuilder(Authenticator authenticator, + TransportLayer transportLayer, + PrincipalBuilder oldPrincipalBuilder, + KerberosShortNamer kerberosShortNamer) { + return new DefaultKafkaPrincipalBuilder( + requireNonNull(authenticator), + requireNonNull(transportLayer), + requireNonNull(oldPrincipalBuilder), + kerberosShortNamer); + } + + @SuppressWarnings("deprecation") + private DefaultKafkaPrincipalBuilder(Authenticator authenticator, + TransportLayer transportLayer, + PrincipalBuilder oldPrincipalBuilder, + KerberosShortNamer kerberosShortNamer) { + this.authenticator = authenticator; + this.transportLayer = transportLayer; + this.oldPrincipalBuilder = oldPrincipalBuilder; + this.kerberosShortNamer = kerberosShortNamer; + } + + /** + * 
Construct a new instance. + * + * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured + */ + public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer) { + this(null, null, null, kerberosShortNamer); + } + + @Override + public KafkaPrincipal build(AuthenticationContext context) { + if (context instanceof PlaintextAuthenticationContext) { + if (oldPrincipalBuilder != null) + return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)); + + return KafkaPrincipal.ANONYMOUS; + } else if (context instanceof SslAuthenticationContext) { + SSLSession sslSession = ((SslAuthenticationContext) context).session(); + + if (oldPrincipalBuilder != null) + return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)); + + try { + return convertToKafkaPrincipal(sslSession.getPeerPrincipal()); + } catch (SSLPeerUnverifiedException se) { + return KafkaPrincipal.ANONYMOUS; + } + } else if (context instanceof SaslAuthenticationContext) { + SaslServer saslServer = ((SaslAuthenticationContext) context).server(); + if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName())) + return applyKerberosShortNamer(saslServer.getAuthorizationID()); + else + return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID()); + } else { + throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName()); + } + } + + private KafkaPrincipal applyKerberosShortNamer(String authorizationId) { + KerberosName kerberosName = KerberosName.parse(authorizationId); + try { + String shortName = kerberosShortNamer.shortName(kerberosName); + return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, shortName); + } catch (IOException e) { + throw new KafkaException("Failed to set name for '" + kerberosName + + "' based on Kerberos authentication rules.", e); + } + } + + private KafkaPrincipal 
convertToKafkaPrincipal(Principal principal) { + return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName()); + } + + @Override + public void close() { + if (oldPrincipalBuilder != null) + oldPrincipalBuilder.close(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 6bf9b2a..7b68abe 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -38,7 +38,6 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest; import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.auth.AuthCallbackHandler; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,14 +69,11 @@ public class SaslClientAuthenticator implements Authenticator { private final String host; private final String node; private final String mechanism; - private final boolean handshakeRequestEnable; - - // assigned in `configure` - private SaslClient saslClient; - private Map<String, ?> configs; - private String clientPrincipalName; - private AuthCallbackHandler callbackHandler; - private TransportLayer transportLayer; + private final TransportLayer transportLayer; + private final SaslClient saslClient; + private final Map<String, ?> configs; + private final String clientPrincipalName; + private final AuthCallbackHandler callbackHandler; // buffers used in 
`authenticate` private NetworkReceive netInBuffer; @@ -92,21 +88,24 @@ public class SaslClientAuthenticator implements Authenticator { // Request header for which response from the server is pending private RequestHeader currentRequestHeader; - public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable) throws IOException { + public SaslClientAuthenticator(Map<String, ?> configs, + String node, + Subject subject, + String servicePrincipal, + String host, + String mechanism, + boolean handshakeRequestEnable, + TransportLayer transportLayer) throws IOException { this.node = node; this.subject = subject; this.host = host; this.servicePrincipal = servicePrincipal; this.mechanism = mechanism; - this.handshakeRequestEnable = handshakeRequestEnable; this.correlationId = -1; - } + this.transportLayer = transportLayer; + this.configs = configs; - public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException { try { - this.transportLayer = transportLayer; - this.configs = configs; - setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL); // determine client principal from subject for Kerberos to use as authorization id for the SaslClient. 
@@ -252,7 +251,7 @@ public class SaslClientAuthenticator implements Authenticator { return serverPacket; } - public Principal principal() { + public KafkaPrincipal principal() { return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 276d067..28aa995 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -18,11 +18,13 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkReceive; @@ -43,12 +45,14 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.AuthCallbackHandler; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import 
org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.ScramMechanism; import org.apache.kafka.common.security.scram.ScramServerCallbackHandler; +import org.apache.kafka.common.utils.Utils; import org.ietf.jgss.GSSContext; import org.ietf.jgss.GSSCredential; import org.ietf.jgss.GSSException; @@ -62,11 +66,11 @@ import javax.security.auth.Subject; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.security.Principal; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.HashSet; @@ -80,7 +84,7 @@ public class SaslServerAuthenticator implements Authenticator { static final int MAX_RECEIVE_SIZE = 524288; private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class); - public enum SaslState { + private enum SaslState { GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED } @@ -89,9 +93,11 @@ public class SaslServerAuthenticator implements Authenticator { private final String connectionId; private final JaasContext jaasContext; private final Subject subject; - private final KerberosShortNamer kerberosNamer; - private final InetAddress clientAddress; private final CredentialCache credentialCache; + private final TransportLayer transportLayer; + private final Set<String> enabledMechanisms; + private final Map<String, ?> configs; + private final 
KafkaPrincipalBuilder principalBuilder; // Current SASL state private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST; @@ -101,48 +107,44 @@ public class SaslServerAuthenticator implements Authenticator { private String saslMechanism; private AuthCallbackHandler callbackHandler; - // assigned in `configure` - private TransportLayer transportLayer; - private Set<String> enabledMechanisms; - private Map<String, ?> configs; - // buffers used in `authenticate` private NetworkReceive netInBuffer; private Send netOutBuffer; - public SaslServerAuthenticator(String connectionId, + public SaslServerAuthenticator(Map<String, ?> configs, + String connectionId, JaasContext jaasContext, Subject subject, KerberosShortNamer kerberosNameParser, - InetAddress clientAddress, CredentialCache credentialCache, ListenerName listenerName, - SecurityProtocol securityProtocol) throws IOException { + SecurityProtocol securityProtocol, + TransportLayer transportLayer) throws IOException { if (subject == null) throw new IllegalArgumentException("subject cannot be null"); this.connectionId = connectionId; this.jaasContext = jaasContext; this.subject = subject; - this.kerberosNamer = kerberosNameParser; - this.clientAddress = clientAddress; this.credentialCache = credentialCache; this.listenerName = listenerName; this.securityProtocol = securityProtocol; - } - - public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) { this.transportLayer = transportLayer; + this.configs = configs; - List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS); + List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); if (enabledMechanisms == null || enabledMechanisms.isEmpty()) throw new IllegalArgumentException("No SASL mechanisms are enabled"); this.enabledMechanisms = new HashSet<>(enabledMechanisms); + + // Note that the old principal 
builder does not support SASL, so we do not need to pass the + // authenticator or the transport layer + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser); } private void createSaslServer(String mechanism) throws IOException { this.saslMechanism = mechanism; if (!ScramMechanism.isScram(mechanism)) - callbackHandler = new SaslServerCallbackHandler(jaasContext, kerberosNamer); + callbackHandler = new SaslServerCallbackHandler(jaasContext); else callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class)); callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism); @@ -152,7 +154,8 @@ public class SaslServerAuthenticator implements Authenticator { try { saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() { public SaslServer run() throws SaslException { - return Sasl.createSaslServer(saslMechanism, "kafka", clientAddress.getHostName(), configs, callbackHandler); + return Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), + configs, callbackHandler); } }); } catch (PrivilegedActionException e) { @@ -214,6 +217,7 @@ public class SaslServerAuthenticator implements Authenticator { * The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N * followed by N bytes representing the opaque payload. 
*/ + @Override public void authenticate() throws IOException { if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps()) return; @@ -263,15 +267,21 @@ public class SaslServerAuthenticator implements Authenticator { } } - public Principal principal() { - return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID()); + @Override + public KafkaPrincipal principal() { + SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress()); + return principalBuilder.build(context); } + @Override public boolean complete() { return saslState == SaslState.COMPLETE; } + @Override public void close() throws IOException { + if (principalBuilder instanceof Closeable) + Utils.closeQuietly((Closeable) principalBuilder, "principal builder"); if (saslServer != null) saslServer.dispose(); if (callbackHandler != null) @@ -305,6 +315,14 @@ public class SaslServerAuthenticator implements Authenticator { return netOutBuffer.completed(); } + private InetAddress serverAddress() { + return transportLayer.socketChannel().socket().getLocalAddress(); + } + + private InetAddress clientAddress() { + return transportLayer.socketChannel().socket().getInetAddress(); + } + private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException { boolean isKafkaRequest = false; String clientMechanism = null; @@ -325,7 +343,8 @@ public class SaslServerAuthenticator implements Authenticator { LOG.debug("Handling Kafka request {}", apiKey); - RequestContext requestContext = new RequestContext(header, connectionId, clientAddress, + + RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol); RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer); if (apiKey == ApiKeys.API_VERSIONS)