[ https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582135#comment-16582135 ]
ASF GitHub Bot commented on KAFKA-7119: --------------------------------------- rajinisivaram closed pull request #5509: KAFKA-7119: Handle transient Kerberos errors on server side URL: https://github.com/apache/kafka/pull/5509 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 8d6549d867c..8934e8e5487 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.security.kerberos.KerberosError; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,6 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import java.io.IOException; -import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.security.Principal; @@ -376,7 +375,7 @@ public void close() throws IOException { Throwable cause = e.getCause(); // Treat transient Kerberos errors as non-fatal SaslExceptions that are processed as I/O exceptions // and all other failures as fatal SaslAuthenticationException. 
- if (kerberosError != null && kerberosError.retriable) + if (kerberosError != null && kerberosError.retriable()) throw new SaslException(error, cause); else throw new SaslAuthenticationException(error, cause); @@ -443,73 +442,4 @@ static final String firstPrincipal(Subject subject) { } } - /** - * Kerberos exceptions that may require special handling. The standard Kerberos error codes - * for these errors are retrieved using KrbException#errorCode() from the underlying Kerberos - * exception thrown during {@link SaslClient#evaluateChallenge(byte[])}. - */ - private enum KerberosError { - // (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER) - // This is retriable, but included here to add extra logging for this case. - SERVER_NOT_FOUND(7, false), - // (Mechanism level: Client not yet valid - try again later (21)) - CLIENT_NOT_YET_VALID(21, true), - // (Mechanism level: Ticket not yet valid (33) - Ticket not yet valid)]) - // This could be a small timing window. - TICKET_NOT_YET_VALID(33, true), - // (Mechanism level: Request is a replay (34) - Request is a replay) - // Replay detection used to prevent DoS attacks can result in false positives, so retry on error. 
- REPLAY(34, true); - - - private static final Class<?> KRB_EXCEPTION_CLASS; - private static final Method KRB_EXCEPTION_RETURN_CODE_METHOD; - - static { - try { - if (Java.isIbmJdk()) { - KRB_EXCEPTION_CLASS = Class.forName("com.ibm.security.krb5.internal.KrbException"); - } else { - KRB_EXCEPTION_CLASS = Class.forName("sun.security.krb5.KrbException"); - } - KRB_EXCEPTION_RETURN_CODE_METHOD = KRB_EXCEPTION_CLASS.getMethod("returnCode"); - } catch (Exception e) { - throw new KafkaException("Kerberos exceptions could not be initialized", e); - } - } - - private final int errorCode; - private final boolean retriable; - - KerberosError(int errorCode, boolean retriable) { - this.errorCode = errorCode; - this.retriable = retriable; - } - - private static KerberosError fromException(Exception exception) { - Throwable cause = exception.getCause(); - while (cause != null && !KRB_EXCEPTION_CLASS.isInstance(cause)) { - cause = cause.getCause(); - } - if (cause == null) - return null; - else { - try { - Integer errorCode = (Integer) KRB_EXCEPTION_RETURN_CODE_METHOD.invoke(cause); - return fromErrorCode(errorCode); - } catch (Exception e) { - LOG.trace("Kerberos return code could not be determined from {} due to {}", exception, e); - return null; - } - } - } - - private static KerberosError fromErrorCode(int errorCode) { - for (KerberosError error : values()) { - if (error.errorCode == errorCode) - return error; - } - return null; - } - } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index a3f81629bc7..e8f77a53e22 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import 
org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.SaslAuthenticationContext; +import org.apache.kafka.common.security.kerberos.KerberosError; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.scram.ScramLoginModule; @@ -267,11 +268,9 @@ public void authenticate() throws IOException { default: break; } - } catch (SaslException | AuthenticationException e) { + } catch (AuthenticationException e) { // Exception will be propagated after response is sent to client - AuthenticationException authException = (e instanceof AuthenticationException) ? - (AuthenticationException) e : new AuthenticationException("SASL authentication failed", e); - setSaslState(SaslState.FAILED, authException); + setSaslState(SaslState.FAILED, e); } catch (Exception e) { // In the case of IOExceptions and other unexpected exceptions, fail immediately saslState = SaslState.FAILED; @@ -378,12 +377,20 @@ private void handleSaslToken(byte[] clientToken) throws IOException { // For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty. ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken); sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf)); - } catch (SaslAuthenticationException | SaslException e) { - String errorMessage = e instanceof SaslAuthenticationException ? 
e.getMessage() : - "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism; - sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, - errorMessage)); + } catch (SaslAuthenticationException e) { + sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage())); throw e; + } catch (SaslException e) { + KerberosError kerberosError = KerberosError.fromException(e); + if (kerberosError != null && kerberosError.retriable()) { + // Handle retriable Kerberos exceptions as I/O exceptions rather than authentication exceptions + throw e; + } else { + String errorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism; + sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, + errorMessage)); + throw new SaslAuthenticationException(errorMessage, e); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java new file mode 100644 index 00000000000..c6be441c7ed --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.kerberos; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; +import org.apache.kafka.common.utils.Java; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.sasl.SaslClient; +import java.lang.reflect.Method; + +/** + * Kerberos exceptions that may require special handling. The standard Kerberos error codes + * for these errors are retrieved using KrbException#errorCode() from the underlying Kerberos + * exception thrown during {@link SaslClient#evaluateChallenge(byte[])}. + */ +public enum KerberosError { + // (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER) + // This is retriable, but included here to add extra logging for this case. + SERVER_NOT_FOUND(7, false), + // (Mechanism level: Client not yet valid - try again later (21)) + CLIENT_NOT_YET_VALID(21, true), + // (Mechanism level: Ticket not yet valid (33) - Ticket not yet valid)]) + // This could be a small timing window. + TICKET_NOT_YET_VALID(33, true), + // (Mechanism level: Request is a replay (34) - Request is a replay) + // Replay detection used to prevent DoS attacks can result in false positives, so retry on error. 
+ REPLAY(34, true); + + private static final Logger log = LoggerFactory.getLogger(SaslClientAuthenticator.class); + private static final Class<?> KRB_EXCEPTION_CLASS; + private static final Method KRB_EXCEPTION_RETURN_CODE_METHOD; + + static { + try { + if (Java.isIbmJdk()) { + KRB_EXCEPTION_CLASS = Class.forName("com.ibm.security.krb5.internal.KrbException"); + } else { + KRB_EXCEPTION_CLASS = Class.forName("sun.security.krb5.KrbException"); + } + KRB_EXCEPTION_RETURN_CODE_METHOD = KRB_EXCEPTION_CLASS.getMethod("returnCode"); + } catch (Exception e) { + throw new KafkaException("Kerberos exceptions could not be initialized", e); + } + } + + private final int errorCode; + private final boolean retriable; + + KerberosError(int errorCode, boolean retriable) { + this.errorCode = errorCode; + this.retriable = retriable; + } + + public boolean retriable() { + return retriable; + } + + public static KerberosError fromException(Exception exception) { + Throwable cause = exception.getCause(); + while (cause != null && !KRB_EXCEPTION_CLASS.isInstance(cause)) { + cause = cause.getCause(); + } + if (cause == null) + return null; + else { + try { + Integer errorCode = (Integer) KRB_EXCEPTION_RETURN_CODE_METHOD.invoke(cause); + return fromErrorCode(errorCode); + } catch (Exception e) { + log.trace("Kerberos return code could not be determined from {} due to {}", exception, e); + return null; + } + } + } + + private static KerberosError fromErrorCode(int errorCode) { + for (KerberosError error : values()) { + if (error.errorCode == errorCode) + return error; + } + return null; + } +} diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala index 04166c63b18..74b2a152e23 100644 --- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala +++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala @@ -113,7 +113,8 @@ class 
GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { val disconnectState = selector.disconnected().get(nodeId) // Verify that disconnect state is not AUTHENTICATION_FAILED if (disconnectState != null) - assertEquals(ChannelState.State.AUTHENTICATE, disconnectState.state()) + assertEquals(s"Authentication failed with exception ${disconnectState.exception()}", + ChannelState.State.AUTHENTICATE, disconnectState.state()) selector.isChannelReady(nodeId) || disconnectState != null }, "Client not ready or disconnected within timeout") if (selector.isChannelReady(nodeId)) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Intermittent test failure with GSSAPI authentication failure > ------------------------------------------------------------ > > Key: KAFKA-7119 > URL: https://issues.apache.org/jira/browse/KAFKA-7119 > Project: Kafka > Issue Type: Bug > Components: security > Affects Versions: 2.0.0 > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0 > > > I have seen this failure a couple of times in builds (e.g. > [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/]) > {quote} > org.apache.kafka.common.errors.SaslAuthenticationException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred > when evaluating SASL token received from the Kafka Broker. Kafka Client will > go to AUTHENTICATION_FAILED state.
Caused by: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Request is a > replay (34) - Request is a replay)] at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) > at java.base/java.security.AccessController.doPrivileged(Native Method) at > java.base/javax.security.auth.Subject.doAs(Subject.java:423) at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at > kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) > at > kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at > java.base/java.lang.Thread.run(Thread.java:844) Caused by: GSSException: No > valid credentials provided (Mechanism level: Request is a 
replay (34) - > Request is a replay) at > java.security.jgss/sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:771) > at > java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:265) > at > java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:196) > at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192) > ... 37 more Caused by: KrbException: Request is a replay (34) - Request is a > replay at > java.security.jgss/sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73) at > java.security.jgss/sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:251) > at > java.security.jgss/sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:262) > at > java.security.jgss/sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:308) > at > java.security.jgss/sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:126) > at > java.security.jgss/sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:458) > at > java.security.jgss/sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:695) > ... 40 more Caused by: KrbException: Identifier doesn't match expected value > (906) at > java.security.jgss/sun.security.krb5.internal.KDCRep.init(KDCRep.java:140) at > java.security.jgss/sun.security.krb5.internal.TGSRep.init(TGSRep.java:65) at > java.security.jgss/sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:60) > at java.security.jgss/sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55) > ... 46 more > {quote} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)