Repository: kafka
Updated Branches:
  refs/heads/trunk 985cc534a -> 8fca43222


http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 2dd7db9..d41d61a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -26,11 +26,14 @@ import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
+import org.apache.kafka.common.network.SaslChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -38,15 +41,18 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.ScramFormatter;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
@@ -63,10 +69,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import javax.security.auth.Subject;
 import javax.security.auth.login.Configuration;
 
 import static org.junit.Assert.assertEquals;
@@ -88,6 +96,8 @@ public class SaslAuthenticatorTest {
     private CertStores clientCertStores;
     private Map<String, Object> saslClientConfigs;
     private Map<String, Object> saslServerConfigs;
+    private CredentialCache credentialCache;
+    private int nextCorrelationId;
 
     @Before
     public void setup() throws Exception {
@@ -95,6 +105,7 @@ public class SaslAuthenticatorTest {
         clientCertStores = new CertStores(false, "localhost");
         saslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
         saslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
+        credentialCache = new CredentialCache();
     }
 
     @After
@@ -139,7 +150,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, 
"invalidpassword");
+        jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, 
"invalidpassword");
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
@@ -153,7 +164,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions("invaliduser", 
TestJaasConfig.PASSWORD);
+        jaasConfig.setClientOptions("PLAIN", "invaliduser", 
TestJaasConfig.PASSWORD);
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
@@ -166,7 +177,7 @@ public class SaslAuthenticatorTest {
     public void testMissingUsernameSaslPlain() throws Exception {
         String node = "0";
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions(null, "mypassword");
+        jaasConfig.setClientOptions("PLAIN", null, "mypassword");
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         server = createEchoServer(securityProtocol);
@@ -190,7 +201,7 @@ public class SaslAuthenticatorTest {
     public void testMissingPasswordSaslPlain() throws Exception {
         String node = "0";
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
-        jaasConfig.setPlainClientOptions("myuser", null);
+        jaasConfig.setClientOptions("PLAIN", "myuser", null);
 
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         server = createEchoServer(securityProtocol);
@@ -359,10 +370,20 @@ public class SaslAuthenticatorTest {
      * when transport layer is PLAINTEXT. This test simulates SASL 
authentication using a
      * (non-SASL) PLAINTEXT client and sends ApiVersionsRequest straight after
      * connection to the server is established, before any SASL-related 
packets are sent.
+     * This test is run with SaslHandshake version 0 and no SaslAuthenticate 
headers.
      */
     @Test
-    public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws 
Exception {
-        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT);
+    public void 
testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0() throws 
Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, 
(short) 0);
+    }
+
+    /**
+     * See {@link 
#testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0()} for test 
scenario.
+     * This test is run with SaslHandshake version 1 and SaslAuthenticate 
headers.
+     */
+    @Test
+    public void 
testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion1() throws 
Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, 
(short) 1);
     }
 
     /**
@@ -371,21 +392,32 @@ public class SaslAuthenticatorTest {
      * when transport layer is SSL. This test simulates SASL authentication 
using a
      * (non-SASL) SSL client and sends ApiVersionsRequest straight after
      * SSL handshake, before any SASL-related packets are sent.
+     * This test is run with SaslHandshake version 0 and no SaslAuthenticate 
headers.
      */
     @Test
-    public void testUnauthenticatedApiVersionsRequestOverSsl() throws 
Exception {
-        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL);
+    public void 
testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() throws 
Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, 
(short) 0);
+    }
+
+    /**
+     * See {@link 
#testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0()} for 
test scenario.
+     * This test is run with SaslHandshake version 1 and SaslAuthenticate 
headers.
+     */
+    @Test
+    public void 
testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws 
Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, 
(short) 1);
     }
 
     /**
      * Tests that unsupported version of ApiVersionsRequest before SASL 
handshake request
      * returns error response and does not result in authentication failure. 
This test
-     * is similar to {@link 
#testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * is similar to {@link 
#testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)}
      * where a non-SASL client is used to send requests that are processed by
      * {@link SaslServerAuthenticator} of the server prior to client 
authentication.
      */
     @Test
     public void testApiVersionsRequestWithUnsupportedVersion() throws 
