Repository: kafka Updated Branches: refs/heads/0.10.2 0b4daa4bf -> c9168e10f
MINOR: Fix unit test failures from last commit, cherry-pick fix for SASL principal. This fixes PlainSaslServerTest from the last commit, which uses a class not present in 0.10.2. Also cherry-picked commit 9934d28 to 0.10.2 since that is required for multiple mechanism support (SaslMultiMechanismConsumerTest was failing with the changes without this). Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com> Closes #3773 from rajinisivaram/MINOR-0.10.2-buildfix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9168e10 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9168e10 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9168e10 Branch: refs/heads/0.10.2 Commit: c9168e10f93d713ad78ed421f50e9819b3f046b5 Parents: 0b4daa4 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Thu Aug 31 16:20:33 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu Aug 31 16:20:33 2017 -0400 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 2 +- .../apache/kafka/common/network/Selector.java | 17 +++++++++- .../kafka/common/protocol/types/Struct.java | 2 +- .../authenticator/SaslClientAuthenticator.java | 35 ++++++++++++++++---- .../SaslClientCallbackHandler.java | 2 +- .../authenticator/SaslServerAuthenticator.java | 10 +++--- .../clients/producer/ProducerRecordTest.java | 12 +++---- .../authenticator/SaslAuthenticatorTest.java | 9 +++-- .../security/plain/PlainSaslServerTest.java | 16 ++++++--- .../security/scram/ScramSaslServerTest.java | 2 +- 10 files changed, 76 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index d0d5e8d..c8096ef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -966,7 +966,7 @@ public abstract class AbstractCoordinator implements Closeable { log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e); this.failed.set(new RuntimeException(e)); } catch (RuntimeException e) { - log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e); + log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e); this.failed.set(e); } finally { log.debug("Heartbeat thread for group {} has closed", groupId); http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index df35266..f55dc96 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -185,7 +185,17 @@ public class Selector implements Selectable { throw e; } SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); - KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); + KafkaChannel channel; + try { + channel = channelBuilder.buildChannel(id, key, maxReceiveSize); + } catch (Exception e) { + try { + socketChannel.close(); + } finally { + key.cancel(); + } + throw new 
IOException("Channel could not be created for socket " + socketChannel, e); + } key.attach(channel); this.channels.put(id, channel); @@ -650,6 +660,11 @@ public class Selector implements Selectable { this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); } + // only for testing + public Set<SelectionKey> keys() { + return new HashSet<>(nioSelector.keys()); + } + private class SelectorMetrics { private final Metrics metrics; public final Sensor connectionClosed; http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 0165ce6..ea5e948 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -312,7 +312,7 @@ public class Struct { Field f = this.schema.get(i); if (f.type() instanceof ArrayOf) { if (this.get(f) != null) { - Object[] arrayObject = (Object []) this.get(f); + Object[] arrayObject = (Object[]) this.get(f); for (Object arrayItem: arrayObject) result = prime * result + arrayItem.hashCode(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 59eee83..dd793b6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -21,6 +21,7 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; @@ -54,7 +55,9 @@ import java.security.Principal; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; +import java.util.Iterator; import java.util.Map; +import java.util.Set; public class SaslClientAuthenticator implements Authenticator { @@ -108,13 +111,14 @@ public class SaslClientAuthenticator implements Authenticator { setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL); - // determine client principal from subject. - if (!subject.getPrincipals().isEmpty()) { - Principal clientPrincipal = subject.getPrincipals().iterator().next(); - this.clientPrincipalName = clientPrincipal.getName(); - } else { - clientPrincipalName = null; - } + // determine client principal from subject for Kerberos to use as authorization id for the SaslClient. + // For other mechanisms, the authenticated principal (username for PLAIN and SCRAM) is used as + // authorization id. Hence the principal is not specified for creating the SaslClient. 
+ if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) + this.clientPrincipalName = firstPrincipal(subject); + else + this.clientPrincipalName = null; + callbackHandler = new SaslClientCallbackHandler(); callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism); @@ -339,4 +343,21 @@ public class SaslClientAuthenticator implements Authenticator { response.errorCode(), mechanism, response.enabledMechanisms())); } } + + /** + * Returns the first Principal from Subject. + * @throws KafkaException if there are no Principals in the Subject. + * During Kerberos re-login, principal is reset on Subject. An exception is + * thrown so that the connection is retried after any configured backoff. + */ + static final String firstPrincipal(Subject subject) { + Set<Principal> principals = subject.getPrincipals(); + synchronized (principals) { + Iterator<Principal> iterator = principals.iterator(); + if (iterator.hasNext()) + return iterator.next().getName(); + else + throw new KafkaException("Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login"); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 8e0b8db..6094b54 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -59,7 +59,7 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler { nc.setName(nc.getDefaultName()); } else if (callback instanceof PasswordCallback) { if (!isKerberos && 
subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) { - char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); + char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); ((PasswordCallback) callback).setPassword(password); } else { String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" + http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 069e12f..c71802e 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -134,9 +134,7 @@ public class SaslServerAuthenticator implements Authenticator { callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class)); callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism); if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) { - if (subject.getPrincipals().isEmpty()) - throw new IllegalArgumentException("subject must have at least one principal"); - saslServer = createSaslKerberosServer(callbackHandler, configs); + saslServer = createSaslKerberosServer(callbackHandler, configs, subject); } else { try { saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() { @@ -150,12 +148,12 @@ public class SaslServerAuthenticator implements Authenticator { } } - private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> 
configs) throws IOException { + private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs, Subject subject) throws IOException { // server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject. - final Principal servicePrincipal = subject.getPrincipals().iterator().next(); + final String servicePrincipal = SaslClientAuthenticator.firstPrincipal(subject); KerberosName kerberosName; try { - kerberosName = KerberosName.parse(servicePrincipal.getName()); + kerberosName = KerberosName.parse(servicePrincipal); } catch (IllegalArgumentException e) { throw new KafkaException("Principal has name with unexpected format " + servicePrincipal); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java index a844bb0..5186d05 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java @@ -26,24 +26,24 @@ public class ProducerRecordTest { @Test public void testEqualsAndHashCode() { - ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1); + ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1, "key", 1); assertEquals(producerRecord, producerRecord); assertEquals(producerRecord.hashCode(), producerRecord.hashCode()); - ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1); + ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1, "key", 1); assertEquals(producerRecord, equalRecord); 
assertEquals(producerRecord.hashCode(), equalRecord.hashCode()); - ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1); + ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1); assertFalse(producerRecord.equals(topicMisMatch)); - ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1); + ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2, "key", 1); assertFalse(producerRecord.equals(partitionMismatch)); - ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1); + ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1); assertFalse(producerRecord.equals(keyMisMatch)); - ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2); + ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2); assertFalse(producerRecord.equals(valueMisMatch)); ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index ac9beb4..c8335aa 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -51,6 +51,7 @@ import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import 
java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; @@ -63,6 +64,7 @@ import java.util.Random; import javax.security.auth.login.Configuration; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -169,8 +171,11 @@ public class SaslAuthenticatorTest { try { selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); fail("SASL/PLAIN channel created without username"); - } catch (KafkaException e) { + } catch (IOException e) { // Expected exception + assertTrue("Channels not closed", selector.channels().isEmpty()); + for (SelectionKey key : selector.keys()) + assertFalse("Key not cancelled", key.isValid()); } } @@ -190,7 +195,7 @@ public class SaslAuthenticatorTest { try { selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); fail("SASL/PLAIN channel created without password"); - } catch (KafkaException e) { + } catch (IOException e) { // Expected exception } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java index 5ca778b..039e385 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -16,6 +16,7 @@ */ package org.apache.kafka.common.security.plain; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -23,11 +24,11 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import javax.security.auth.login.Configuration; import javax.security.sasl.SaslException; import static org.junit.Assert.assertEquals; -import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.authenticator.TestJaasConfig; public class PlainSaslServerTest { @@ -45,9 +46,14 @@ public class PlainSaslServerTest { Map<String, Object> options = new HashMap<>(); options.put("user_" + USER_A, PASSWORD_A); options.put("user_" + USER_B, PASSWORD_B); - jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), options); - JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); - saslServer = new PlainSaslServer(jaasContext); + jaasConfig.addEntry("KafkaServer", PlainLoginModule.class.getName(), options); + Configuration.setConfiguration(jaasConfig); + saslServer = new PlainSaslServer(null); + } + + @After + public void tearDown() { + Configuration.setConfiguration(null); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java index 9b329d0..a1d51f0 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with * this work for additional information regarding copyright ownership.