Repository: kafka Updated Branches: refs/heads/trunk 985cc534a -> 8fca43222
http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 2dd7db9..d41d61a 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -26,11 +26,14 @@ import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; +import org.apache.kafka.common.network.SaslChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.SecurityProtocol; @@ -38,15 +41,18 @@ import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.requests.SaslAuthenticateRequest; import org.apache.kafka.common.requests.SaslHandshakeRequest; import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.TestSecurityConfig; import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.kafka.common.security.scram.ScramCredential; +import org.apache.kafka.common.security.scram.ScramCredentialUtils; import org.apache.kafka.common.security.scram.ScramFormatter; import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.security.scram.ScramMechanism; @@ -63,10 +69,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import javax.security.auth.Subject; import javax.security.auth.login.Configuration; import static org.junit.Assert.assertEquals; @@ -88,6 +96,8 @@ public class SaslAuthenticatorTest { private CertStores clientCertStores; private Map<String, Object> saslClientConfigs; private Map<String, Object> saslServerConfigs; + private CredentialCache credentialCache; + private int nextCorrelationId; @Before public void setup() throws Exception { @@ -95,6 +105,7 @@ public class SaslAuthenticatorTest { clientCertStores = new CertStores(false, "localhost"); saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); saslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); + credentialCache = new CredentialCache(); } @After @@ -139,7 +150,7 @@ public class SaslAuthenticatorTest { String node = "0"; SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword"); + jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "invalidpassword"); server = createEchoServer(securityProtocol); createAndCheckClientConnectionFailure(securityProtocol, node); @@ -153,7 +164,7 @@ public class SaslAuthenticatorTest { String node = "0"; SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD); + jaasConfig.setClientOptions("PLAIN", "invaliduser", TestJaasConfig.PASSWORD); server = createEchoServer(securityProtocol); createAndCheckClientConnectionFailure(securityProtocol, node); @@ -166,7 +177,7 @@ public class SaslAuthenticatorTest { public void testMissingUsernameSaslPlain() throws Exception { String node = "0"; TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - jaasConfig.setPlainClientOptions(null, "mypassword"); + jaasConfig.setClientOptions("PLAIN", null, "mypassword"); SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; server = createEchoServer(securityProtocol); @@ -190,7 +201,7 @@ public class SaslAuthenticatorTest { public void testMissingPasswordSaslPlain() throws Exception { String node = "0"; TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - jaasConfig.setPlainClientOptions("myuser", null); + jaasConfig.setClientOptions("PLAIN", "myuser", null); SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; server = createEchoServer(securityProtocol); @@ -359,10 +370,20 @@ public class SaslAuthenticatorTest { * when transport layer is PLAINTEXT. This test simulates SASL authentication using a * (non-SASL) PLAINTEXT client and sends ApiVersionsRequest straight after * connection to the server is established, before any SASL-related packets are sent. + * This test is run with SaslHandshake version 0 and no SaslAuthenticate headers. */ @Test - public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws Exception { - testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT); + public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0() throws Exception { + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 0); + } + + /** + * See {@link #testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0()} for test scenario. + * This test is run with SaslHandshake version 1 and SaslAuthenticate headers. + */ + @Test + public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion1() throws Exception { + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 1); } /** @@ -371,21 +392,32 @@ public class SaslAuthenticatorTest { * when transport layer is SSL. This test simulates SASL authentication using a * (non-SASL) SSL client and sends ApiVersionsRequest straight after * SSL handshake, before any SASL-related packets are sent. + * This test is run with SaslHandshake version 0 and no SaslAuthenticate headers. */ @Test - public void testUnauthenticatedApiVersionsRequestOverSsl() throws Exception { - testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL); + public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() throws Exception { + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 0); + } + + /** + * See {@link #testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0()} for test scenario. + * This test is run with SaslHandshake version 1 and SaslAuthenticate headers. + */ + @Test + public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws Exception { + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 1); } /** * Tests that unsupported version of ApiVersionsRequest before SASL handshake request * returns error response and does not result in authentication failure. This test - * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)} + * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)} * where a non-SASL client is used to send requests that are processed by * {@link SaslServerAuthenticator} of the server prior to client authentication. */ @Test public void testApiVersionsRequestWithUnsupportedVersion() throws Exception { + short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion(); SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); server = createEchoServer(securityProtocol); @@ -405,14 +437,14 @@ public class SaslAuthenticatorTest { sendVersionRequestReceiveResponse(node); // Test that client can authenticate successfully - sendHandshakeRequestReceiveResponse(node); - authenticateUsingSaslPlainAndCheckConnection(node); + sendHandshakeRequestReceiveResponse(node, handshakeVersion); + authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion > 0); } /** * Tests that unsupported version of SASL handshake request returns error * response and fails authentication. This test is similar to - * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)} + * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)} * where a non-SASL client is used to send requests that are processed by * {@link SaslServerAuthenticator} of the server prior to client authentication. */ @@ -422,13 +454,15 @@ public class SaslAuthenticatorTest { configureMechanisms("PLAIN", Arrays.asList("PLAIN")); server = createEchoServer(securityProtocol); - // Send ApiVersionsRequest and validate error response. + // Send SaslHandshakeRequest and validate that connection is closed by server. String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN"); RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2); selector.send(request.toSend(node1, header)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); + // This test uses a non-SASL PLAINTEXT client in order to do manual handshake. + // So the channel is in READY state. + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -449,12 +483,12 @@ public class SaslAuthenticatorTest { // Send invalid SASL packet after valid handshake request String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - sendHandshakeRequestReceiveResponse(node1); + sendHandshakeRequestReceiveResponse(node1, (short) 1); Random random = new Random(); byte[] bytes = new byte[1024]; random.nextBytes(bytes); selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes))); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -465,7 +499,7 @@ public class SaslAuthenticatorTest { createClientConnection(SecurityProtocol.PLAINTEXT, node2); random.nextBytes(bytes); selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes))); - NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -475,7 +509,7 @@ public class SaslAuthenticatorTest { /** * Tests that ApiVersionsRequest after Kafka SASL handshake request flow, * but prior to actual SASL authentication, results in authentication failure. - * This is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)} + * This is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol, short)} * where a non-SASL client is used to send requests that are processed by * {@link SaslServerAuthenticator} of the server prior to client authentication. */ @@ -488,12 +522,12 @@ public class SaslAuthenticatorTest { // Send handshake request followed by ApiVersionsRequest String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - sendHandshakeRequestReceiveResponse(node1); + sendHandshakeRequestReceiveResponse(node1, (short) 1); - ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(); + ApiVersionsRequest request = createApiVersionsRequestV0(); RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), "someclient", 2); selector.send(request.toSend(node1, versionsHeader)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -514,13 +548,13 @@ public class SaslAuthenticatorTest { // Send SASL packet with large size after valid handshake request String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - sendHandshakeRequestReceiveResponse(node1); + sendHandshakeRequestReceiveResponse(node1, (short) 1); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.putInt(Integer.MAX_VALUE); buffer.put(new byte[buffer.capacity() - 4]); buffer.rewind(); selector.send(new NetworkSend(node1, buffer)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -534,7 +568,7 @@ public class SaslAuthenticatorTest { buffer.put(new byte[buffer.capacity() - 4]); buffer.rewind(); selector.send(new NetworkSend(node2, buffer)); - NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -559,7 +593,7 @@ public class SaslAuthenticatorTest { RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA, metadataRequest1.version(), "someclient", 1); selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -568,12 +602,12 @@ public class SaslAuthenticatorTest { // Send metadata request after Kafka SASL handshake request String node2 = "invalid2"; createClientConnection(SecurityProtocol.PLAINTEXT, node2); - sendHandshakeRequestReceiveResponse(node2); + sendHandshakeRequestReceiveResponse(node2, (short) 1); MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build(); RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA, metadataRequest2.version(), "someclient", 2); selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); - NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); selector.close(); // Test good connection still works @@ -642,7 +676,7 @@ public class SaslAuthenticatorTest { TestJaasConfig staticJaasConfig = new TestJaasConfig(); staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), serverOptions); - staticJaasConfig.setPlainClientOptions("user1", "invalidpassword"); + staticJaasConfig.setClientOptions("PLAIN", "user1", "invalidpassword"); Configuration.setConfiguration(staticJaasConfig); server = createEchoServer(securityProtocol); @@ -715,6 +749,269 @@ public class SaslAuthenticatorTest { } /** + * Tests good path SASL/PLAIN authentication over PLAINTEXT with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT, "PLAIN"); + } + + /** + * Tests good path SASL/PLAIN authentication over PLAINTEXT with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT, "PLAIN"); + } + + /** + * Tests good path SASL/SCRAM authentication over PLAINTEXT with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256"); + } + + /** + * Tests good path SASL/SCRAM authentication over PLAINTEXT with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256"); + } + + /** + * Tests good path SASL/PLAIN authentication over SSL with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslPlainSslServerWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "PLAIN"); + } + + /** + * Tests good path SASL/PLAIN authentication over SSL with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslPlainSslClientWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "PLAIN"); + } + + /** + * Tests good path SASL/SCRAM authentication over SSL with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslScramSslServerWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512"); + } + + /** + * Tests good path SASL/SCRAM authentication over SSL with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslScramSslClientWithoutSaslAuthenticateHeader() throws Exception { + verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512"); + } + + /** + * Tests SASL/PLAIN authentication failure over PLAINTEXT with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT, "PLAIN"); + } + + /** + * Tests SASL/PLAIN authentication failure over PLAINTEXT with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT, "PLAIN"); + } + + /** + * Tests SASL/SCRAM authentication failure over PLAINTEXT with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256"); + } + + /** + * Tests SASL/SCRAM authentication failure over PLAINTEXT with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256"); + } + + /** + * Tests SASL/PLAIN authentication failure over SSL with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslPlainSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL, "PLAIN"); + } + + /** + * Tests SASL/PLAIN authentication failure over SSL with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslPlainSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "PLAIN"); + } + + /** + * Tests SASL/SCRAM authentication failure over SSL with old version of server + * that does not support SASL_AUTHENTICATE headers and new version of client. + */ + @Test + public void oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512"); + } + + /** + * Tests SASL/SCRAM authentication failure over SSL with old version of client + * that does not support SASL_AUTHENTICATE headers and new version of server. + */ + @Test + public void oldSaslScramSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception { + verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512"); + } + + private void verifySaslAuthenticateHeaderInterop(boolean enableHeaderOnServer, boolean enableHeaderOnClient, + SecurityProtocol securityProtocol, String saslMechanism) throws Exception { + configureMechanisms(saslMechanism, Arrays.asList(saslMechanism)); + createServer(securityProtocol, saslMechanism, enableHeaderOnServer); + + String node = "0"; + createClientConnection(securityProtocol, saslMechanism, node, enableHeaderOnClient); + NetworkTestUtils.checkClientConnection(selector, "0", 100, 10); + } + + private void verifySaslAuthenticateHeaderInteropWithFailure(boolean enableHeaderOnServer, boolean enableHeaderOnClient, + SecurityProtocol securityProtocol, String saslMechanism) throws Exception { + TestJaasConfig jaasConfig = configureMechanisms(saslMechanism, Arrays.asList(saslMechanism)); + jaasConfig.setClientOptions(saslMechanism, TestJaasConfig.USERNAME, "invalidpassword"); + createServer(securityProtocol, saslMechanism, enableHeaderOnServer); + + String node = "0"; + createClientConnection(securityProtocol, saslMechanism, node, enableHeaderOnClient); + // Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE which is + // a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED + // which is an actual authentication failure reported by the broker. + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + } + + private void createServer(SecurityProtocol securityProtocol, String saslMechanism, + boolean enableSaslAuthenticateHeader) throws Exception { + if (enableSaslAuthenticateHeader) + server = createEchoServer(securityProtocol); + else + server = startServerWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism); + updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); + } + + private void createClientConnection(SecurityProtocol securityProtocol, String saslMechanism, String node, + boolean enableSaslAuthenticateHeader) throws Exception { + if (enableSaslAuthenticateHeader) + createClientConnection(securityProtocol, node); + else + createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism, node); + } + + private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityProtocol securityProtocol, String saslMechanism) + throws Exception { + final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + final Map<String, ?> configs = Collections.emptyMap(); + final JaasContext jaasContext = JaasContext.load(JaasContext.Type.SERVER, listenerName, configs); + + boolean isScram = ScramMechanism.isScram(saslMechanism); + if (isScram) + ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism)); + SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext, + securityProtocol, listenerName, saslMechanism, true, credentialCache) { + + @Override + protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id, + TransportLayer transportLayer, Subject subject) throws IOException { + return new SaslServerAuthenticator(configs, id, jaasContext, subject, null, + credentialCache, listenerName, securityProtocol, transportLayer) { + + @Override + protected ApiVersionsResponse apiVersionsResponse() { + List<ApiVersion> apiVersions = new ArrayList<>(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions()); + for (Iterator<ApiVersion> it = apiVersions.iterator(); it.hasNext(); ) { + ApiVersion apiVersion = it.next(); + if (apiVersion.apiKey == ApiKeys.SASL_AUTHENTICATE.id) { + it.remove(); + break; + } + } + return new ApiVersionsResponse(0, Errors.NONE, apiVersions); + } + + @Override + protected void enableKafkaSaslAuthenticateHeaders(boolean flag) { + // Don't enable Kafka SASL_AUTHENTICATE headers + } + }; + } + }; + serverChannelBuilder.configure(saslServerConfigs); + server = new NioEchoServer(listenerName, securityProtocol, new TestSecurityConfig(saslServerConfigs), + "localhost", serverChannelBuilder, credentialCache); + server.start(); + return server; + } + + private void createClientConnectionWithoutSaslAuthenticateHeader(final SecurityProtocol securityProtocol, + final String saslMechanism, String node) throws Exception { + + final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + final Map<String, ?> configs = Collections.emptyMap(); + final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs); + SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext, + securityProtocol, listenerName, saslMechanism, true, null) { + + @Override + protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id, + String serverHost, String servicePrincipal, + TransportLayer transportLayer, Subject subject) throws IOException { + + return new SaslClientAuthenticator(configs, id, subject, + servicePrincipal, serverHost, saslMechanism, true, transportLayer) { + @Override + protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { + return new SaslHandshakeRequest.Builder(saslMechanism).build((short) 0); + } + @Override + protected void saslAuthenticateVersion(short version) { + // Don't set version so that headers are disabled + } + }; + } + }; + clientChannelBuilder.configure(saslClientConfigs); + this.selector = NetworkTestUtils.createSelector(clientChannelBuilder); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + } + + /** * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator * prior to SASL handshake flow and that subsequent authentication succeeds * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client that simulates @@ -738,7 +1035,7 @@ public class SaslAuthenticatorTest { * behaves exactly as a regular SASL_PLAINTEXT client that has completed authentication.</li> * </ol> */ - private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception { + private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol, short saslHandshakeVersion) throws Exception { configureMechanisms("PLAIN", Arrays.asList("PLAIN")); server = createEchoServer(securityProtocol); @@ -762,20 +1059,28 @@ public class SaslAuthenticatorTest { ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node); assertEquals(ApiKeys.SASL_HANDSHAKE.oldestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion); assertEquals(ApiKeys.SASL_HANDSHAKE.latestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion); + assertEquals(ApiKeys.SASL_AUTHENTICATE.oldestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).minVersion); + assertEquals(ApiKeys.SASL_AUTHENTICATE.latestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).maxVersion); // Send SaslHandshakeRequest and check response - SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node); + SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node, saslHandshakeVersion); assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms()); // Complete manual authentication and check send/receive succeed - authenticateUsingSaslPlainAndCheckConnection(node); + authenticateUsingSaslPlainAndCheckConnection(node, saslHandshakeVersion > 0); } - private void authenticateUsingSaslPlainAndCheckConnection(String node) throws Exception { + private void authenticateUsingSaslPlainAndCheckConnection(String node, boolean enableSaslAuthenticateHeader) throws Exception { // Authenticate using PLAIN username/password String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD; - selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8")))); - waitForResponse(); + ByteBuffer authBuf = ByteBuffer.wrap(authString.getBytes("UTF-8")); + if (enableSaslAuthenticateHeader) { + SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(authBuf).build(); + sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_AUTHENTICATE, request); + } else { + selector.send(new NetworkSend(node, authBuf)); + waitForResponse(); + } // Check send/receive on the manually authenticated connection NetworkTestUtils.checkClientConnection(selector, node, 100, 10); @@ -805,7 +1110,7 @@ public class SaslAuthenticatorTest { private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception { return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, - new TestSecurityConfig(saslServerConfigs)); + new TestSecurityConfig(saslServerConfigs), credentialCache); } private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception { @@ -823,28 +1128,28 @@ public class SaslAuthenticatorTest { private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception { createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); selector.close(); selector = null; } private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException { - RequestHeader header = new RequestHeader(apiKey, request.version(), "someclient", 1); + RequestHeader header = new RequestHeader(apiKey, request.version(), "someclient", nextCorrelationId++); Send send = request.toSend(node, header); selector.send(send); ByteBuffer responseBuffer = waitForResponse(); return NetworkClient.parseResponse(responseBuffer, header); } - private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception { - SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN"); + private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node, short version) throws Exception { + SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest.Builder("PLAIN").build(version); SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest); assertEquals(Errors.NONE, response.error()); return response; } private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception { - ApiVersionsRequest handshakeRequest = new ApiVersionsRequest.Builder().build(); + ApiVersionsRequest handshakeRequest = createApiVersionsRequestV0(); ApiVersionsResponse response = (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest); assertEquals(Errors.NONE, response.error()); return response; @@ -866,8 +1171,14 @@ public class SaslAuthenticatorTest { if (scramMechanism != null) { ScramFormatter formatter = new ScramFormatter(scramMechanism); ScramCredential credential = formatter.generateCredential(password, 4096); - server.credentialCache().cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, credential); + credentialCache.cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, credential); } } } + + // Creates an ApiVersionsRequest with version 0. Using v0 in tests since + // SaslClientAuthenticator always uses version 0 + private ApiVersionsRequest createApiVersionsRequestV0() { + return new ApiVersionsRequest.Builder((short) 0).build(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java index bdef6ef..5336fd7 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java @@ -54,13 +54,14 @@ public class TestJaasConfig extends Configuration { return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";"); } - public void setPlainClientOptions(String clientUsername, String clientPassword) { + public void setClientOptions(String saslMechanism, String clientUsername, String clientPassword) { Map<String, Object> options = new HashMap<>(); if (clientUsername != null) options.put("username", clientUsername); if (clientPassword != null) options.put("password", clientPassword); - createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options); + Class<?> loginModuleClass = ScramMechanism.isScram(saslMechanism) ? ScramLoginModule.class : PlainLoginModule.class; + createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModuleClass.getName(), options); } public void createOrUpdateEntry(String name, String loginModule, Map<String, Object> options) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2c5517f..62e8abf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} -import org.apache.kafka.common.requests.SaslHandshakeResponse +import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse} import org.apache.kafka.common.resource.{Resource => AdminResource} import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding} import DescribeLogDirsResponse.LogDirInfo @@ -132,6 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request) case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request) case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request) + case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request) } } catch { case e: FatalExitError => throw e @@ -1257,6 +1258,11 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)) } + def handleSaslAuthenticateRequest(request: RequestChannel.Request) { + sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE, + "SaslAuthenticate request received after successful authentication")) + } + def handleApiVersionsRequest(request: RequestChannel.Request) { // Note that broker returns its full list of supported ApiKeys and versions regardless of current // authentication state (e.g., before SASL authentication on an SASL listener, do note that no http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 3ec03c3..bb9f82e 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -233,6 +233,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.SASL_HANDSHAKE => new SaslHandshakeRequest.Builder("PLAIN") + case ApiKeys.SASL_AUTHENTICATE => + new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new Array[Byte](0))) + case ApiKeys.API_VERSIONS => new ApiVersionsRequest.Builder @@ -433,7 +436,8 @@ class RequestQuotaTest extends BaseRequestTest { object RequestQuotaTest { val ClusterActions = ApiKeys.values.toSet.filter(apiKey => apiKey.clusterAction) - val ClientActions = ApiKeys.values.toSet -- ClusterActions - ApiKeys.SASL_HANDSHAKE + val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE) + val ClientActions = ApiKeys.values.toSet -- ClusterActions -- SaslActions val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized") // Principal used for all client connections. This is modified by tests which http://git-wip-us.apache.org/repos/asf/kafka/blob/8fca4322/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 1ee4ac8..01d3a83 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -16,7 +16,6 @@ */ package kafka.server -import java.io.IOException import java.net.Socket import java.util.Collections @@ -66,12 +65,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup { val plaintextSocket = connect(protocol = securityProtocol) try { sendSaslHandshakeRequestValidateResponse(plaintextSocket) - try { - sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0)) - fail("Versions Request during Sasl handshake did not fail") - } catch { - case _: IOException => // expected exception - } + val response = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0)) + assertEquals(Errors.ILLEGAL_SASL_STATE, response.error) } finally { plaintextSocket.close() }