Exception {
+        short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         server = createEchoServer(securityProtocol);
@@ -405,14 +437,14 @@ public class SaslAuthenticatorTest {
         sendVersionRequestReceiveResponse(node);
 
         // Test that client can authenticate successfully
-        sendHandshakeRequestReceiveResponse(node);
-        authenticateUsingSaslPlainAndCheckConnection(node);
+        sendHandshakeRequestReceiveResponse(node, handshakeVersion);
+        authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion > 
0);
     }
 
     /**
      * Tests that unsupported version of SASL handshake request returns error
      * response and fails authentication. This test is similar to
-     * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)}
      * where a non-SASL client is used to send requests that are processed by
      * {@link SaslServerAuthenticator} of the server prior to client 
authentication.
      */
@@ -422,13 +454,15 @@ public class SaslAuthenticatorTest {
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         server = createEchoServer(securityProtocol);
 
-        // Send ApiVersionsRequest and validate error response.
+        // Send SaslHandshakeRequest and validate that connection is closed by 
server.
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
         RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, 
Short.MAX_VALUE, "someclient", 2);
         selector.send(request.toSend(node1, header));
-        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
+        // This test uses a non-SASL PLAINTEXT client in order to do manual 
handshake.
+        // So the channel is in READY state.
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -449,12 +483,12 @@ public class SaslAuthenticatorTest {
         // Send invalid SASL packet after valid handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequestReceiveResponse(node1);
+        sendHandshakeRequestReceiveResponse(node1, (short) 1);
         Random random = new Random();
         byte[] bytes = new byte[1024];
         random.nextBytes(bytes);
         selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
-        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -465,7 +499,7 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
         random.nextBytes(bytes);
         selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
-        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -475,7 +509,7 @@ public class SaslAuthenticatorTest {
     /**
      * Tests that ApiVersionsRequest after Kafka SASL handshake request flow,
      * but prior to actual SASL authentication, results in authentication 
failure.
-     * This is similar to {@link 
#testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * This is similar to {@link 
#testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)}
      * where a non-SASL client is used to send requests that are processed by
      * {@link SaslServerAuthenticator} of the server prior to client 
authentication.
      */
@@ -488,12 +522,12 @@ public class SaslAuthenticatorTest {
         // Send handshake request followed by ApiVersionsRequest
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequestReceiveResponse(node1);
+        sendHandshakeRequestReceiveResponse(node1, (short) 1);
 
-        ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
+        ApiVersionsRequest request = createApiVersionsRequestV0();
         RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, 
request.version(), "someclient", 2);
         selector.send(request.toSend(node1, versionsHeader));
-        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -514,13 +548,13 @@ public class SaslAuthenticatorTest {
         // Send SASL packet with large size after valid handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequestReceiveResponse(node1);
+        sendHandshakeRequestReceiveResponse(node1, (short) 1);
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.putInt(Integer.MAX_VALUE);
         buffer.put(new byte[buffer.capacity() - 4]);
         buffer.rewind();
         selector.send(new NetworkSend(node1, buffer));
-        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -534,7 +568,7 @@ public class SaslAuthenticatorTest {
         buffer.put(new byte[buffer.capacity() - 4]);
         buffer.rewind();
         selector.send(new NetworkSend(node2, buffer));
-        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -559,7 +593,7 @@ public class SaslAuthenticatorTest {
         RequestHeader metadataRequestHeader1 = new 
RequestHeader(ApiKeys.METADATA, metadataRequest1.version(),
                 "someclient", 1);
         selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
-        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -568,12 +602,12 @@ public class SaslAuthenticatorTest {
         // Send metadata request after Kafka SASL handshake request
         String node2 = "invalid2";
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
-        sendHandshakeRequestReceiveResponse(node2);
+        sendHandshakeRequestReceiveResponse(node2, (short) 1);
         MetadataRequest metadataRequest2 = new 
MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
         RequestHeader metadataRequestHeader2 = new 
RequestHeader(ApiKeys.METADATA,
                 metadataRequest2.version(), "someclient", 2);
         selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
-        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY);
+        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY.state());
         selector.close();
 
         // Test good connection still works
@@ -642,7 +676,7 @@ public class SaslAuthenticatorTest {
         TestJaasConfig staticJaasConfig = new TestJaasConfig();
         
staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, 
PlainLoginModule.class.getName(),
                 serverOptions);
-        staticJaasConfig.setPlainClientOptions("user1", "invalidpassword");
+        staticJaasConfig.setClientOptions("PLAIN", "user1", "invalidpassword");
         Configuration.setConfiguration(staticJaasConfig);
         server = createEchoServer(securityProtocol);
 
@@ -715,6 +749,269 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests good path SASL/PLAIN authentication over PLAINTEXT with old 
version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeader() 
throws Exception {
+        verifySaslAuthenticateHeaderInterop(false, true, 
SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/PLAIN authentication over PLAINTEXT with old 
version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader() 
throws Exception {
+        verifySaslAuthenticateHeaderInterop(true, false, 
SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over PLAINTEXT with old 
version of server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader() 
throws Exception {
+        verifySaslAuthenticateHeaderInterop(false, true, 
SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over PLAINTEXT with old 
version of client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeader() 
throws Exception {
+        verifySaslAuthenticateHeaderInterop(true, false, 
SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests good path SASL/PLAIN authentication over SSL with old version of 
server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeader() throws 
Exception {
+        verifySaslAuthenticateHeaderInterop(false, true, 
SecurityProtocol.SASL_SSL, "PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/PLAIN authentication over SSL with old version of 
client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeader() throws 
Exception {
+        verifySaslAuthenticateHeaderInterop(true, false, 
SecurityProtocol.SASL_SSL, "PLAIN");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over SSL with old version of 
server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void oldSaslScramSslServerWithoutSaslAuthenticateHeader() throws 
Exception {
+        verifySaslAuthenticateHeaderInterop(false, true, 
SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
+    }
+
+    /**
+     * Tests good path SASL/SCRAM authentication over SSL with old version of 
client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void oldSaslScramSslClientWithoutSaslAuthenticateHeader() throws 
Exception {
+        verifySaslAuthenticateHeaderInterop(true, false, 
SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over PLAINTEXT with old version 
of server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void 
oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws 
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, 
SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over PLAINTEXT with old version 
of client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void 
oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws 
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, 
SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over PLAINTEXT with old version 
of server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void 
oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws 
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, 
SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over PLAINTEXT with old version 
of client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void 
oldSaslScramPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws 
Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, 
SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over SSL with old version of 
server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeaderFailure() 
throws Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, 
SecurityProtocol.SASL_SSL, "PLAIN");
+    }
+
+    /**
+     * Tests SASL/PLAIN authentication failure over SSL with old version of 
client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeaderFailure() 
throws Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, 
SecurityProtocol.SASL_SSL, "PLAIN");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over SSL with old version of 
server
+     * that does not support SASL_AUTHENTICATE headers and new version of 
client.
+     */
+    @Test
+    public void oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure() 
throws Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(false, true, 
SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
+    }
+
+    /**
+     * Tests SASL/SCRAM authentication failure over SSL with old version of 
client
+     * that does not support SASL_AUTHENTICATE headers and new version of 
server.
+     */
+    @Test
+    public void oldSaslScramSslClientWithoutSaslAuthenticateHeaderFailure() 
throws Exception {
+        verifySaslAuthenticateHeaderInteropWithFailure(true, false, 
SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
+    }
+
+    private void verifySaslAuthenticateHeaderInterop(boolean 
enableHeaderOnServer, boolean enableHeaderOnClient,
+            SecurityProtocol securityProtocol, String saslMechanism) throws 
Exception {
+        configureMechanisms(saslMechanism, Arrays.asList(saslMechanism));
+        createServer(securityProtocol, saslMechanism, enableHeaderOnServer);
+
+        String node = "0";
+        createClientConnection(securityProtocol, saslMechanism, node, 
enableHeaderOnClient);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+    }
+
+    private void verifySaslAuthenticateHeaderInteropWithFailure(boolean 
enableHeaderOnServer, boolean enableHeaderOnClient,
+            SecurityProtocol securityProtocol, String saslMechanism) throws 
Exception {
+        TestJaasConfig jaasConfig = configureMechanisms(saslMechanism, 
Arrays.asList(saslMechanism));
+        jaasConfig.setClientOptions(saslMechanism, TestJaasConfig.USERNAME, 
"invalidpassword");
+        createServer(securityProtocol, saslMechanism, enableHeaderOnServer);
+
+        String node = "0";
+        createClientConnection(securityProtocol, saslMechanism, node, 
enableHeaderOnClient);
+        // Without SASL_AUTHENTICATE headers, disconnect state is 
ChannelState.AUTHENTICATE which is
+        // a hint that channel was closed during authentication, unlike 
ChannelState.AUTHENTICATION_FAILED
+        // which is an actual authentication failure reported by the broker.
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE.state());
+    }
+
+    private void createServer(SecurityProtocol securityProtocol, String 
saslMechanism,
+            boolean enableSaslAuthenticateHeader) throws Exception {
+        if (enableSaslAuthenticateHeader)
+            server = createEchoServer(securityProtocol);
+        else
+            server = 
startServerWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism);
+        updateScramCredentialCache(TestJaasConfig.USERNAME, 
TestJaasConfig.PASSWORD);
+    }
+
+    private void createClientConnection(SecurityProtocol securityProtocol, 
String saslMechanism, String node,
+            boolean enableSaslAuthenticateHeader) throws Exception {
+        if (enableSaslAuthenticateHeader)
+            createClientConnection(securityProtocol, node);
+        else
+            
createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, 
saslMechanism, node);
+    }
+
+    private NioEchoServer startServerWithoutSaslAuthenticateHeader(final 
SecurityProtocol securityProtocol, String saslMechanism)
+            throws Exception {
+        final ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+        final Map<String, ?> configs = Collections.emptyMap();
+        final JaasContext jaasContext = 
JaasContext.load(JaasContext.Type.SERVER, listenerName, configs);
+
+        boolean isScram = ScramMechanism.isScram(saslMechanism);
+        if (isScram)
+            ScramCredentialUtils.createCache(credentialCache, 
Arrays.asList(saslMechanism));
+        SaslChannelBuilder serverChannelBuilder = new 
SaslChannelBuilder(Mode.SERVER, jaasContext,
+                securityProtocol, listenerName, saslMechanism, true, 
credentialCache) {
+
+            @Override
+            protected SaslServerAuthenticator 
buildServerAuthenticator(Map<String, ?> configs, String id,
+                            TransportLayer transportLayer, Subject subject) 
throws IOException {
+                return new SaslServerAuthenticator(configs, id, jaasContext, 
subject, null,
+                                credentialCache, listenerName, 
securityProtocol, transportLayer) {
+
+                    @Override
+                    protected ApiVersionsResponse apiVersionsResponse() {
+                        List<ApiVersion> apiVersions = new 
ArrayList<>(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions());
+                        for (Iterator<ApiVersion> it = apiVersions.iterator(); 
it.hasNext(); ) {
+                            ApiVersion apiVersion = it.next();
+                            if (apiVersion.apiKey == 
ApiKeys.SASL_AUTHENTICATE.id) {
+                                it.remove();
+                                break;
+                            }
+                        }
+                        return new ApiVersionsResponse(0, Errors.NONE, 
apiVersions);
+                    }
+
+                    @Override
+                    protected void enableKafkaSaslAuthenticateHeaders(boolean 
flag) {
+                        // Don't enable Kafka SASL_AUTHENTICATE headers
+                    }
+                };
+            }
+        };
+        serverChannelBuilder.configure(saslServerConfigs);
+        server = new NioEchoServer(listenerName, securityProtocol, new 
TestSecurityConfig(saslServerConfigs),
+                "localhost", serverChannelBuilder, credentialCache);
+        server.start();
+        return server;
+    }
+
+    private void createClientConnectionWithoutSaslAuthenticateHeader(final 
SecurityProtocol securityProtocol,
+            final String saslMechanism, String node) throws Exception {
+
+        final ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+        final Map<String, ?> configs = Collections.emptyMap();
+        final JaasContext jaasContext = 
JaasContext.load(JaasContext.Type.CLIENT, null, configs);
+        SaslChannelBuilder clientChannelBuilder = new 
SaslChannelBuilder(Mode.CLIENT, jaasContext,
+                securityProtocol, listenerName, saslMechanism, true, null) {
+
+            @Override
+            protected SaslClientAuthenticator 
buildClientAuthenticator(Map<String, ?> configs, String id,
+                    String serverHost, String servicePrincipal,
+                    TransportLayer transportLayer, Subject subject) throws 
IOException {
+
+                return new SaslClientAuthenticator(configs, id, subject,
+                        servicePrincipal, serverHost, saslMechanism, true, 
transportLayer) {
+                    @Override
+                    protected SaslHandshakeRequest 
createSaslHandshakeRequest(short version) {
+                        return new 
SaslHandshakeRequest.Builder(saslMechanism).build((short) 0);
+                    }
+                    @Override
+                    protected void saslAuthenticateVersion(short version) {
+                        // Don't set version so that headers are disabled
+                    }
+                };
+            }
+        };
+        clientChannelBuilder.configure(saslClientConfigs);
+        this.selector = NetworkTestUtils.createSelector(clientChannelBuilder);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+    }
+
+    /**
      * Tests that Kafka ApiVersionsRequests are handled by the SASL server 
authenticator
      * prior to SASL handshake flow and that subsequent authentication succeeds
      * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client 
that simulates
@@ -738,7 +1035,7 @@ public class SaslAuthenticatorTest {
      *       behaves exactly as a regular SASL_PLAINTEXT client that has 
completed authentication.</li>
      * </ol>
      */
-    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol 
securityProtocol) throws Exception {
+    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol 
securityProtocol, short saslHandshakeVersion) throws Exception {
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
         server = createEchoServer(securityProtocol);
 
@@ -762,20 +1059,28 @@ public class SaslAuthenticatorTest {
         ApiVersionsResponse versionsResponse = 
sendVersionRequestReceiveResponse(node);
         assertEquals(ApiKeys.SASL_HANDSHAKE.oldestVersion(), 
versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
         assertEquals(ApiKeys.SASL_HANDSHAKE.latestVersion(), 
versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
+        assertEquals(ApiKeys.SASL_AUTHENTICATE.oldestVersion(), 
versionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).minVersion);
+        assertEquals(ApiKeys.SASL_AUTHENTICATE.latestVersion(), 
versionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).maxVersion);
 
         // Send SaslHandshakeRequest and check response
-        SaslHandshakeResponse handshakeResponse = 
sendHandshakeRequestReceiveResponse(node);
+        SaslHandshakeResponse handshakeResponse = 
sendHandshakeRequestReceiveResponse(node, saslHandshakeVersion);
         assertEquals(Collections.singletonList("PLAIN"), 
handshakeResponse.enabledMechanisms());
 
         // Complete manual authentication and check send/receive succeed
-        authenticateUsingSaslPlainAndCheckConnection(node);
+        authenticateUsingSaslPlainAndCheckConnection(node, 
saslHandshakeVersion > 0);
     }
 
-    private void authenticateUsingSaslPlainAndCheckConnection(String node) 
throws Exception {
+    private void authenticateUsingSaslPlainAndCheckConnection(String node, 
boolean enableSaslAuthenticateHeader) throws Exception {
         // Authenticate using PLAIN username/password
         String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + 
TestJaasConfig.PASSWORD;
-        selector.send(new NetworkSend(node, 
ByteBuffer.wrap(authString.getBytes("UTF-8"))));
-        waitForResponse();
+        ByteBuffer authBuf = ByteBuffer.wrap(authString.getBytes("UTF-8"));
+        if (enableSaslAuthenticateHeader) {
+            SaslAuthenticateRequest request = new 
SaslAuthenticateRequest.Builder(authBuf).build();
+            sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_AUTHENTICATE, 
request);
+        } else {
+            selector.send(new NetworkSend(node, authBuf));
+            waitForResponse();
+        }
 
         // Check send/receive on the manually authenticated connection
         NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
@@ -805,7 +1110,7 @@ public class SaslAuthenticatorTest {
 
     private NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol) throws Exception {
         return NetworkTestUtils.createEchoServer(listenerName, 
securityProtocol,
-                new TestSecurityConfig(saslServerConfigs));
+                new TestSecurityConfig(saslServerConfigs), credentialCache);
     }
 
     private void createClientConnection(SecurityProtocol securityProtocol, 
String node) throws Exception {
@@ -823,28 +1128,28 @@ public class SaslAuthenticatorTest {
 
     private void createAndCheckClientConnectionFailure(SecurityProtocol 
securityProtocol, String node) throws Exception {
         createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.State.AUTHENTICATION_FAILED);
         selector.close();
         selector = null;
     }
 
     private AbstractResponse sendKafkaRequestReceiveResponse(String node, 
ApiKeys apiKey, AbstractRequest request) throws IOException {
-        RequestHeader header = new RequestHeader(apiKey, request.version(), 
"someclient", 1);
+        RequestHeader header = new RequestHeader(apiKey, request.version(), 
"someclient", nextCorrelationId++);
         Send send = request.toSend(node, header);
         selector.send(send);
         ByteBuffer responseBuffer = waitForResponse();
         return NetworkClient.parseResponse(responseBuffer, header);
     }
 
-    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String 
node) throws Exception {
-        SaslHandshakeRequest handshakeRequest = new 
SaslHandshakeRequest("PLAIN");
+    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String 
node, short version) throws Exception {
+        SaslHandshakeRequest handshakeRequest = new 
SaslHandshakeRequest.Builder("PLAIN").build(version);
         SaslHandshakeResponse response = (SaslHandshakeResponse) 
sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
         assertEquals(Errors.NONE, response.error());
         return response;
     }
 
     private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) 
throws Exception {
-        ApiVersionsRequest handshakeRequest = new 
ApiVersionsRequest.Builder().build();
+        ApiVersionsRequest handshakeRequest = createApiVersionsRequestV0();
         ApiVersionsResponse response =  (ApiVersionsResponse) 
sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
         assertEquals(Errors.NONE, response.error());
         return response;
@@ -866,8 +1171,14 @@ public class SaslAuthenticatorTest {
             if (scramMechanism != null) {
                 ScramFormatter formatter = new ScramFormatter(scramMechanism);
                 ScramCredential credential = 
formatter.generateCredential(password, 4096);
-                server.credentialCache().cache(scramMechanism.mechanismName(), 
ScramCredential.class).put(username, credential);
+                credentialCache.cache(scramMechanism.mechanismName(), 
ScramCredential.class).put(username, credential);
             }
         }
     }
+
+    // Creates an ApiVersionsRequest with version 0. Using v0 in tests since
+    // SaslClientAuthenticator always uses version 0
+    private ApiVersionsRequest createApiVersionsRequestV0() {
+        return new ApiVersionsRequest.Builder((short) 0).build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index bdef6ef..5336fd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -54,13 +54,14 @@ public class TestJaasConfig extends Configuration {
         return new Password(loginModule(mechanism) + " required username=" + 
username + " password=" + password + ";");
     }
 
-    public void setPlainClientOptions(String clientUsername, String 
clientPassword) {
+    public void setClientOptions(String saslMechanism, String clientUsername, 
String clientPassword) {
         Map<String, Object> options = new HashMap<>();
         if (clientUsername != null)
             options.put("username", clientUsername);
         if (clientPassword != null)
             options.put("password", clientPassword);
-        createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, 
PlainLoginModule.class.getName(), options);
+        Class<?> loginModuleClass = ScramMechanism.isScram(saslMechanism) ? 
ScramLoginModule.class : PlainLoginModule.class;
+        createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModuleClass.getName(), 
options);
     }
 
     public void createOrUpdateEntry(String name, String loginModule, 
Map<String, Object> options) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2c5517f..62e8abf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.{Resource => 
RResource, ResourceType =>
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.requests.SaslHandshakeResponse
+import org.apache.kafka.common.requests.{SaslAuthenticateResponse, 
SaslHandshakeResponse}
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
@@ -132,6 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
         case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
+        case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1257,6 +1258,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponseMaybeThrottle(request, _ => new 
SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
   }
 
+  def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
+    sendResponseMaybeThrottle(request, _ => new 
SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE,
+        "SaslAuthenticate request received after successful authentication"))
+  }
+
   def handleApiVersionsRequest(request: RequestChannel.Request) {
     // Note that broker returns its full list of supported ApiKeys and versions regardless of current
     // authentication state (e.g., before SASL authentication on an SASL listener, do note that no

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3ec03c3..bb9f82e 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -233,6 +233,9 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.SASL_HANDSHAKE =>
           new SaslHandshakeRequest.Builder("PLAIN")
 
+        case ApiKeys.SASL_AUTHENTICATE =>
+          new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new 
Array[Byte](0)))
+
         case ApiKeys.API_VERSIONS =>
           new ApiVersionsRequest.Builder
 
@@ -433,7 +436,8 @@ class RequestQuotaTest extends BaseRequestTest {
 
 object RequestQuotaTest {
   val ClusterActions = ApiKeys.values.toSet.filter(apiKey => 
apiKey.clusterAction)
-  val ClientActions = ApiKeys.values.toSet -- ClusterActions - 
ApiKeys.SASL_HANDSHAKE
+  val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
+  val ClientActions = ApiKeys.values.toSet -- ClusterActions -- SaslActions
 
   val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"Unauthorized")
   // Principal used for all client connections. This is modified by tests which

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 1ee4ac8..01d3a83 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -16,7 +16,6 @@
   */
 package kafka.server
 
-import java.io.IOException
 import java.net.Socket
 import java.util.Collections
 
@@ -66,12 +65,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest 
with SaslSetup {
     val plaintextSocket = connect(protocol = securityProtocol)
     try {
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
-      try {
-        sendApiVersionsRequest(plaintextSocket, new 
ApiVersionsRequest.Builder().build(0))
-        fail("Versions Request during Sasl handshake did not fail")
-      } catch {
-        case _: IOException => // expected exception
-      }
+      val response = sendApiVersionsRequest(plaintextSocket, new 
ApiVersionsRequest.Builder().build(0))
+      assertEquals(Errors.ILLEGAL_SASL_STATE, response.error)
     } finally {
       plaintextSocket.close()
     }

Reply via email to