[ 
https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582135#comment-16582135
 ] 

ASF GitHub Bot commented on KAFKA-7119:
---------------------------------------

rajinisivaram closed pull request #5509: KAFKA-7119: Handle transient Kerberos 
errors on server side
URL: https://github.com/apache/kafka/pull/5509
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 8d6549d867c..8934e8e5487 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -42,7 +42,7 @@
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.common.security.kerberos.KerberosError;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,6 @@
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.security.Principal;
@@ -376,7 +375,7 @@ public void close() throws IOException {
             Throwable cause = e.getCause();
             // Treat transient Kerberos errors as non-fatal SaslExceptions 
that are processed as I/O exceptions
             // and all other failures as fatal SaslAuthenticationException.
-            if (kerberosError != null && kerberosError.retriable)
+            if (kerberosError != null && kerberosError.retriable())
                 throw new SaslException(error, cause);
             else
                 throw new SaslAuthenticationException(error, cause);
@@ -443,73 +442,4 @@ static final String firstPrincipal(Subject subject) {
         }
     }
 
-    /**
-     * Kerberos exceptions that may require special handling. The standard 
Kerberos error codes
-     * for these errors are retrieved using KrbException#errorCode() from the 
underlying Kerberos
-     * exception thrown during {@link SaslClient#evaluateChallenge(byte[])}.
-     */
-    private enum KerberosError {
-        // (Mechanism level: Server not found in Kerberos database (7) - 
UNKNOWN_SERVER)
-        // This is retriable, but included here to add extra logging for this 
case.
-        SERVER_NOT_FOUND(7, false),
-        // (Mechanism level: Client not yet valid - try again later (21))
-        CLIENT_NOT_YET_VALID(21, true),
-        // (Mechanism level: Ticket not yet valid (33) - Ticket not yet 
valid)])
-        // This could be a small timing window.
-        TICKET_NOT_YET_VALID(33, true),
-        // (Mechanism level: Request is a replay (34) - Request is a replay)
-        // Replay detection used to prevent DoS attacks can result in false 
positives, so retry on error.
-        REPLAY(34, true);
-
-
-        private static final Class<?> KRB_EXCEPTION_CLASS;
-        private static final Method KRB_EXCEPTION_RETURN_CODE_METHOD;
-
-        static {
-            try {
-                if (Java.isIbmJdk()) {
-                    KRB_EXCEPTION_CLASS = 
Class.forName("com.ibm.security.krb5.internal.KrbException");
-                } else {
-                    KRB_EXCEPTION_CLASS = 
Class.forName("sun.security.krb5.KrbException");
-                }
-                KRB_EXCEPTION_RETURN_CODE_METHOD = 
KRB_EXCEPTION_CLASS.getMethod("returnCode");
-            } catch (Exception e) {
-                throw new KafkaException("Kerberos exceptions could not be 
initialized", e);
-            }
-        }
-
-        private final int errorCode;
-        private final boolean retriable;
-
-        KerberosError(int errorCode, boolean retriable) {
-            this.errorCode = errorCode;
-            this.retriable = retriable;
-        }
-
-        private static KerberosError fromException(Exception exception) {
-            Throwable cause = exception.getCause();
-            while (cause != null && !KRB_EXCEPTION_CLASS.isInstance(cause)) {
-                cause = cause.getCause();
-            }
-            if (cause == null)
-                return null;
-            else {
-                try {
-                    Integer errorCode = (Integer) 
KRB_EXCEPTION_RETURN_CODE_METHOD.invoke(cause);
-                    return fromErrorCode(errorCode);
-                } catch (Exception e) {
-                    LOG.trace("Kerberos return code could not be determined 
from {} due to {}", exception, e);
-                    return null;
-                }
-            }
-        }
-
-        private static KerberosError fromErrorCode(int errorCode) {
-            for (KerberosError error : values()) {
-                if (error.errorCode == errorCode)
-                    return error;
-            }
-            return null;
-        }
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index a3f81629bc7..e8f77a53e22 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -49,6 +49,7 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
+import org.apache.kafka.common.security.kerberos.KerberosError;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
@@ -267,11 +268,9 @@ public void authenticate() throws IOException {
                     default:
                         break;
                 }
-            } catch (SaslException | AuthenticationException e) {
+            } catch (AuthenticationException e) {
                 // Exception will be propagated after response is sent to 
client
-                AuthenticationException authException = (e instanceof 
AuthenticationException) ?
-                        (AuthenticationException) e : new 
AuthenticationException("SASL authentication failed", e);
-                setSaslState(SaslState.FAILED, authException);
+                setSaslState(SaslState.FAILED, e);
             } catch (Exception e) {
                 // In the case of IOExceptions and other unexpected 
exceptions, fail immediately
                 saslState = SaslState.FAILED;
@@ -378,12 +377,20 @@ private void handleSaslToken(byte[] clientToken) throws 
IOException {
                 // For versions with SASL_AUTHENTICATE header, send a response 
to SASL_AUTHENTICATE request even if token is empty.
                 ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER 
: ByteBuffer.wrap(responseToken);
                 sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
-            } catch (SaslAuthenticationException | SaslException e) {
-                String errorMessage = e instanceof SaslAuthenticationException 
? e.getMessage() :
-                    "Authentication failed due to invalid credentials with 
SASL mechanism " + saslMechanism;
-                sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
-                        errorMessage));
+            } catch (SaslAuthenticationException e) {
+                sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
                 throw e;
+            } catch (SaslException e) {
+                KerberosError kerberosError = KerberosError.fromException(e);
+                if (kerberosError != null && kerberosError.retriable()) {
+                    // Handle retriable Kerberos exceptions as I/O exceptions 
rather than authentication exceptions
+                    throw e;
+                } else {
+                    String errorMessage = "Authentication failed due to 
invalid credentials with SASL mechanism " + saslMechanism;
+                    sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
+                            errorMessage));
+                    throw new SaslAuthenticationException(errorMessage, e);
+                }
             }
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
new file mode 100644
index 00000000000..c6be441c7ed
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.kerberos;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
+import org.apache.kafka.common.utils.Java;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.SaslClient;
+import java.lang.reflect.Method;
+
+/**
+ * Kerberos exceptions that may require special handling. The standard 
Kerberos error codes
+ * for these errors are retrieved using KrbException#errorCode() from the 
underlying Kerberos
+ * exception thrown during {@link SaslClient#evaluateChallenge(byte[])}.
+ */
+public enum KerberosError {
+    // (Mechanism level: Server not found in Kerberos database (7) - 
UNKNOWN_SERVER)
+    // This is retriable, but included here to add extra logging for this case.
+    SERVER_NOT_FOUND(7, false),
+    // (Mechanism level: Client not yet valid - try again later (21))
+    CLIENT_NOT_YET_VALID(21, true),
+    // (Mechanism level: Ticket not yet valid (33) - Ticket not yet valid)])
+    // This could be a small timing window.
+    TICKET_NOT_YET_VALID(33, true),
+    // (Mechanism level: Request is a replay (34) - Request is a replay)
+    // Replay detection used to prevent DoS attacks can result in false 
positives, so retry on error.
+    REPLAY(34, true);
+
+    private static final Logger log = 
LoggerFactory.getLogger(SaslClientAuthenticator.class);
+    private static final Class<?> KRB_EXCEPTION_CLASS;
+    private static final Method KRB_EXCEPTION_RETURN_CODE_METHOD;
+
+    static {
+        try {
+            if (Java.isIbmJdk()) {
+                KRB_EXCEPTION_CLASS = 
Class.forName("com.ibm.security.krb5.internal.KrbException");
+            } else {
+                KRB_EXCEPTION_CLASS = 
Class.forName("sun.security.krb5.KrbException");
+            }
+            KRB_EXCEPTION_RETURN_CODE_METHOD = 
KRB_EXCEPTION_CLASS.getMethod("returnCode");
+        } catch (Exception e) {
+            throw new KafkaException("Kerberos exceptions could not be 
initialized", e);
+        }
+    }
+
+    private final int errorCode;
+    private final boolean retriable;
+
+    KerberosError(int errorCode, boolean retriable) {
+        this.errorCode = errorCode;
+        this.retriable = retriable;
+    }
+
+    public boolean retriable() {
+        return retriable;
+    }
+
+    public static KerberosError fromException(Exception exception) {
+        Throwable cause = exception.getCause();
+        while (cause != null && !KRB_EXCEPTION_CLASS.isInstance(cause)) {
+            cause = cause.getCause();
+        }
+        if (cause == null)
+            return null;
+        else {
+            try {
+                Integer errorCode = (Integer) 
KRB_EXCEPTION_RETURN_CODE_METHOD.invoke(cause);
+                return fromErrorCode(errorCode);
+            } catch (Exception e) {
+                log.trace("Kerberos return code could not be determined from 
{} due to {}", exception, e);
+                return null;
+            }
+        }
+    }
+
+    private static KerberosError fromErrorCode(int errorCode) {
+        for (KerberosError error : values()) {
+            if (error.errorCode == errorCode)
+                return error;
+        }
+        return null;
+    }
+}
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 04166c63b18..74b2a152e23 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -113,7 +113,8 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
           val disconnectState = selector.disconnected().get(nodeId)
           // Verify that disconnect state is not AUTHENTICATION_FAILED
           if (disconnectState != null)
-            assertEquals(ChannelState.State.AUTHENTICATE, 
disconnectState.state())
+            assertEquals(s"Authentication failed with exception 
${disconnectState.exception()}",
+              ChannelState.State.AUTHENTICATE, disconnectState.state())
           selector.isChannelReady(nodeId) || disconnectState != null
         }, "Client not ready or disconnected within timeout")
         if (selector.isChannelReady(nodeId))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Intermittent test failure with GSSAPI authentication failure
