[
https://issues.apache.org/jira/browse/KAFKA-7168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547706#comment-16547706
]
ASF GitHub Bot commented on KAFKA-7168:
---------------------------------------
rajinisivaram closed pull request #5371: KAFKA-7168: Treat connection close
during SSL handshake as retriable
URL: https://github.com/apache/kafka/pull/5371
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 06e7e937886..838a6a75af3 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -31,8 +31,10 @@
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLKeyException;
import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLProtocolException;
+import javax.net.ssl.SSLSession;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -255,17 +257,17 @@ public void handshake() throws IOException {
doHandshake();
} catch (SSLException e) {
- handshakeFailure(e, true);
+ maybeProcessHandshakeFailure(e, true, null);
} catch (IOException e) {
maybeThrowSslAuthenticationException();
// this exception could be due to a write. If there is data
available to unwrap,
- // process the data so that any SSLExceptions are reported
+ // process the data so that any SSL handshake exceptions are
reported
if (handshakeStatus == HandshakeStatus.NEED_UNWRAP &&
netReadBuffer.position() > 0) {
try {
handshakeUnwrap(false);
} catch (SSLException e1) {
- handshakeFailure(e1, false);
+ maybeProcessHandshakeFailure(e1, false, e);
}
}
// If we get here, this is not a handshake failure, throw the
original IOException
@@ -824,6 +826,32 @@ private void handshakeFailure(SSLException sslException,
boolean flush) throws I
throw handshakeException;
}
+ // SSL handshake failures are typically thrown as SSLHandshakeException,
SSLProtocolException,
+ // SSLPeerUnverifiedException or SSLKeyException if the cause is known.
These exceptions indicate
+ // authentication failures (e.g. configuration errors) which should not be
retried. But the SSL engine
+ // may also throw exceptions using the base class SSLException in a few
cases:
+ // a) If there are no matching ciphers or TLS version or the private key
is invalid, client will be
+ // unable to process the server message and an SSLException is thrown:
+ // javax.net.ssl.SSLException: Unrecognized SSL message, plaintext
connection?
+ // b) If server closes the connection gracefully during handshake,
client may receive close_notify
+ // and an SSLException is thrown:
+ // javax.net.ssl.SSLException: Received close_notify during handshake
+ // We want to handle a) as a non-retriable SslAuthenticationException and
b) as a retriable IOException.
+ // To do this we need to rely on the exception string. Since it is safer
to throw a retriable exception
+ // when we are not sure, we will treat only the first exception string as
a handshake exception.
+ private void maybeProcessHandshakeFailure(SSLException sslException,
boolean flush, IOException ioException) throws IOException {
+ if (sslException instanceof SSLHandshakeException || sslException
instanceof SSLProtocolException ||
+ sslException instanceof SSLPeerUnverifiedException ||
sslException instanceof SSLKeyException ||
+ sslException.getMessage().contains("Unrecognized SSL message"))
+ handshakeFailure(sslException, flush);
+ else if (ioException == null)
+ throw sslException;
+ else {
+ log.debug("SSLException while unwrapping data after IOException,
original IOException will be propagated", sslException);
+ throw ioException;
+ }
+ }
+
// If handshake has already failed, throw the authentication exception.
private void maybeThrowSslAuthenticationException() {
if (handshakeException != null)
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 2ce9671736e..0c81b53742d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -64,7 +64,8 @@
private volatile WritableByteChannel outputChannel;
private final CredentialCache credentialCache;
private final Metrics metrics;
- private int numSent = 0;
+ private volatile int numSent = 0;
+ private volatile boolean closeKafkaChannels;
private final DelegationTokenCache tokenCache;
public NioEchoServer(ListenerName listenerName, SecurityProtocol
securityProtocol, AbstractConfig config,
@@ -155,6 +156,11 @@ public void run() {
}
newChannels.clear();
}
+ if (closeKafkaChannels) {
+ for (KafkaChannel channel : selector.channels())
+ selector.close(channel.id());
+ closeKafkaChannels = false;
+ }
List<NetworkReceive> completedReceives =
selector.completedReceives();
for (NetworkReceive rcv : completedReceives) {
@@ -174,7 +180,6 @@ public void run() {
selector.unmute(send.destination());
numSent += 1;
}
-
}
} catch (IOException e) {
// ignore
@@ -208,15 +213,26 @@ public Selector selector() {
return selector;
}
- public void closeConnections() throws IOException {
- for (SocketChannel channel : socketChannels)
+ public void closeKafkaChannels() throws IOException {
+ closeKafkaChannels = true;
+ selector.wakeup();
+ try {
+ TestUtils.waitForCondition(() -> selector.channels().isEmpty(),
"Channels not closed");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void closeSocketChannels() throws IOException {
+ for (SocketChannel channel : socketChannels) {
channel.close();
+ }
socketChannels.clear();
}
public void close() throws IOException, InterruptedException {
this.serverSocketChannel.close();
- closeConnections();
+ closeSocketChannels();
acceptorThread.interrupt();
acceptorThread.join();
interrupt();
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1f62c10bd51..6aef2f7eda6 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -699,7 +699,8 @@ public boolean conditionMet() {
*/
@Test
public void testIOExceptionsDuringHandshakeRead() throws Exception {
- testIOExceptionsDuringHandshake(true, false);
+ server = createEchoServer(SecurityProtocol.SSL);
+ testIOExceptionsDuringHandshake(FailureAction.THROW_IO_EXCEPTION,
FailureAction.NO_OP);
}
/**
@@ -707,20 +708,60 @@ public void testIOExceptionsDuringHandshakeRead() throws
Exception {
*/
@Test
public void testIOExceptionsDuringHandshakeWrite() throws Exception {
- testIOExceptionsDuringHandshake(false, true);
+ server = createEchoServer(SecurityProtocol.SSL);
+ testIOExceptionsDuringHandshake(FailureAction.NO_OP,
FailureAction.THROW_IO_EXCEPTION);
+ }
+
+ /**
+ * Tests that if the remote end closes connection ungracefully during SSL
handshake while reading data,
+ * the disconnection is not treated as an authentication failure.
+ */
+ @Test
+ public void testUngracefulRemoteCloseDuringHandshakeRead() throws
Exception {
+ server = createEchoServer(SecurityProtocol.SSL);
+ testIOExceptionsDuringHandshake(server::closeSocketChannels,
FailureAction.NO_OP);
+ }
+
+ /**
+ * Tests that if the remote end closes connection ungracefully during SSL
handshake while writing data,
+ * the disconnection is not treated as an authentication failure.
+ */
+ @Test
+ public void testUngracefulRemoteCloseDuringHandshakeWrite() throws
Exception {
+ server = createEchoServer(SecurityProtocol.SSL);
+ testIOExceptionsDuringHandshake(FailureAction.NO_OP,
server::closeSocketChannels);
}
- private void testIOExceptionsDuringHandshake(boolean failRead, boolean
failWrite) throws Exception {
+ /**
+ * Tests that if the remote end closes the connection during SSL handshake
while reading data,
+ * the disconnection is not treated as an authentication failure.
+ */
+ @Test
+ public void testGracefulRemoteCloseDuringHandshakeRead() throws Exception {
+ server = createEchoServer(SecurityProtocol.SSL);
+ testIOExceptionsDuringHandshake(FailureAction.NO_OP,
server::closeKafkaChannels);
+ }
+
+ /**
+ * Tests that if the remote end closes the connection during SSL handshake
while writing data,
+ * the disconnection is not treated as an authentication failure.
+ */
+ @Test
+ public void testGracefulRemoteCloseDuringHandshakeWrite() throws Exception
{
server = createEchoServer(SecurityProtocol.SSL);
+ testIOExceptionsDuringHandshake(server::closeKafkaChannels,
FailureAction.NO_OP);
+ }
+
+ private void testIOExceptionsDuringHandshake(FailureAction
readFailureAction,
+ FailureAction
flushFailureAction) throws Exception {
TestSslChannelBuilder channelBuilder = new
TestSslChannelBuilder(Mode.CLIENT);
boolean done = false;
for (int i = 1; i <= 100; i++) {
- int readFailureIndex = failRead ? i : Integer.MAX_VALUE;
- int flushFailureIndex = failWrite ? i : Integer.MAX_VALUE;
String node = String.valueOf(i);
- channelBuilder.readFailureIndex = readFailureIndex;
- channelBuilder.flushFailureIndex = flushFailureIndex;
+ channelBuilder.readFailureAction = readFailureAction;
+ channelBuilder.flushFailureAction = flushFailureAction;
+ channelBuilder.failureIndex = i;
channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(),
"MetricGroup", channelBuilder, new LogContext());
@@ -734,7 +775,9 @@ private void testIOExceptionsDuringHandshake(boolean
failRead, boolean failWrite
break;
}
if (selector.disconnected().containsKey(node)) {
- assertEquals(ChannelState.State.AUTHENTICATE,
selector.disconnected().get(node).state());
+ ChannelState.State state =
selector.disconnected().get(node).state();
+ assertTrue("Unexpected channel state " + state,
+ state == ChannelState.State.AUTHENTICATE || state
== ChannelState.State.READY);
break;
}
}
@@ -973,13 +1016,23 @@ private NioEchoServer createEchoServer(SecurityProtocol
securityProtocol) throws
return
createEchoServer(ListenerName.forSecurityProtocol(securityProtocol),
securityProtocol);
}
+ @FunctionalInterface
+ private interface FailureAction {
+ FailureAction NO_OP = () -> { };
+ FailureAction THROW_IO_EXCEPTION = () -> {
+ throw new IOException("Test IO exception");
+ };
+ void run() throws IOException;
+ }
+
private static class TestSslChannelBuilder extends SslChannelBuilder {
private Integer netReadBufSizeOverride;
private Integer netWriteBufSizeOverride;
private Integer appBufSizeOverride;
- long readFailureIndex = Long.MAX_VALUE;
- long flushFailureIndex = Long.MAX_VALUE;
+ private long failureIndex = Long.MAX_VALUE;
+ FailureAction readFailureAction = FailureAction.NO_OP;
+ FailureAction flushFailureAction = FailureAction.NO_OP;
int flushDelayCount = 0;
public TestSslChannelBuilder(Mode mode) {
@@ -1029,8 +1082,8 @@ public TestSslTransportLayer(String channelId,
SelectionKey key, SSLEngine sslEn
this.netReadBufSize = new
ResizeableBufferSize(netReadBufSizeOverride);
this.netWriteBufSize = new
ResizeableBufferSize(netWriteBufSizeOverride);
this.appBufSize = new ResizeableBufferSize(appBufSizeOverride);
- numReadsRemaining = new AtomicLong(readFailureIndex);
- numFlushesRemaining = new AtomicLong(flushFailureIndex);
+ numReadsRemaining = new AtomicLong(failureIndex);
+ numFlushesRemaining = new AtomicLong(failureIndex);
numDelayedFlushesRemaining = new
AtomicInteger(flushDelayCount);
}
@@ -1058,14 +1111,14 @@ protected int applicationBufferSize() {
@Override
protected int readFromSocketChannel() throws IOException {
if (numReadsRemaining.decrementAndGet() == 0 && !ready())
- throw new IOException("Test exception during read");
+ readFailureAction.run();
return super.readFromSocketChannel();
}
@Override
protected boolean flush(ByteBuffer buf) throws IOException {
if (numFlushesRemaining.decrementAndGet() == 0 && !ready())
- throw new IOException("Test exception during write");
+ flushFailureAction.run();
else if (numDelayedFlushesRemaining.getAndDecrement() != 0)
return false;
resetDelayedFlush();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Broker shutdown during SSL handshake may be handled as handshake failure
> ------------------------------------------------------------------------
>
> Key: KAFKA-7168
> URL: https://issues.apache.org/jira/browse/KAFKA-7168
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 1.0.2, 1.1.1, 2.0.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
>
> If a broker is shut down while the SSL handshake of a client connection is in
> progress, the client may process the resulting SSLException as a
> non-retriable handshake failure rather than a retriable I/O exception. This
> can cause streams applications to fail during rolling restarts.
> Exception stack trace:
> {quote}
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake
> failed
> Caused by: javax.net.ssl.SSLException: Received close_notify during handshake
> at sun.security.ssl.Alerts.getSSLException(Alerts.java:208)
> at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1639)
> at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1607)
> at sun.security.ssl.SSLEngineImpl.recvAlert(SSLEngineImpl.java:1752)
> at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:1068)
> at
> sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:890)
> at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:764)
> at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
> at
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:465)
> at
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:266)
> at
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:88)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:206)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
> {quote}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)