KAFKA-5783; Add KafkaPrincipalBuilder with support for SASL (KIP-189)

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Rajini Sivaram 
<rajinisiva...@googlemail.com>, Manikumar Reddy <manikumar.re...@gmail.com>

Closes #3795 from hachikuji/KAFKA-5783


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b5d88fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b5d88fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b5d88fe

Branch: refs/heads/trunk
Commit: 3b5d88febb186c4b92cc514fa2338de7bc338f67
Parents: f7b1add
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 14 10:16:00 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Sep 14 10:16:00 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   5 +-
 .../apache/kafka/common/config/SaslConfigs.java |  44 +++--
 .../apache/kafka/common/config/SslConfigs.java  |  42 +++--
 .../config/internals/BrokerSecurityConfigs.java |  70 ++++++++
 .../kafka/common/network/Authenticator.java     |  19 +--
 .../kafka/common/network/ChannelBuilder.java    |   3 +-
 .../kafka/common/network/ChannelBuilders.java   |  48 ++++--
 .../common/network/DefaultAuthenticator.java    |  63 -------
 .../kafka/common/network/KafkaChannel.java      |  12 +-
 .../common/network/PlaintextChannelBuilder.java |  64 +++++--
 .../common/network/SaslChannelBuilder.java      |  43 ++---
 .../kafka/common/network/SslChannelBuilder.java |  81 +++++++--
 .../security/auth/AuthenticationContext.java    |  36 ++++
 .../security/auth/DefaultPrincipalBuilder.java  |   6 +-
 .../common/security/auth/KafkaPrincipal.java    |  70 +++++---
 .../security/auth/KafkaPrincipalBuilder.java    |  36 ++++
 .../auth/PlaintextAuthenticationContext.java    |  40 +++++
 .../common/security/auth/PrincipalBuilder.java  |   5 +-
 .../auth/SaslAuthenticationContext.java         |  48 ++++++
 .../security/auth/SslAuthenticationContext.java |  46 +++++
 .../DefaultKafkaPrincipalBuilder.java           | 149 ++++++++++++++++
 .../authenticator/SaslClientAuthenticator.java  |  33 ++--
 .../authenticator/SaslServerAuthenticator.java  |  67 +++++---
 .../SaslServerCallbackHandler.java              |  21 +--
 .../kafka/common/security/ssl/SslFactory.java   |   3 +-
 .../kafka/common/utils/SecurityUtils.java       |  37 ++++
 .../org/apache/kafka/common/utils/Utils.java    |   4 +-
 .../apache/kafka/common/network/CertStores.java |   8 +-
 .../common/network/ChannelBuildersTest.java     | 107 ++++++++++++
 .../kafka/common/network/SslSelectorTest.java   |  37 ++--
 .../common/network/SslTransportLayerTest.java   |  62 +++----
 .../common/security/TestSecurityConfig.java     |  16 +-
 .../auth/DefaultKafkaPrincipalBuilderTest.java  | 170 +++++++++++++++++++
 .../security/auth/KafkaPrincipalTest.java       |   9 -
 .../authenticator/SaslAuthenticatorTest.java    |   9 +-
 .../SaslServerAuthenticatorTest.java            |  22 +--
 .../kafka/common/utils/SecurityUtilsTest.java   |  43 +++++
 .../main/scala/kafka/network/SocketServer.scala |   6 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  24 +--
 .../kafka/api/EndToEndAuthorizationTest.scala   |  18 +-
 .../kafka/api/IntegrationTestHarness.scala      |  14 +-
 .../PlaintextEndToEndAuthorizationTest.scala    |  71 ++++++++
 .../SaslPlainSslEndToEndAuthorizationTest.scala |  29 +++-
 .../scala/integration/kafka/api/SaslSetup.scala |   3 +-
 .../api/SslEndToEndAuthorizationTest.scala      |  35 +++-
 .../integration/KafkaServerTestHarness.scala    |   2 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |  20 +--
 .../unit/kafka/server/RequestQuotaTest.scala    |  11 +-
 .../scala/unit/kafka/utils/JaasTestUtils.scala  |   8 +-
 49 files changed, 1398 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 489c004..bea7e20 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -89,16 +89,15 @@
       <allow pkg="org.apache.kafka.common.annotation" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.config" />
+      <allow pkg="org.apache.kafka.common.protocol" />
+      <allow pkg="org.apache.kafka.common.errors" />
       <subpackage name="authenticator">
-        <allow pkg="org.apache.kafka.common.protocol" />
         <allow pkg="org.apache.kafka.common.protocol.types" />
         <allow pkg="org.apache.kafka.common.requests" />
-        <allow pkg="org.apache.kafka.common.errors" />
         <allow pkg="org.apache.kafka.clients" />
       </subpackage>
       <subpackage name="scram">
         <allow pkg="javax.crypto" />
-        <allow pkg="org.apache.kafka.common.errors" />
       </subpackage>
     </subpackage>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java 
b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 7dc5012..f61b7dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.common.config;
 