> ------------------------------------------------------------
>
>                 Key: KAFKA-7119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7119
>             Project: Kafka
>          Issue Type: Bug
>          Components: security
>    Affects Versions: 2.0.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> I have seen this failure a couple of times in builds (e.g. 
> [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/])
> {quote}
> org.apache.kafka.common.errors.SaslAuthenticationException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred 
> when evaluating SASL token received from the Kafka Broker. Kafka Client will 
> go to AUTHENTICATION_FAILED state. Caused by: 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Request is a 
> replay (34) - Request is a replay)] at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356)
>  at java.base/java.security.AccessController.doPrivileged(Native Method) at 
> java.base/javax.security.auth.Subject.doAs(Subject.java:423) at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205)
>  at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) 
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) 
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) 
> at 
> kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980)
>  at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at 
> kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979)
>  at 
> kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.lang.Thread.run(Thread.java:844) Caused by: GSSException: No 
> valid credentials provided (Mechanism level: Request is a replay (34) - 
> Request is a replay) at 
> java.security.jgss/sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:771)
>  at 
> java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:265)
>  at 
> java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:196)
>  at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
>  ... 37 more Caused by: KrbException: Request is a replay (34) - Request is a 
> replay at 
> java.security.jgss/sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73) at 
> java.security.jgss/sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:251) 
> at 
> java.security.jgss/sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:262)
>  at 
> java.security.jgss/sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:308)
>  at 
> java.security.jgss/sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:126)
>  at 
> java.security.jgss/sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:458)
>  at 
> java.security.jgss/sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:695)
>  ... 40 more Caused by: KrbException: Identifier doesn't match expected value 
> (906) at 
> java.security.jgss/sun.security.krb5.internal.KDCRep.init(KDCRep.java:140) at 
> java.security.jgss/sun.security.krb5.internal.TGSRep.init(TGSRep.java:65) at 
> java.security.jgss/sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:60) 
> at java.security.jgss/sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
> ... 46 more
> {quote}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to