[ https://issues.apache.org/jira/browse/KAFKA-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646291#comment-16646291 ]
ASF GitHub Bot commented on KAFKA-7475: --------------------------------------- rajinisivaram closed pull request #5729: KAFKA-7475 - capture remote address on connection authetication errors, and log it URL: https://github.com/apache/kafka/pull/5729 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index b2098bfa065..8ec51ed309d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -649,6 +649,7 @@ private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer respo * @param responses The list of responses to update * @param nodeId Id of the node to be disconnected * @param now The current time + * @param disconnectState The state of the disconnected channel */ private void processDisconnection(List<ClientResponse> responses, String nodeId, @@ -662,15 +663,15 @@ private void processDisconnection(List<ClientResponse> responses, AuthenticationException exception = disconnectState.exception(); connectionStates.authenticationFailed(nodeId, now, exception); metadataUpdater.handleAuthenticationFailure(exception); - log.error("Connection to node {} failed authentication due to: {}", nodeId, exception.getMessage()); + log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId, disconnectState.remoteAddress(), exception.getMessage()); break; case AUTHENTICATE: // This warning applies to older brokers which don't provide feedback on authentication failures - log.warn("Connection to node {} terminated during authentication. 
This may indicate " + - "that authentication failed due to invalid credentials.", nodeId); + log.warn("Connection to node {} ({}) terminated during authentication. This may indicate " + + "that authentication failed due to invalid credentials.", nodeId, disconnectState.remoteAddress()); break; case NOT_CONNECTED: - log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId); + log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress()); break; default: break; // Disconnections in other states are logged at debug level in Selector diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java index 2d584bd80ce..5f6dfb92bd2 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java @@ -75,12 +75,20 @@ private final State state; private final AuthenticationException exception; + private final String remoteAddress; + public ChannelState(State state) { - this(state, null); + this(state, null, null); + } + + public ChannelState(State state, String remoteAddress) { + this(state, null, remoteAddress); } - public ChannelState(State state, AuthenticationException exception) { + + public ChannelState(State state, AuthenticationException exception, String remoteAddress) { this.state = state; this.exception = exception; + this.remoteAddress = remoteAddress; } public State state() { @@ -90,4 +98,8 @@ public State state() { public AuthenticationException exception() { return exception; } + + public String remoteAddress() { + return remoteAddress; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 28951282a9f..6a12ef23310 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.network; +import java.net.SocketAddress; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -25,6 +26,7 @@ import java.net.InetAddress; import java.net.Socket; import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; import java.util.Objects; public class KafkaChannel { @@ -89,6 +91,7 @@ private boolean disconnected; private ChannelMuteState muteState; private ChannelState state; + private SocketAddress remoteAddress; public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) { this.id = id; @@ -131,7 +134,8 @@ public void prepare() throws AuthenticationException, IOException { } catch (AuthenticationException e) { // Clients are notified of authentication exceptions to enable operations to be terminated // without retries. Other errors are handled as network exceptions in Selector. - state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e); + String remoteDesc = remoteAddress != null ? 
remoteAddress.toString() : null; + state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e, remoteDesc); if (authenticating) { delayCloseOnAuthenticationFailure(); throw new DelayedResponseAuthenticationException(e); @@ -144,6 +148,10 @@ public void prepare() throws AuthenticationException, IOException { public void disconnect() { disconnected = true; + if (state == ChannelState.NOT_CONNECTED && remoteAddress != null) { + //if we captured the remote address we can provide more information + state = new ChannelState(ChannelState.State.NOT_CONNECTED, remoteAddress.toString()); + } transportLayer.disconnect(); } @@ -156,9 +164,22 @@ public ChannelState state() { } public boolean finishConnect() throws IOException { + //we need to grab remoteAddr before finishConnect() is called otherwise + //it becomes inaccessible if the connection was refused. + SocketChannel socketChannel = transportLayer.socketChannel(); + if (socketChannel != null) { + remoteAddress = socketChannel.getRemoteAddress(); + } boolean connected = transportLayer.finishConnect(); - if (connected) - state = ready() ? 
ChannelState.READY : ChannelState.AUTHENTICATE; + if (connected) { + if (ready()) { + state = ChannelState.READY; + } else if (remoteAddress != null) { + state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString()); + } else { + state = ChannelState.AUTHENTICATE; + } + } return connected; } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 297cba5d2a5..c090b6b4323 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -1282,7 +1282,7 @@ private void verifySaslAuthenticateHeaderInteropWithFailure(boolean enableHeader // Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE which is // a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED // which is an actual authentication failure reported by the broker. - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATE); } private void createServer(SecurityProtocol securityProtocol, String saslMechanism, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > print the actual cluster bootstrap address on authentication failures > --------------------------------------------------------------------- > > Key: KAFKA-7475 > URL: https://issues.apache.org/jira/browse/KAFKA-7475 > Project: Kafka > Issue Type: Improvement > Reporter: radai rosenblatt > Assignee: radai rosenblatt > Priority: Major > > Currently, when a Kafka client fails to connect to a cluster, users see > something like this: > {code} > Connection to node -1 terminated during authentication. This may indicate > that authentication failed due to invalid credentials. > {code} > That log line is mostly useless for identifying which (of potentially many) > Kafka clients are having issues and which Kafka cluster it is having issues with. > It would be nice to record the remote host/port. -- This message was sent by Atlassian JIRA (v7.6.3#76005)