Repository: kafka
Updated Branches:
  refs/heads/0.10.2 0b4daa4bf -> c9168e10f


MINOR: Fix unit test failures from last commit, cherry-pick fix for SASL 
principal

This fixes PlainSaslServerTest from the last commit, which uses a class not 
present in 0.10.2. It also cherry-picks commit 9934d28 to 0.10.2, since that 
commit is required for multiple-mechanism support (without it, 
SaslMultiMechanismConsumerTest was failing with the last commit's changes).

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #3773 from rajinisivaram/MINOR-0.10.2-buildfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9168e10
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9168e10
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9168e10

Branch: refs/heads/0.10.2
Commit: c9168e10f93d713ad78ed421f50e9819b3f046b5
Parents: 0b4daa4
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu Aug 31 16:20:33 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Aug 31 16:20:33 2017 -0400

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  2 +-
 .../apache/kafka/common/network/Selector.java   | 17 +++++++++-
 .../kafka/common/protocol/types/Struct.java     |  2 +-
 .../authenticator/SaslClientAuthenticator.java  | 35 ++++++++++++++++----
 .../SaslClientCallbackHandler.java              |  2 +-
 .../authenticator/SaslServerAuthenticator.java  | 10 +++---
 .../clients/producer/ProducerRecordTest.java    | 12 +++----
 .../authenticator/SaslAuthenticatorTest.java    |  9 +++--
 .../security/plain/PlainSaslServerTest.java     | 16 ++++++---
 .../security/scram/ScramSaslServerTest.java     |  2 +-
 10 files changed, 76 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d0d5e8d..c8096ef 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -966,7 +966,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 log.error("Unexpected interrupt received in heartbeat thread 
for group {}", groupId, e);
                 this.failed.set(new RuntimeException(e));
             } catch (RuntimeException e) {
-                log.error("Heartbeat thread for group {} failed due to 
unexpected error" , groupId, e);
+                log.error("Heartbeat thread for group {} failed due to 
unexpected error", groupId, e);
                 this.failed.set(e);
             } finally {
                 log.debug("Heartbeat thread for group {} has closed", groupId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index df35266..f55dc96 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -185,7 +185,17 @@ public class Selector implements Selectable {
             throw e;
         }
         SelectionKey key = socketChannel.register(nioSelector, 
SelectionKey.OP_CONNECT);
-        KafkaChannel channel = channelBuilder.buildChannel(id, key, 
maxReceiveSize);
+        KafkaChannel channel;
+        try {
+            channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        } catch (Exception e) {
+            try {
+                socketChannel.close();
+            } finally {
+                key.cancel();
+            }
+            throw new IOException("Channel could not be created for socket " + 
socketChannel, e);
+        }
         key.attach(channel);
         this.channels.put(id, channel);
 
@@ -650,6 +660,11 @@ public class Selector implements Selectable {
         this.sensors.recordBytesReceived(channel.id(), 
networkReceive.payload().limit());
     }
 
+    // only for testing
+    public Set<SelectionKey> keys() {
+        return new HashSet<>(nioSelector.keys());
+    }
+
     private class SelectorMetrics {
         private final Metrics metrics;
         public final Sensor connectionClosed;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 0165ce6..ea5e948 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -312,7 +312,7 @@ public class Struct {
             Field f = this.schema.get(i);
             if (f.type() instanceof ArrayOf) {
                 if (this.get(f) != null) {
-                    Object[] arrayObject = (Object []) this.get(f);
+                    Object[] arrayObject = (Object[]) this.get(f);
                     for (Object arrayItem: arrayObject)
                         result = prime * result + arrayItem.hashCode();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 59eee83..dd793b6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -21,6 +21,7 @@ package org.apache.kafka.common.security.authenticator;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
@@ -54,7 +55,9 @@ import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 public class SaslClientAuthenticator implements Authenticator {
 
@@ -108,13 +111,14 @@ public class SaslClientAuthenticator implements 
Authenticator {
 
             setSaslState(handshakeRequestEnable ? 
SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
 
-            // determine client principal from subject.
-            if (!subject.getPrincipals().isEmpty()) {
-                Principal clientPrincipal = 
subject.getPrincipals().iterator().next();
-                this.clientPrincipalName = clientPrincipal.getName();
-            } else {
-                clientPrincipalName = null;
-            }
+            // determine client principal from subject for Kerberos to use as 
authorization id for the SaslClient.
+            // For other mechanisms, the authenticated principal (username for 
PLAIN and SCRAM) is used as
+            // authorization id. Hence the principal is not specified for 
creating the SaslClient.
+            if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM))
+                this.clientPrincipalName = firstPrincipal(subject);
+            else
+                this.clientPrincipalName = null;
+
             callbackHandler = new SaslClientCallbackHandler();
             callbackHandler.configure(configs, Mode.CLIENT, subject, 
mechanism);
 
@@ -339,4 +343,21 @@ public class SaslClientAuthenticator implements 
Authenticator {
                     response.errorCode(), mechanism, 
response.enabledMechanisms()));
         }
     }
+
+    /**
+     * Returns the first Principal from Subject.
+     * @throws KafkaException if there are no Principals in the Subject.
+     *     During Kerberos re-login, principal is reset on Subject. An 
exception is
+     *     thrown so that the connection is retried after any configured 
backoff.
+     */
+    static final String firstPrincipal(Subject subject) {
+        Set<Principal> principals = subject.getPrincipals();
+        synchronized (principals) {
+            Iterator<Principal> iterator = principals.iterator();
+            if (iterator.hasNext())
+                return iterator.next().getName();
+            else
+                throw new KafkaException("Principal could not be determined 
from Subject, this may be a transient failure due to Kerberos re-login");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 8e0b8db..6094b54 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -59,7 +59,7 @@ public class SaslClientCallbackHandler implements 
AuthCallbackHandler {
                     nc.setName(nc.getDefaultName());
             } else if (callback instanceof PasswordCallback) {
                 if (!isKerberos && subject != null && 
!subject.getPrivateCredentials(String.class).isEmpty()) {
-                    char [] password = 
subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+                    char[] password = 
subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
                     ((PasswordCallback) callback).setPassword(password);
                 } else {
                     String errorMessage = "Could not login: the client is 
being asked for a password, but the Kafka" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 069e12f..c71802e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -134,9 +134,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
             callbackHandler = new 
ScramServerCallbackHandler(credentialCache.cache(mechanism, 
ScramCredential.class));
         callbackHandler.configure(configs, Mode.SERVER, subject, 
saslMechanism);
         if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
-            if (subject.getPrincipals().isEmpty())
-                throw new IllegalArgumentException("subject must have at least 
one principal");
-            saslServer = createSaslKerberosServer(callbackHandler, configs);
+            saslServer = createSaslKerberosServer(callbackHandler, configs, 
subject);
         } else {
             try {
                 saslServer = Subject.doAs(subject, new 
PrivilegedExceptionAction<SaslServer>() {
@@ -150,12 +148,12 @@ public class SaslServerAuthenticator implements 
Authenticator {
         }
     }
 
-    private SaslServer createSaslKerberosServer(final AuthCallbackHandler 
saslServerCallbackHandler, final Map<String, ?> configs) throws IOException {
+    private SaslServer createSaslKerberosServer(final AuthCallbackHandler 
saslServerCallbackHandler, final Map<String, ?> configs, Subject subject) 
throws IOException {
         // server is using a JAAS-authenticated subject: determine service 
principal name and hostname from kafka server's subject.
-        final Principal servicePrincipal = 
subject.getPrincipals().iterator().next();
+        final String servicePrincipal = 
SaslClientAuthenticator.firstPrincipal(subject);
         KerberosName kerberosName;
         try {
-            kerberosName = KerberosName.parse(servicePrincipal.getName());
+            kerberosName = KerberosName.parse(servicePrincipal);
         } catch (IllegalArgumentException e) {
             throw new KafkaException("Principal has name with unexpected 
format " + servicePrincipal);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index a844bb0..5186d05 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -26,24 +26,24 @@ public class ProducerRecordTest {
 
     @Test
     public void testEqualsAndHashCode() {
-        ProducerRecord<String, Integer> producerRecord = new 
ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> producerRecord = new 
ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, producerRecord);
         assertEquals(producerRecord.hashCode(), producerRecord.hashCode());
 
-        ProducerRecord<String, Integer> equalRecord = new 
ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> equalRecord = new 
ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, equalRecord);
         assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
 
-        ProducerRecord<String, Integer> topicMisMatch = new 
ProducerRecord<>("test-1", 1 , "key", 1);
+        ProducerRecord<String, Integer> topicMisMatch = new 
ProducerRecord<>("test-1", 1, "key", 1);
         assertFalse(producerRecord.equals(topicMisMatch));
 
-        ProducerRecord<String, Integer> partitionMismatch = new 
ProducerRecord<>("test", 2 , "key", 1);
+        ProducerRecord<String, Integer> partitionMismatch = new 
ProducerRecord<>("test", 2, "key", 1);
         assertFalse(producerRecord.equals(partitionMismatch));
 
-        ProducerRecord<String, Integer> keyMisMatch = new 
ProducerRecord<>("test", 1 , "key-1", 1);
+        ProducerRecord<String, Integer> keyMisMatch = new 
ProducerRecord<>("test", 1, "key-1", 1);
         assertFalse(producerRecord.equals(keyMisMatch));
 
-        ProducerRecord<String, Integer> valueMisMatch = new 
ProducerRecord<>("test", 1 , "key", 2);
+        ProducerRecord<String, Integer> valueMisMatch = new 
ProducerRecord<>("test", 1, "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
         ProducerRecord<String, Integer> nullFieldsRecord = new 
ProducerRecord<>("topic", null, null, null, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index ac9beb4..c8335aa 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -51,6 +51,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,6 +64,7 @@ import java.util.Random;
 import javax.security.auth.login.Configuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -169,8 +171,11 @@ public class SaslAuthenticatorTest {
         try {
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
             fail("SASL/PLAIN channel created without username");
-        } catch (KafkaException e) {
+        } catch (IOException e) {
             // Expected exception
+            assertTrue("Channels not closed", selector.channels().isEmpty());
+            for (SelectionKey key : selector.keys())
+                assertFalse("Key not cancelled", key.isValid());
         }
     }
 
@@ -190,7 +195,7 @@ public class SaslAuthenticatorTest {
         try {
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
             fail("SASL/PLAIN channel created without password");
-        } catch (KafkaException e) {
+        } catch (IOException e) {
             // Expected exception
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
index 5ca778b..039e385 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.plain;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -23,11 +24,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.security.auth.login.Configuration;
 import javax.security.sasl.SaslException;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.TestJaasConfig;
 
 public class PlainSaslServerTest {
@@ -45,9 +46,14 @@ public class PlainSaslServerTest {
         Map<String, Object> options = new HashMap<>();
         options.put("user_" + USER_A, PASSWORD_A);
         options.put("user_" + USER_B, PASSWORD_B);
-        jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), 
options);
-        JaasContext jaasContext = new JaasContext("jaasContext", 
JaasContext.Type.SERVER, jaasConfig);
-        saslServer = new PlainSaslServer(jaasContext);
+        jaasConfig.addEntry("KafkaServer", PlainLoginModule.class.getName(), 
options);
+        Configuration.setConfiguration(jaasConfig);
+        saslServer = new PlainSaslServer(null);
+    }
+
+    @After
+    public void tearDown() {
+        Configuration.setConfiguration(null);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9168e10/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
index 9b329d0..a1d51f0 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

Reply via email to