-import java.util.Collections;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
 import java.util.List;
 
 public class SaslConfigs {
@@ -29,11 +30,21 @@ public class SaslConfigs {
     public static final String GSSAPI_MECHANISM = "GSSAPI";
     public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM;
 
-    public static final String SASL_ENABLED_MECHANISMS = 
"sasl.enabled.mechanisms";
-    public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL 
mechanisms enabled in the Kafka server. "
-        + "The list may contain any mechanism for which a security provider is 
available. "
-        + "Only GSSAPI is enabled by default.";
-    public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = 
Collections.singletonList(GSSAPI_MECHANISM);
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String SASL_ENABLED_MECHANISMS = 
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String SASL_ENABLED_MECHANISMS_DOC = 
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = 
BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS;
 
     public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
     public static final String SASL_JAAS_CONFIG_DOC = "JAAS login context 
parameters for SASL connections in the format used by JAAS configuration files. 
"
@@ -61,12 +72,21 @@ public class SaslConfigs {
     public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = 
"Login thread sleep time between refresh attempts.";
     public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 
* 1000L;
 
-    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
"sasl.kerberos.principal.to.local.rules";
-    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A 
list of rules for mapping from principal names to short names (typically 
operating system usernames). " +
-            "The rules are evaluated in order and the first rule that matches 
a principal name is used to map it to a short name. Any later rules in the list 
are ignored. " +
-            "By default, principal names of the form 
{username}/{hostname}@{REALM} are mapped to {username}. " +
-            "For more details on the format please see <a 
href=\"#security_authz\"> security authorization and acls</a>.";
-    public static final List<String> 
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
Collections.singletonList("DEFAULT");
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = 
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final List<String> 
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES;
 
     public static void addClientSaslSupport(ConfigDef config) {
         config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, 
SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java 
b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 8cae00d..5e8d304 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.config;
 
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
+
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
 
@@ -24,10 +27,22 @@ public class SslConfigs {
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC 
API AND CHANGE WILL BREAK USER CODE.
      */
 
-    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = 
"principal.builder.class";
-    public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully 
qualified name of a class that implements the PrincipalBuilder interface, " +
-            "which is currently used to build the Principal for connections 
with the SSL SecurityProtocol.";
-    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = 
"org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String PRINCIPAL_BUILDER_CLASS_DOC = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release. In recent versions,
+     *   the config is optional and there is no default.
+     */
+    @Deprecated
+    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = 
DefaultPrincipalBuilder.class.getName();
 
     public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
     public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to 
generate the SSLContext. "
@@ -91,15 +106,16 @@ public class SslConfigs {
     public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = 
"ssl.secure.random.implementation";
     public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_DOC = "The 
SecureRandom PRNG implementation to use for SSL cryptography operations. ";
 
-    public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
-    public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker 
to request client authentication."
-                                           + " The following settings are 
common: "
-                                           + " <ul>"
-                                           + " 
<li><code>ssl.client.auth=required</code> If set to required"
-                                           + " client authentication is 
required."
-                                           + " 
<li><code>ssl.client.auth=requested</code> This means client authentication is 
optional."
-                                           + " unlike requested , if this 
option is set client can choose not to provide authentication information about 
itself"
-                                           + " 
<li><code>ssl.client.auth=none</code> This means client authentication is not 
needed.";
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String SSL_CLIENT_AUTH_CONFIG = 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG;
+    /**
+     * @deprecated As of 1.0.0. This field will be removed in a future major 
release.
+     */
+    @Deprecated
+    public static final String SSL_CLIENT_AUTH_DOC = 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC;
 
     public static void addClientSslSupport(ConfigDef config) {
         config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, 
SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, 
SslConfigs.SSL_PROTOCOL_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
new file mode 100644
index 0000000..18616ec
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import org.apache.kafka.common.config.SaslConfigs;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common home for broker-side security configs which need to be accessible 
from the libraries shared
+ * between the broker and the client.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class BrokerSecurityConfigs {
+
+    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = 
"principal.builder.class";
+    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG = 
"sasl.kerberos.principal.to.local.rules";
+    public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
+    public static final String SASL_ENABLED_MECHANISMS_CONFIG = 
"sasl.enabled.mechanisms";
+
+    public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully 
qualified name of a class that implements the " +
+            "KafkaPrincipalBuilder interface, which is used to build the 
KafkaPrincipal object used during " +
+            "authorization. This config also supports the deprecated 
PrincipalBuilder interface which was previously " +
+            "used for client authentication over SSL. If no principal builder 
is defined, the default behavior depends " +
+            "on the security protocol in use. For SSL authentication, the 
principal name will be the distinguished " +
+            "name from the client certificate if one is provided; otherwise, 
if client authentication is not required, " +
+            "the principal name will be ANONYMOUS. For SASL authentication, 
the principal will be derived using the " +
+            "rules defined by <code>" + 
SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + "</code> if GSSAPI is in use, " 
+
+            "and the SASL authentication ID for other mechanisms. For 
PLAINTEXT, the principal will be ANONYMOUS.";
+
+    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A 
list of rules for mapping from principal " +
+            "names to short names (typically operating system usernames). The 
rules are evaluated in order and the " +
+            "first rule that matches a principal name is used to map it to a 
short name. Any later rules in the list are " +
+            "ignored. By default, principal names of the form 
{username}/{hostname}@{REALM} are mapped to {username}. " +
+            "For more details on the format please see <a 
href=\"#security_authz\"> security authorization and acls</a>. " +
+            "Note that this configuration is ignored if an extension of 
KafkaPrincipalBuilder is provided by the " +
+            "<code>" + PRINCIPAL_BUILDER_CLASS_CONFIG + "</code> 
configuration.";
+    public static final List<String> 
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
Collections.singletonList("DEFAULT");
+
+    public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker 
to request client authentication."
+            + " The following settings are common: "
+            + " <ul>"
+            + " <li><code>ssl.client.auth=required</code> If set to required"
+            + " client authentication is required."
+            + " <li><code>ssl.client.auth=requested</code> This means client 
authentication is optional."
+            + " unlike requested , if this option is set client can choose not 
to provide authentication information about itself"
+            + " <li><code>ssl.client.auth=none</code> This means client 
authentication is not needed.";
+
+    public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL 
mechanisms enabled in the Kafka server. "
+            + "The list may contain any mechanism for which a security 
provider is available. "
+            + "Only GSSAPI is enabled by default.";
+    public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = 
Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index b242967..1fe3beb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -16,28 +16,15 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Map;
-import java.security.Principal;
-
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.KafkaException;
 
 /**
  * Authentication for Channel
  */
 public interface Authenticator extends Closeable {
-
-    /**
-     * Configures Authenticator using the provided parameters.
-     *
-     * @param transportLayer The transport layer used to read or write tokens
-     * @param principalBuilder The builder used to construct `Principal`
-     * @param configs Additional configuration parameters as key/value pairs
-     */
-    void configure(TransportLayer transportLayer, PrincipalBuilder 
principalBuilder, Map<String, ?> configs);
-
     /**
      * Implements any authentication mechanism. Use transportLayer to read or 
write tokens.
      * If no further authentication needs to be done returns.
@@ -47,7 +34,7 @@ public interface Authenticator extends Closeable {
     /**
      * Returns Principal using PrincipalBuilder
      */
-    Principal principal() throws KafkaException;
+    KafkaPrincipal principal();
 
     /**
      * returns true if authentication is complete otherwise returns false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 6c8890a..54689f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -33,7 +33,6 @@ public interface ChannelBuilder extends AutoCloseable {
      */
     void configure(Map<String, ?> configs) throws KafkaException;
 
-
     /**
      * returns a Channel with TransportLayer and Authenticator configured.
      * @param  id  channel id
@@ -44,10 +43,10 @@ public interface ChannelBuilder extends AutoCloseable {
      */
     KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, 
MemoryPool memoryPool) throws KafkaException;
 
-
     /**
      * Closes ChannelBuilder
      */
+    @Override
     void close();
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 225f5a8..6dd3ddd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -16,13 +16,18 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Map;
@@ -113,12 +118,13 @@ public class ChannelBuilders {
         return channelBuilder;
     }
 
-    /**
-     * Returns a configured `PrincipalBuilder`.
-     */
-    static PrincipalBuilder createPrincipalBuilder(Map<String, ?> configs) {
-        // this is a server-only config so it will always be null on the client
-        Class<?> principalBuilderClass = (Class<?>) 
configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
+    private static void requireNonNullMode(Mode mode, SecurityProtocol 
securityProtocol) {
+        if (mode == null)
+            throw new IllegalArgumentException("`mode` must be non-null if 
`securityProtocol` is `" + securityProtocol + "`");
+    }
+
+    @SuppressWarnings("deprecation")
+    private static PrincipalBuilder createPrincipalBuilder(Class<?> 
principalBuilderClass, Map<String, ?> configs) {
         PrincipalBuilder principalBuilder;
         if (principalBuilderClass == null)
             principalBuilder = new DefaultPrincipalBuilder();
@@ -128,9 +134,31 @@ public class ChannelBuilders {
         return principalBuilder;
     }
 
-    private static void requireNonNullMode(Mode mode, SecurityProtocol 
securityProtocol) {
-        if (mode == null)
-            throw new IllegalArgumentException("`mode` must be non-null if 
`securityProtocol` is `" + securityProtocol + "`");
+    @SuppressWarnings("deprecation")
+    public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> 
configs,
+                                                               TransportLayer 
transportLayer,
+                                                               Authenticator 
authenticator,
+                                                               
KerberosShortNamer kerberosShortNamer) {
+        Class<?> principalBuilderClass = (Class<?>) 
configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
+        final KafkaPrincipalBuilder builder;
+
+        if (principalBuilderClass == null || principalBuilderClass == 
DefaultKafkaPrincipalBuilder.class) {
+            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
+        } else if 
(KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
+            builder = (KafkaPrincipalBuilder) 
Utils.newInstance(principalBuilderClass);
+        } else if 
(PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
+            PrincipalBuilder oldPrincipalBuilder = 
createPrincipalBuilder(principalBuilderClass, configs);
+            builder = 
DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, 
transportLayer,
+                    oldPrincipalBuilder, kerberosShortNamer);
+        } else {
+            throw new InvalidConfigurationException("Type " + 
principalBuilderClass.getName() + " is not " +
+                    "an instance of " + PrincipalBuilder.class.getName() + " 
or " + KafkaPrincipalBuilder.class.getName());
+        }
+
+        if (builder instanceof Configurable)
+            ((Configurable) builder).configure(configs);
+
+        return builder;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
deleted file mode 100644
index 77d94a6..0000000
--- 
a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import java.security.Principal;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.KafkaException;
-
-public class DefaultAuthenticator implements Authenticator {
-
-    private TransportLayer transportLayer;
-    private PrincipalBuilder principalBuilder;
-    private Principal principal;
-
-    public void configure(TransportLayer transportLayer, PrincipalBuilder 
principalBuilder, Map<String, ?> configs) {
-        this.transportLayer = transportLayer;
-        this.principalBuilder = principalBuilder;
-    }
-
-    /**
-     * No-Op for default authenticator
-     */
-    public void authenticate() throws IOException {}
-
-    /**
-     * Constructs Principal using configured principalBuilder.
-     * @return Principal
-     * @throws KafkaException
-     */
-    public Principal principal() throws KafkaException {
-        if (principal == null)
-            principal = principalBuilder.buildPrincipal(transportLayer, this);
-        return principal;
-    }
-
-    public void close() throws IOException {}
-
-    /**
-     * DefaultAuthenticator doesn't implement any additional authentication 
mechanism.
-     * @return true
-     */
-    public boolean complete() {
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 1e76c43..c759f00 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -17,17 +17,15 @@
 package org.apache.kafka.common.network;
 
 
-import java.io.IOException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Utils;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.channels.SelectionKey;
-
-import java.security.Principal;
-
 import java.util.Objects;
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.utils.Utils;
 
 public class KafkaChannel {
     private final String id;
@@ -66,7 +64,7 @@ public class KafkaChannel {
     /**
      * Returns the principal returned by `authenticator.principal()`.
      */
-    public Principal principal() throws IOException {
+    public KafkaPrincipal principal() {
         return authenticator.principal();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index ad63671..c0d1059 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -16,45 +16,73 @@
  */
 package org.apache.kafka.common.network;
 
-import java.nio.channels.SelectionKey;
-import java.util.Map;
-
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.KafkaException;
-
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.channels.SelectionKey;
+import java.util.Map;
+
 public class PlaintextChannelBuilder implements ChannelBuilder {
     private static final Logger log = 
LoggerFactory.getLogger(PlaintextChannelBuilder.class);
-    private PrincipalBuilder principalBuilder;
     private Map<String, ?> configs;
 
     public void configure(Map<String, ?> configs) throws KafkaException {
-        try {
-            this.configs = configs;
-            principalBuilder = ChannelBuilders.createPrincipalBuilder(configs);
-        } catch (Exception e) {
-            throw new KafkaException(e);
-        }
+        this.configs = configs;
     }
 
     @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int 
maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             PlaintextTransportLayer transportLayer = new 
PlaintextTransportLayer(key);
-            Authenticator authenticator = new DefaultAuthenticator();
-            authenticator.configure(transportLayer, this.principalBuilder, 
this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, 
maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
+            PlaintextAuthenticator authenticator = new 
PlaintextAuthenticator(configs, transportLayer);
+            return new KafkaChannel(id, transportLayer, authenticator, 
maxReceiveSize,
+                    memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.warn("Failed to create channel due to ", e);
             throw new KafkaException(e);
         }
     }
 
-    public void close() {
-        this.principalBuilder.close();
+    @Override
+    public void close() {}
+
+    private static class PlaintextAuthenticator implements Authenticator {
+        private final PlaintextTransportLayer transportLayer;
+        private final KafkaPrincipalBuilder principalBuilder;
+
+        private PlaintextAuthenticator(Map<String, ?> configs, 
PlaintextTransportLayer transportLayer) {
+            this.transportLayer = transportLayer;
+            this.principalBuilder = 
ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
+        }
+
+        @Override
+        public void authenticate() throws IOException {}
+
+        @Override
+        public KafkaPrincipal principal() {
+            InetAddress clientAddress = 
transportLayer.socketChannel().socket().getInetAddress();
+            return principalBuilder.build(new 
PlaintextAuthenticationContext(clientAddress));
+        }
+
+        @Override
+        public boolean complete() {
+            return true;
+        }
+
+        @Override
+        public void close() {
+            if (principalBuilder instanceof Closeable)
+                Utils.closeQuietly((Closeable) principalBuilder, "principal 
builder");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 0f98463..9d7eac0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -16,29 +16,30 @@
  */
 package org.apache.kafka.common.network;
 
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.authenticator.LoginManager;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.Java;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.Map;
+
 public class SaslChannelBuilder implements ChannelBuilder {
     private static final Logger log = 
LoggerFactory.getLogger(SaslChannelBuilder.class);
 
@@ -71,12 +72,13 @@ public class SaslChannelBuilder implements ChannelBuilder {
         this.credentialCache = credentialCache;
     }
 
+    @Override
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
             boolean hasKerberos;
             if (mode == Mode.SERVER) {
-                List<String> enabledMechanisms = (List<String>) 
this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
+                List<String> enabledMechanisms = (List<String>) 
this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
                 hasKerberos = enabledMechanisms == null || 
enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
             } else {
                 hasKerberos = 
clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
@@ -90,7 +92,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
                     defaultRealm = "";
                 }
                 @SuppressWarnings("unchecked")
-                List<String> principalToLocalRules = (List<String>) 
configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
+                List<String> principalToLocalRules = (List<String>) 
configs.get(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG);
                 if (principalToLocalRules != null)
                     kerberosShortNamer = 
KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
             }
@@ -114,14 +116,12 @@ public class SaslChannelBuilder implements ChannelBuilder 
{
             TransportLayer transportLayer = buildTransportLayer(id, key, 
socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(id, jaasContext, 
loginManager.subject(),
-                        kerberosShortNamer, 
socketChannel.socket().getLocalAddress(), credentialCache,
-                        listenerName, securityProtocol);
+                authenticator = new SaslServerAuthenticator(configs, id, 
jaasContext, loginManager.subject(),
+                        kerberosShortNamer, credentialCache, listenerName, 
securityProtocol, transportLayer);
             else
-                authenticator = new SaslClientAuthenticator(id, 
loginManager.subject(), loginManager.serviceName(),
-                        socketChannel.socket().getInetAddress().getHostName(), 
clientSaslMechanism, handshakeRequestEnable);
-            // Both authenticators don't use `PrincipalBuilder`, so we pass 
`null` for now. Reconsider if this changes.
-            authenticator.configure(transportLayer, null, this.configs);
+                authenticator = new SaslClientAuthenticator(configs, id, 
loginManager.subject(), loginManager.serviceName(),
+                        socketChannel.socket().getInetAddress().getHostName(), 
clientSaslMechanism,
+                        handshakeRequestEnable, transportLayer);
             return new KafkaChannel(id, transportLayer, authenticator, 
maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
@@ -129,6 +129,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
     }
 
+    @Override
     public void close()  {
         if (loginManager != null) {
             loginManager.release();
@@ -136,7 +137,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
     }
 
-    protected TransportLayer buildTransportLayer(String id, SelectionKey key, 
SocketChannel socketChannel) throws IOException {
+    private TransportLayer buildTransportLayer(String id, SelectionKey key, 
SocketChannel socketChannel) throws IOException {
         if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
             return SslTransportLayer.create(id, key,
                 
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
 socketChannel.socket().getPort()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 01b72fe..b6ef625 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -16,23 +16,27 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
 
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.security.ssl.SslFactory;
-import org.apache.kafka.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class SslChannelBuilder implements ChannelBuilder {
     private static final Logger log = 
LoggerFactory.getLogger(SslChannelBuilder.class);
     private SslFactory sslFactory;
-    private PrincipalBuilder principalBuilder;
     private Mode mode;
     private Map<String, ?> configs;
 
@@ -45,7 +49,6 @@ public class SslChannelBuilder implements ChannelBuilder {
             this.configs = configs;
             this.sslFactory = new SslFactory(mode);
             this.sslFactory.configure(this.configs);
-            this.principalBuilder = 
ChannelBuilders.createPrincipalBuilder(configs);
         } catch (Exception e) {
             throw new KafkaException(e);
         }
@@ -55,23 +58,21 @@ public class SslChannelBuilder implements ChannelBuilder {
     public KafkaChannel buildChannel(String id, SelectionKey key, int 
maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, 
id, key, peerHost(key));
-            Authenticator authenticator = new DefaultAuthenticator();
-            authenticator.configure(transportLayer, this.principalBuilder, 
this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, 
maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
+            Authenticator authenticator = new SslAuthenticator(configs, 
transportLayer);
+            return new KafkaChannel(id, transportLayer, authenticator, 
maxReceiveSize,
+                    memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);
         }
     }
 
-    public void close()  {
-        this.principalBuilder.close();
-    }
+    @Override
+    public void close() {}
 
     protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, 
String id, SelectionKey key, String host) throws IOException {
         SocketChannel socketChannel = (SocketChannel) key.channel();
-        return SslTransportLayer.create(id, key,
-            sslFactory.createSslEngine(host, 
socketChannel.socket().getPort()));
+        return SslTransportLayer.create(id, key, 
sslFactory.createSslEngine(host, socketChannel.socket().getPort()));
     }
 
     /**
@@ -112,4 +113,50 @@ public class SslChannelBuilder implements ChannelBuilder {
         SocketChannel socketChannel = (SocketChannel) key.channel();
         return new InetSocketAddress(socketChannel.socket().getInetAddress(), 
0).getHostString();
     }
+
+    /**
+     * Note that client SSL authentication is handled in {@link 
SslTransportLayer}. This class is only used
+     * to transform the derived principal using a {@link 
KafkaPrincipalBuilder} configured by the user.
+     */
+    private static class SslAuthenticator implements Authenticator {
+        private final SslTransportLayer transportLayer;
+        private final KafkaPrincipalBuilder principalBuilder;
+
+        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer 
transportLayer) {
+            this.transportLayer = transportLayer;
+            this.principalBuilder = 
ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
+        }
+        /**
+         * No-Op for plaintext authenticator
+         */
+        @Override
+        public void authenticate() throws IOException {}
+
+        /**
+         * Constructs Principal using configured principalBuilder.
+         * @return the built principal
+         */
+        @Override
+        public KafkaPrincipal principal() {
+            InetAddress clientAddress = 
transportLayer.socketChannel().socket().getInetAddress();
+            SslAuthenticationContext context = new 
SslAuthenticationContext(transportLayer.sslSession(), clientAddress);
+            return principalBuilder.build(context);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (principalBuilder instanceof Closeable)
+                Utils.closeQuietly((Closeable) principalBuilder, "principal 
builder");
+        }
+
+        /**
+         * SslAuthenticator doesn't implement any additional authentication 
mechanism.
+         * @return true
+         */
+        @Override
+        public boolean complete() {
+            return true;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
new file mode 100644
index 0000000..8c82954
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.net.InetAddress;
+
+/**
+ * An object representing contextual information from the authentication 
session. See
+ * {@link SaslAuthenticationContext} and {@link SslAuthenticationContext}.
+ */
+public interface AuthenticationContext {
+    /**
+     * Underlying security protocol of the authentication session.
+     * @return The name of the security protocol (i.e. PLAINTEXT, 
SASL_PLAINTEXT, SASL_SSL, SSL)
+     */
+    String securityProtocolName();
+
+    /**
+     * Address of the authenticated client
+     */
+    InetAddress clientAddress();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
index 04e4c90..424cfaa 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
@@ -23,7 +23,11 @@ import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.KafkaException;
 
-/** DefaultPrincipalBuilder which return transportLayer's peer Principal **/
+/**
+ * DefaultPrincipalBuilder which return transportLayer's peer Principal
+ * @deprecated As of Kafka 1.0.0. This will be removed in a future major 
release.
+ **/
+@Deprecated
 public class DefaultPrincipalBuilder implements PrincipalBuilder {
 
     public void configure(Map<String, ?> configs) {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index 4bacdcb..ed3c956 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -16,53 +16,69 @@
  */
 package org.apache.kafka.common.security.auth;
 
+import org.apache.kafka.common.utils.SecurityUtils;
+
 import java.security.Principal;
 
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Principals in Kafka are defined by a type and a name. The principal type 
will always be "User"
+ * for the simple authorizer that is enabled by default, but custom 
authorizers can leverage different
+ * principal types (such as to enable group or role-based ACLs). The {@link 
KafkaPrincipalBuilder} interface
+ * is used when you need to derive a different principal type from the 
authentication context, or when
+ * you need to represent relations between different principals. For example, 
you could extend
+ * {@link KafkaPrincipal} in order to link a user principal to one or more 
role principals.
+ *
+ * For custom extensions of {@link KafkaPrincipal}, there two key points to 
keep in mind:
+ *
+ * 1. To be compatible with the ACL APIs provided by Kafka (including the 
command line tool), each ACL
+ *    can only represent a permission granted to a single principal 
(consisting of a principal type and name).
+ *    It is possible to use richer ACL semantics, but you must implement your 
own mechanisms for adding
+ *    and removing ACLs.
+ * 2. In general, {@link KafkaPrincipal} extensions are only useful when the 
corresponding Authorizer
+ *    is also aware of the extension. If you have a {@link 
KafkaPrincipalBuilder} which derives user groups
+ *    from the authentication context (e.g. from an SSL client certificate), 
then you need a custom
+ *    authorizer which is capable of using the additional group information.
+ */
 public class KafkaPrincipal implements Principal {
-    public static final String SEPARATOR = ":";
     public static final String USER_TYPE = "User";
     public final static KafkaPrincipal ANONYMOUS = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
 
-    private String principalType;
-    private String name;
+    private final String principalType;
+    private final String name;
 
     public KafkaPrincipal(String principalType, String name) {
-        if (principalType == null || name == null) {
-            throw new IllegalArgumentException("principalType and name can not 
be null");
-        }
-        this.principalType = principalType;
-        this.name = name;
+        this.principalType = requireNonNull(principalType, "Principal type 
cannot be null");
+        this.name = requireNonNull(name, "Principal name cannot be null");
     }
 
+    /**
+     * Parse a {@link KafkaPrincipal} instance from a string. This method 
cannot be used for {@link KafkaPrincipal}
+     * extensions.
+     *
+     * @param str The input string formatted as 
"{principalType}:{principalName}"
+     * @return The parsed {@link KafkaPrincipal} instance
+     * @deprecated As of 1.0.0. This method will be removed in a future major 
release.
+     */
+    @Deprecated
     public static KafkaPrincipal fromString(String str) {
-        if (str == null || str.isEmpty()) {
-            throw new IllegalArgumentException("expected a string in format 
principalType:principalName but got " + str);
-        }
-
-        String[] split = str.split(SEPARATOR, 2);
-
-        if (split == null || split.length != 2) {
-            throw new IllegalArgumentException("expected a string in format 
principalType:principalName but got " + str);
-        }
-
-        return new KafkaPrincipal(split[0], split[1]);
+        return SecurityUtils.parseKafkaPrincipal(str);
     }
 
     @Override
     public String toString() {
-        return principalType + SEPARATOR + name;
+        return principalType + ":" + name;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (!(o instanceof KafkaPrincipal)) return false;
+        if (o == null) return false;
+        if (getClass() != o.getClass()) return false;
 
         KafkaPrincipal that = (KafkaPrincipal) o;
-
-        if (!principalType.equals(that.principalType)) return false;
-        return name.equals(that.name);
-
+        return principalType.equals(that.principalType) && 
name.equals(that.name);
     }
 
     @Override
@@ -80,7 +96,5 @@ public class KafkaPrincipal implements Principal {
     public String getPrincipalType() {
         return principalType;
     }
-}
-
-
 
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
new file mode 100644
index 0000000..941d3b1
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+/**
+ * Pluggable principal builder interface which supports both SSL 
authentication through
+ * {@link SslAuthenticationContext} and SASL through {@link 
SaslAuthenticationContext}.
+ *
+ * Note that the {@link org.apache.kafka.common.Configurable} and {@link 
java.io.Closeable}
+ * interfaces are respected if implemented. Additionally, implementations must 
provide a
+ * default no-arg constructor.
+ */
+public interface KafkaPrincipalBuilder {
+    /**
+     * Build a kafka principal from the authentication context.
+     * @param context The authentication context (either {@link 
SslAuthenticationContext} or
+     *        {@link SaslAuthenticationContext})
+     * @return The built principal which may provide additional enrichment 
through a subclass of
+     *        {@link KafkaPrincipalBuilder}.
+     */
+    KafkaPrincipal build(AuthenticationContext context);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
new file mode 100644
index 0000000..96b8376
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+
+import java.net.InetAddress;
+
+public class PlaintextAuthenticationContext implements AuthenticationContext {
+    private final InetAddress clientAddress;
+
+    public PlaintextAuthenticationContext(InetAddress clientAddress) {
+        this.clientAddress = clientAddress;
+    }
+
+    @Override
+    public String securityProtocolName() {
+        return SecurityProtocol.PLAINTEXT.name;
+    }
+
+    @Override
+    public InetAddress clientAddress() {
+        return clientAddress;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
index fc2a930..d58170a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
@@ -25,10 +25,13 @@ import org.apache.kafka.common.Configurable;
 import java.util.Map;
 import java.security.Principal;
 
-/*
+/**
  * PrincipalBuilder for Authenticator
+ * @deprecated As of Kafka 1.0.0, use {@link KafkaPrincipalBuilder} instead. 
This will be removed in
+ *             a future major release.
  */
 @InterfaceStability.Unstable
+@Deprecated
 public interface PrincipalBuilder extends Configurable {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
new file mode 100644
index 0000000..f98164b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+
+import javax.security.sasl.SaslServer;
+import java.net.InetAddress;
+
+public class SaslAuthenticationContext implements AuthenticationContext {
+    private final SaslServer server;
+    private final SecurityProtocol securityProtocol;
+    private final InetAddress clientAddress;
+
+    public SaslAuthenticationContext(SaslServer server, SecurityProtocol 
securityProtocol, InetAddress clientAddress) {
+        this.server = server;
+        this.securityProtocol = securityProtocol;
+        this.clientAddress = clientAddress;
+    }
+
+    public SaslServer server() {
+        return server;
+    }
+
+    @Override
+    public String securityProtocolName() {
+        return securityProtocol.name;
+    }
+
+    @Override
+    public InetAddress clientAddress() {
+        return clientAddress;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
new file mode 100644
index 0000000..325c282
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+
+import javax.net.ssl.SSLSession;
+import java.net.InetAddress;
+
+public class SslAuthenticationContext implements AuthenticationContext {
+    private final SSLSession session;
+    private final InetAddress clientAddress;
+
+    public SslAuthenticationContext(SSLSession session, InetAddress 
clientAddress) {
+        this.session = session;
+        this.clientAddress = clientAddress;
+    }
+
+    public SSLSession session() {
+        return session;
+    }
+
+    @Override
+    public String securityProtocolName() {
+        return SecurityProtocol.SSL.name;
+    }
+
+    @Override
+    public InetAddress clientAddress() {
+        return clientAddress;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
new file mode 100644
index 0000000..1d0295a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
+import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.sasl.SaslServer;
+import java.io.Closeable;
+import java.io.IOException;
+import java.security.Principal;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Default implementation of {@link KafkaPrincipalBuilder} which provides 
basic support for
+ * SSL authentication and SASL authentication. In the latter case, when GSSAPI 
is used, this
+ * class applies {@link 
org.apache.kafka.common.security.kerberos.KerberosShortNamer} to transform
+ * the name.
+ *
+ * NOTE: This is an internal class and can change without notice. Unlike 
normal implementations
+ * of {@link KafkaPrincipalBuilder}, there is no default no-arg constructor 
since this class
+ * must adapt implementations of the older {@link PrincipalBuilder} interface.
+ */
+public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, 
Closeable {
+    @SuppressWarnings("deprecation")
+    private final PrincipalBuilder oldPrincipalBuilder;
+    private final Authenticator authenticator;
+    private final TransportLayer transportLayer;
+    private final KerberosShortNamer kerberosShortNamer;
+
+    /**
+     * Construct a new instance which wraps an instance of the older {@link 
PrincipalBuilder}.
+     *
+     * @param authenticator The authenticator in use
+     * @param transportLayer The underlying transport layer
+     * @param oldPrincipalBuilder Instance of {@link PrincipalBuilder}
+     * @param kerberosShortNamer Kerberos name rewrite rules or null if none 
have been configured
+     */
+    @SuppressWarnings("deprecation")
+    public static DefaultKafkaPrincipalBuilder 
fromOldPrincipalBuilder(Authenticator authenticator,
+                                                                       
TransportLayer transportLayer,
+                                                                       
PrincipalBuilder oldPrincipalBuilder,
+                                                                       
KerberosShortNamer kerberosShortNamer) {
+        return new DefaultKafkaPrincipalBuilder(
+                requireNonNull(authenticator),
+                requireNonNull(transportLayer),
+                requireNonNull(oldPrincipalBuilder),
+                kerberosShortNamer);
+    }
+
+    @SuppressWarnings("deprecation")
+    private DefaultKafkaPrincipalBuilder(Authenticator authenticator,
+                                         TransportLayer transportLayer,
+                                         PrincipalBuilder oldPrincipalBuilder,
+                                         KerberosShortNamer 
kerberosShortNamer) {
+        this.authenticator = authenticator;
+        this.transportLayer = transportLayer;
+        this.oldPrincipalBuilder = oldPrincipalBuilder;
+        this.kerberosShortNamer = kerberosShortNamer;
+    }
+
+    /**
+     * Construct a new instance.
+     *
+     * @param kerberosShortNamer Kerberos name rewrite rules or null if none 
have been configured
+     */
+    public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer) 
{
+        this(null, null, null, kerberosShortNamer);
+    }
+
+    @Override
+    public KafkaPrincipal build(AuthenticationContext context) {
+        if (context instanceof PlaintextAuthenticationContext) {
+            if (oldPrincipalBuilder != null)
+                return 
convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, 
authenticator));
+
+            return KafkaPrincipal.ANONYMOUS;
+        } else if (context instanceof SslAuthenticationContext) {
+            SSLSession sslSession = ((SslAuthenticationContext) 
context).session();
+
+            if (oldPrincipalBuilder != null)
+                return 
convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, 
authenticator));
+
+            try {
+                return convertToKafkaPrincipal(sslSession.getPeerPrincipal());
+            } catch (SSLPeerUnverifiedException se) {
+                return KafkaPrincipal.ANONYMOUS;
+            }
+        } else if (context instanceof SaslAuthenticationContext) {
+            SaslServer saslServer = ((SaslAuthenticationContext) 
context).server();
+            if 
(SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName()))
+                return 
applyKerberosShortNamer(saslServer.getAuthorizationID());
+            else
+                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
saslServer.getAuthorizationID());
+        } else {
+            throw new IllegalArgumentException("Unhandled authentication 
context type: " + context.getClass().getName());
+        }
+    }
+
+    private KafkaPrincipal applyKerberosShortNamer(String authorizationId) {
+        KerberosName kerberosName = KerberosName.parse(authorizationId);
+        try {
+            String shortName = kerberosShortNamer.shortName(kerberosName);
+            return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, shortName);
+        } catch (IOException e) {
+            throw new KafkaException("Failed to set name for '" + kerberosName 
+
+                    "' based on Kerberos authentication rules.", e);
+        }
+    }
+
+    private KafkaPrincipal convertToKafkaPrincipal(Principal principal) {
+        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
principal.getName());
+    }
+
+    @Override
+    public void close() {
+        if (oldPrincipalBuilder != null)
+            oldPrincipalBuilder.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 6bf9b2a..7b68abe 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -38,7 +38,6 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,14 +69,11 @@ public class SaslClientAuthenticator implements 
Authenticator {
     private final String host;
     private final String node;
     private final String mechanism;
-    private final boolean handshakeRequestEnable;
-
-    // assigned in `configure`
-    private SaslClient saslClient;
-    private Map<String, ?> configs;
-    private String clientPrincipalName;
-    private AuthCallbackHandler callbackHandler;
-    private TransportLayer transportLayer;
+    private final TransportLayer transportLayer;
+    private final SaslClient saslClient;
+    private final Map<String, ?> configs;
+    private final String clientPrincipalName;
+    private final AuthCallbackHandler callbackHandler;
 
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
@@ -92,21 +88,24 @@ public class SaslClientAuthenticator implements 
Authenticator {
     // Request header for which response from the server is pending
     private RequestHeader currentRequestHeader;
 
-    public SaslClientAuthenticator(String node, Subject subject, String 
servicePrincipal, String host, String mechanism, boolean 
handshakeRequestEnable) throws IOException {
+    public SaslClientAuthenticator(Map<String, ?> configs,
+                                   String node,
+                                   Subject subject,
+                                   String servicePrincipal,
+                                   String host,
+                                   String mechanism,
+                                   boolean handshakeRequestEnable,
+                                   TransportLayer transportLayer) throws 
IOException {
         this.node = node;
         this.subject = subject;
         this.host = host;
         this.servicePrincipal = servicePrincipal;
         this.mechanism = mechanism;
-        this.handshakeRequestEnable = handshakeRequestEnable;
         this.correlationId = -1;
-    }
+        this.transportLayer = transportLayer;
+        this.configs = configs;
 
-    public void configure(TransportLayer transportLayer, PrincipalBuilder 
principalBuilder, Map<String, ?> configs) throws KafkaException {
         try {
-            this.transportLayer = transportLayer;
-            this.configs = configs;
-
             setSaslState(handshakeRequestEnable ? 
SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
 
             // determine client principal from subject for Kerberos to use as 
authorization id for the SaslClient.
@@ -252,7 +251,7 @@ public class SaslClientAuthenticator implements 
Authenticator {
         return serverPacket;
     }
 
-    public Principal principal() {
+    public KafkaPrincipal principal() {
         return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
clientPrincipalName);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 276d067..28aa995 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -18,11 +18,13 @@ package org.apache.kafka.common.security.authenticator;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -43,12 +45,14 @@ import 
org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
+import org.apache.kafka.common.utils.Utils;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSException;
@@ -62,11 +66,11 @@ import javax.security.auth.Subject;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashSet;
@@ -80,7 +84,7 @@ public class SaslServerAuthenticator implements Authenticator 
{
     static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = 
LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
-    public enum SaslState {
+    private enum SaslState {
         GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, 
COMPLETE, FAILED
     }
 
@@ -89,9 +93,11 @@ public class SaslServerAuthenticator implements 
Authenticator {
     private final String connectionId;
     private final JaasContext jaasContext;
     private final Subject subject;
-    private final KerberosShortNamer kerberosNamer;
-    private final InetAddress clientAddress;
     private final CredentialCache credentialCache;
+    private final TransportLayer transportLayer;
+    private final Set<String> enabledMechanisms;
+    private final Map<String, ?> configs;
+    private final KafkaPrincipalBuilder principalBuilder;
 
     // Current SASL state
     private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
@@ -101,48 +107,44 @@ public class SaslServerAuthenticator implements 
Authenticator {
     private String saslMechanism;
     private AuthCallbackHandler callbackHandler;
 
-    // assigned in `configure`
-    private TransportLayer transportLayer;
-    private Set<String> enabledMechanisms;
-    private Map<String, ?> configs;
-
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
 
-    public SaslServerAuthenticator(String connectionId,
+    public SaslServerAuthenticator(Map<String, ?> configs,
+                                   String connectionId,
                                    JaasContext jaasContext,
                                    Subject subject,
                                    KerberosShortNamer kerberosNameParser,
-                                   InetAddress clientAddress,
                                    CredentialCache credentialCache,
                                    ListenerName listenerName,
-                                   SecurityProtocol securityProtocol) throws 
IOException {
+                                   SecurityProtocol securityProtocol,
+                                   TransportLayer transportLayer) throws 
IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         this.connectionId = connectionId;
         this.jaasContext = jaasContext;
         this.subject = subject;
-        this.kerberosNamer = kerberosNameParser;
-        this.clientAddress = clientAddress;
         this.credentialCache = credentialCache;
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
-    }
-
-    public void configure(TransportLayer transportLayer, PrincipalBuilder 
principalBuilder, Map<String, ?> configs) {
         this.transportLayer = transportLayer;
+
         this.configs = configs;
-        List<String> enabledMechanisms = (List<String>) 
this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
+        List<String> enabledMechanisms = (List<String>) 
this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
         if (enabledMechanisms == null || enabledMechanisms.isEmpty())
             throw new IllegalArgumentException("No SASL mechanisms are 
enabled");
         this.enabledMechanisms = new HashSet<>(enabledMechanisms);
+
+        // Note that the old principal builder does not support SASL, so we do 
not need to pass the
+        // authenticator or the transport layer
+        this.principalBuilder = 
ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser);
     }
 
     private void createSaslServer(String mechanism) throws IOException {
         this.saslMechanism = mechanism;
         if (!ScramMechanism.isScram(mechanism))
-            callbackHandler = new SaslServerCallbackHandler(jaasContext, 
kerberosNamer);
+            callbackHandler = new SaslServerCallbackHandler(jaasContext);
         else
             callbackHandler = new 
ScramServerCallbackHandler(credentialCache.cache(mechanism, 
ScramCredential.class));
         callbackHandler.configure(configs, Mode.SERVER, subject, 
saslMechanism);
@@ -152,7 +154,8 @@ public class SaslServerAuthenticator implements 
Authenticator {
             try {
                 saslServer = Subject.doAs(subject, new 
PrivilegedExceptionAction<SaslServer>() {
                     public SaslServer run() throws SaslException {
-                        return Sasl.createSaslServer(saslMechanism, "kafka", 
clientAddress.getHostName(), configs, callbackHandler);
+                        return Sasl.createSaslServer(saslMechanism, "kafka", 
serverAddress().getHostName(),
+                                configs, callbackHandler);
                     }
                 });
             } catch (PrivilegedActionException e) {
@@ -214,6 +217,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
      * The messages are sent and received as size delimited bytes that 
consists of a 4 byte network-ordered size N
      * followed by N bytes representing the opaque payload.
      */
+    @Override
     public void authenticate() throws IOException {
         if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
             return;
@@ -263,15 +267,21 @@ public class SaslServerAuthenticator implements 
Authenticator {
         }
     }
 
-    public Principal principal() {
-        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
saslServer.getAuthorizationID());
+    @Override
+    public KafkaPrincipal principal() {
+        SaslAuthenticationContext context = new 
SaslAuthenticationContext(saslServer, securityProtocol, clientAddress());
+        return principalBuilder.build(context);
     }
 
+    @Override
     public boolean complete() {
         return saslState == SaslState.COMPLETE;
     }
 
+    @Override
     public void close() throws IOException {
+        if (principalBuilder instanceof Closeable)
+            Utils.closeQuietly((Closeable) principalBuilder, "principal 
builder");
         if (saslServer != null)
             saslServer.dispose();
         if (callbackHandler != null)
@@ -305,6 +315,14 @@ public class SaslServerAuthenticator implements 
Authenticator {
         return netOutBuffer.completed();
     }
 
+    private InetAddress serverAddress() {
+        return transportLayer.socketChannel().socket().getLocalAddress();
+    }
+
+    private InetAddress clientAddress() {
+        return transportLayer.socketChannel().socket().getInetAddress();
+    }
+
     private boolean handleKafkaRequest(byte[] requestBytes) throws 
IOException, AuthenticationException {
         boolean isKafkaRequest = false;
         String clientMechanism = null;
@@ -325,7 +343,8 @@ public class SaslServerAuthenticator implements 
Authenticator {
 
             LOG.debug("Handling Kafka request {}", apiKey);
 
-            RequestContext requestContext = new RequestContext(header, 
connectionId, clientAddress,
+
+            RequestContext requestContext = new RequestContext(header, 
connectionId, clientAddress(),
                     KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol);
             RequestAndSize requestAndSize = 
requestContext.parseRequest(requestBuffer);
             if (apiKey == ApiKeys.API_VERSIONS)

Reply via email to