This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push: new 73c646c KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517) 73c646c is described below commit 73c646c442fd17e3f9919eb2fd50fdac75e32917 Author: parafiend <darthfrogge...@hotmail.com> AuthorDate: Fri Feb 9 04:59:18 2018 -0800 KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517) If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, they are tracked so that those records can still be processed. However, if the exception was encountered while processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed. Disable processing of outstanding staged receives if a send fails. This stops the leak of memory for pending requests and of the file descriptor of the TCP socket. Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala. 
Author: Graham Campbell <graham.campb...@salesforce.com> Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk>, Ted Yu <yuzhih...@gmail.com> --- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/common/network/Selector.java | 44 ++++++++++---- .../main/scala/kafka/network/SocketServer.scala | 8 +++ .../unit/kafka/network/SocketServerTest.scala | 68 ++++++++++++++++++---- 4 files changed, 99 insertions(+), 23 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 9825671..c35c77c 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -54,7 +54,7 @@ files="AbstractRequest.java"/> <suppress checks="NPathComplexity" - files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|PluginUtils).java"/> + files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Selector|Sender|Serdes|PluginUtils).java"/> <!-- clients tests --> <suppress checks="ClassDataAbstractionCoupling" diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 00392d0..06d0f96 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -269,9 +269,17 @@ public class Selector implements Selectable, AutoCloseable { KafkaChannel channel = channelOrFail(connectionId, false); try { channel.setSend(send); - } catch (CancelledKeyException e) { + } catch (Exception e) { + // update the state for consistency, the channel will be discarded after `close` + channel.state(ChannelState.FAILED_SEND); + // ensure notification via `disconnected` when `failedSends` are processed in the next poll this.failedSends.add(connectionId); - close(channel, false); + close(channel, 
false, false); + if (!(e instanceof CancelledKeyException)) { + log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", + connectionId, e); + throw e; + } } } } @@ -354,6 +362,7 @@ public class Selector implements Selectable, AutoCloseable { if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); + boolean sendFailed = false; try { /* complete any connections that have finished their handshake (either normally or immediately) */ @@ -384,7 +393,13 @@ public class Selector implements Selectable, AutoCloseable { /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ if (channel.ready() && key.isWritable()) { - Send send = channel.write(); + Send send = null; + try { + send = channel.write(); + } catch (Exception e) { + sendFailed = true; + throw e; + } if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); @@ -393,7 +408,7 @@ public class Selector implements Selectable, AutoCloseable { /* cancel any defunct sockets */ if (!key.isValid()) - close(channel, true); + close(channel, true, true); } catch (Exception e) { String desc = channel.socketDescription(); @@ -401,7 +416,7 @@ public class Selector implements Selectable, AutoCloseable { log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); - close(channel, true); + close(channel, !sendFailed, true); } finally { maybeRecordTimePerConnection(channel, channelStartTimeNanos); } @@ -479,7 +494,7 @@ public class Selector implements Selectable, AutoCloseable { log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); channel.state(ChannelState.EXPIRED); - close(channel, true); + close(channel, true, true); } } } @@ -538,7 +553,12 @@ public class Selector implements 
Selectable, AutoCloseable { // There is no disconnect notification for local close, but updating // channel state here anyway to avoid confusion. channel.state(ChannelState.LOCAL_CLOSE); - close(channel, false); + close(channel, false, false); + } else { + KafkaChannel closingChannel = this.closingChannels.remove(id); + // Close any closing channel, leave the channel in the state in which closing was triggered + if (closingChannel != null) + doClose(closingChannel, false); } } @@ -553,7 +573,10 @@ public class Selector implements Selectable, AutoCloseable { * closed immediately. The channel will not be added to disconnected list and it is the * responsibility of the caller to handle disconnect notifications. */ - private void close(KafkaChannel channel, boolean processOutstanding) { + private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) { + + if (processOutstanding && !notifyDisconnect) + throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests"); channel.disconnect(); @@ -571,8 +594,9 @@ public class Selector implements Selectable, AutoCloseable { if (processOutstanding && deque != null && !deque.isEmpty()) { // stagedReceives will be moved to completedReceives later along with receives from other channels closingChannels.put(channel.id(), channel); + log.debug("Tracking closing connection {} to process outstanding requests", channel.id()); } else - doClose(channel, processOutstanding); + doClose(channel, notifyDisconnect); this.channels.remove(channel.id()); if (idleExpiryManager != null) @@ -700,7 +724,7 @@ public class Selector implements Selectable, AutoCloseable { } // only for testing - int numStagedReceives(KafkaChannel channel) { + public int numStagedReceives(KafkaChannel channel) { Deque<NetworkReceive> deque = stagedReceives.get(channel); return deque == null ? 
0 : deque.size(); } diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 6db70cf..af94231 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -605,6 +605,14 @@ private[kafka] class Processor(val id: Int, private[network] def channel(connectionId: String): Option[KafkaChannel] = Option(selector.channel(connectionId)) + /* For test usage */ + private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] = + channel(connectionId).orElse(Option(selector.closingChannel(connectionId))) + + // Visible for testing + private[network] def numStagedReceives(connectionId: String): Int = + openOrClosingChannel(connectionId).map(c => selector.numStagedReceives(c)).getOrElse(0) + /** * Wakeup the thread for selection. */ diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index acf96e8..a3897c0 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -65,7 +65,7 @@ class SocketServerTest extends JUnitSuite { server.startup() val sockets = new ArrayBuffer[Socket] - def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) { + def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true) { val outgoing = new DataOutputStream(socket.getOutputStream) id match { case Some(id) => @@ -75,7 +75,8 @@ class SocketServerTest extends JUnitSuite { outgoing.writeInt(request.length) } outgoing.write(request) - outgoing.flush() + if (flush) + outgoing.flush() } def receiveResponse(socket: Socket): Array[Byte] = { @@ -86,10 +87,15 @@ class SocketServerTest extends JUnitSuite { response } + private def receiveRequest(channel: RequestChannel, timeout: Long = 2000L): RequestChannel.Request = { 
+ val request = channel.receiveRequest(timeout) + assertNotNull("receiveRequest timed out", request) + request + } + /* A simple request handler that just echos back the response */ def processRequest(channel: RequestChannel) { - val request = channel.receiveRequest(2000) - assertNotNull("receiveRequest timed out", request) + val request = receiveRequest(channel) processRequest(channel, request) } @@ -115,12 +121,11 @@ class SocketServerTest extends JUnitSuite { sockets.clear() } - private def producerRequestBytes: Array[Byte] = { + private def producerRequestBytes(ack: Short = 0): Array[Byte] = { val apiKey: Short = 0 val correlationId = -1 val clientId = "" val ackTimeoutMs = 10000 - val ack = 0: Short val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build() @@ -133,11 +138,30 @@ class SocketServerTest extends JUnitSuite { serializedBytes } + private def sendRequestsUntilStagedReceive(server: SocketServer, socket: Socket, requestBytes: Array[Byte]): RequestChannel.Request = { + def sendTwoRequestsReceiveOne(): RequestChannel.Request = { + sendRequest(socket, requestBytes, flush = false) + sendRequest(socket, requestBytes, flush = true) + receiveRequest(server.requestChannel) + } + val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req => + val connectionId = req.connectionId + val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0 + if (!hasStagedReceives) { + processRequest(server.requestChannel, req) + processRequest(server.requestChannel) + } + hasStagedReceives + } + assertTrue(s"Receives not staged for ${org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS} ms", hasStagedReceives) + request + } + @Test def simpleRequest() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) val traceSocket = connect(protocol = SecurityProtocol.TRACE) - val serializedBytes = producerRequestBytes + val 
serializedBytes = producerRequestBytes() // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) @@ -171,7 +195,7 @@ class SocketServerTest extends JUnitSuite { @Test def testGracefulClose() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() for (_ <- 0 until 10) sendRequest(plainSocket, serializedBytes) @@ -236,7 +260,7 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length, "Failed to decrement connection count after close") val conn2 = connect() - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) val request = server.requestChannel.receiveRequest(2000) assertNotNull(request) @@ -255,7 +279,7 @@ class SocketServerTest extends JUnitSuite { val conns = (0 until overrideNum).map(_ => connect(overrideServer)) // it should succeed - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conns.last, serializedBytes) val request = overrideServer.requestChannel.receiveRequest(2000) assertNotNull(request) @@ -341,7 +365,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() conn = connect(overrideServer) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel @@ -367,6 +391,26 @@ class SocketServerTest extends JUnitSuite { } } + @Test + def testClientDisconnectionWithStagedReceivesFullyProcessed() { + val socket = connect(server) + + // Setup channel to client with staged receives so when client disconnects + // it will be stored in Selector.closingChannels + val serializedBytes = producerRequestBytes(1) + val request = sendRequestsUntilStagedReceive(server, socket, serializedBytes) + val connectionId = request.connectionId + + 
// Set SoLinger to 0 to force a hard disconnect via TCP RST + socket.setSoLinger(true, 0) + socket.close() + + // Complete request with socket exception so that the channel is removed from Selector.closingChannels + processRequest(server.requestChannel, request) + TestUtils.waitUntilTrue(() => server.processor(0).openOrClosingChannel(connectionId).isEmpty, + "Channel not closed after failed send") + } + /* * Test that we update request metrics if the channel has been removed from the selector when the broker calls * `selector.send` (selector closes old connections, for example). @@ -381,7 +425,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() conn = connect(overrideServer) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel val request = channel.receiveRequest(2000) -- To stop receiving notification emails like this one, please contact rsiva...@apache.org.