This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new 7c6244f KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517) 7c6244f is described below commit 7c6244f73b95e1104904f2b4b9f6a6bf282a5cba Author: parafiend <darthfrogge...@hotmail.com> AuthorDate: Fri Feb 9 04:59:18 2018 -0800 KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517) If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, the closing channel is tracked so that those receives can still be processed. However, if the exception was encountered while processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed. Disable processing of outstanding staged receives if a send fails. This stops the leak of both the memory held for pending requests and the file descriptor of the TCP socket. Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala. 
Author: Graham Campbell <graham.campb...@salesforce.com> Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk>, Ted Yu <yuzhih...@gmail.com> --- .../org/apache/kafka/common/network/Selector.java | 29 +++++--- .../unit/kafka/network/SocketServerTest.scala | 83 +++++++++++++++++----- 2 files changed, 85 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 6bfcfd2..ed037b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -325,9 +325,9 @@ public class Selector implements Selectable, AutoCloseable { } catch (Exception e) { // update the state for consistency, the channel will be discarded after `close` channel.state(ChannelState.FAILED_SEND); - // ensure notification via `disconnected` + // ensure notification via `disconnected` when `failedSends` are processed in the next poll this.failedSends.add(connectionId); - close(channel, false); + close(channel, false, false); if (!(e instanceof CancelledKeyException)) { log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", connectionId, e); @@ -450,6 +450,7 @@ public class Selector implements Selectable, AutoCloseable { if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); + boolean sendFailed = false; try { /* complete any connections that have finished their handshake (either normally or immediately) */ @@ -491,7 +492,13 @@ public class Selector implements Selectable, AutoCloseable { /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ if (channel.ready() && key.isWritable()) { - Send send = channel.write(); + Send send = null; + try { + send = channel.write(); + } catch (Exception e) 
{ + sendFailed = true; + throw e; + } if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); @@ -500,7 +507,7 @@ public class Selector implements Selectable, AutoCloseable { /* cancel any defunct sockets */ if (!key.isValid()) - close(channel, true); + close(channel, true, true); } catch (Exception e) { String desc = channel.socketDescription(); @@ -510,7 +517,7 @@ public class Selector implements Selectable, AutoCloseable { log.debug("Connection with {} disconnected due to authentication exception", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); - close(channel, true); + close(channel, !sendFailed, true); } finally { maybeRecordTimePerConnection(channel, channelStartTimeNanos); } @@ -620,7 +627,7 @@ public class Selector implements Selectable, AutoCloseable { log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); channel.state(ChannelState.EXPIRED); - close(channel, true); + close(channel, true, true); } } } @@ -674,7 +681,7 @@ public class Selector implements Selectable, AutoCloseable { // There is no disconnect notification for local close, but updating // channel state here anyway to avoid confusion. channel.state(ChannelState.LOCAL_CLOSE); - close(channel, false); + close(channel, false, false); } else { KafkaChannel closingChannel = this.closingChannels.remove(id); // Close any closing channel, leave the channel in the state in which closing was triggered @@ -694,7 +701,10 @@ public class Selector implements Selectable, AutoCloseable { * closed immediately. The channel will not be added to disconnected list and it is the * responsibility of the caller to handle disconnect notifications. 
*/ - private void close(KafkaChannel channel, boolean processOutstanding) { + private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) { + + if (processOutstanding && !notifyDisconnect) + throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests"); channel.disconnect(); @@ -712,8 +722,9 @@ public class Selector implements Selectable, AutoCloseable { if (processOutstanding && deque != null && !deque.isEmpty()) { // stagedReceives will be moved to completedReceives later along with receives from other channels closingChannels.put(channel.id(), channel); + log.debug("Tracking closing connection {} to process outstanding requests", channel.id()); } else - doClose(channel, processOutstanding); + doClose(channel, notifyDisconnect); this.channels.remove(channel.id()); if (idleExpiryManager != null) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 558098b..954d7e6 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -147,7 +147,7 @@ class SocketServerTest extends JUnitSuite { } def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { - sendRequest(socket, producerRequestBytes) + sendRequest(socket, producerRequestBytes()) receiveRequest(server.requestChannel) } @@ -156,11 +156,10 @@ class SocketServerTest extends JUnitSuite { server.metrics.close() } - private def producerRequestBytes: Array[Byte] = { + private def producerRequestBytes(ack: Short = 0): Array[Byte] = { val correlationId = -1 val clientId = "" val ackTimeoutMs = 10000 - val ack = 0: Short val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build() @@ -176,7 +175,7 @@ class SocketServerTest extends JUnitSuite { 
@Test def simpleRequest() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) @@ -205,7 +204,7 @@ class SocketServerTest extends JUnitSuite { @Test def testGracefulClose() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() for (_ <- 0 until 10) sendRequest(plainSocket, serializedBytes) @@ -220,7 +219,7 @@ class SocketServerTest extends JUnitSuite { @Test def testNoOpAction(): Unit = { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() for (_ <- 0 until 3) sendRequest(plainSocket, serializedBytes) @@ -234,7 +233,7 @@ class SocketServerTest extends JUnitSuite { @Test def testConnectionId() { val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT)) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() val requests = sockets.map{socket => sendRequest(socket, serializedBytes) @@ -263,7 +262,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() // Connection with no staged receives val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) @@ -346,7 +345,7 @@ class SocketServerTest extends JUnitSuite { // Send requests to `channel1` until a receive is staged and advance time beyond idle time so that `channel1` is // closed with staged receives and is in Selector.closingChannels - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() val request = sendRequestsUntilStagedReceive(overrideServer, socket1, serializedBytes) time.sleep(idleTimeMs + 1) 
TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed") @@ -436,7 +435,7 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length, "Failed to decrement connection count after close") val conn2 = connect() - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) val request = server.requestChannel.receiveRequest(2000) assertNotNull(request) @@ -455,7 +454,7 @@ class SocketServerTest extends JUnitSuite { val conns = (0 until overrideNum).map(_ => connect(overrideServer)) // it should succeed - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conns.last, serializedBytes) val request = overrideServer.requestChannel.receiveRequest(2000) assertNotNull(request) @@ -538,7 +537,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() conn = connect(overrideServer) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel @@ -563,6 +562,54 @@ class SocketServerTest extends JUnitSuite { } } + @Test + def testClientDisconnectionWithStagedReceivesFullyProcessed() { + val serverMetrics = new Metrics + @volatile var selector: TestableSelector = null + val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0" + val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { + override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { + new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { + override protected[network] def 
connectionId(socket: Socket): String = overrideConnectionId + override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = { + val testableSelector = new TestableSelector(config, channelBuilder, time, metrics) + selector = testableSelector + testableSelector + } + } + } + } + + def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId) + def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId) + + try { + overrideServer.startup() + val socket = connect(overrideServer) + + TestUtils.waitUntilTrue(() => openChannel.nonEmpty, "Channel not found") + + // Setup channel to client with staged receives so when client disconnects + // it will be stored in Selector.closingChannels + val serializedBytes = producerRequestBytes(1) + val request = sendRequestsUntilStagedReceive(overrideServer, socket, serializedBytes) + + // Set SoLinger to 0 to force a hard disconnect via TCP RST + socket.setSoLinger(true, 0) + socket.close() + + // Complete request with socket exception so that the channel is removed from Selector.closingChannels + processRequest(overrideServer.requestChannel, request) + TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send") + assertTrue("Unexpected completed send", selector.completedSends.isEmpty) + } finally { + overrideServer.shutdown() + serverMetrics.close() + } + } + /* * Test that we update request metrics if the channel has been removed from the selector when the broker calls * `selector.send` (selector closes old connections, for example). 
@@ -577,7 +624,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() conn = connect(overrideServer) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel val request = receiveRequest(channel) @@ -696,7 +743,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.updateMinWakeup(2) val sockets = (1 to 2).map(_ => connect(testableServer)) - sockets.foreach(sendRequest(_, producerRequestBytes)) + sockets.foreach(sendRequest(_, producerRequestBytes())) testableServer.testableSelector.addFailure(SelectorOperation.Send) sockets.foreach(_ => processRequest(testableServer.requestChannel)) @@ -719,7 +766,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.updateMinWakeup(2) val sockets = (1 to 2).map(_ => connect(testableServer)) - sockets.foreach(sendRequest(_, producerRequestBytes)) + sockets.foreach(sendRequest(_, producerRequestBytes())) val requestChannel = testableServer.requestChannel val requests = sockets.map(_ => receiveRequest(requestChannel)) @@ -747,7 +794,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.updateMinWakeup(2) val sockets = (1 to 2).map(_ => connect(testableServer)) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() val request = sendRequestsUntilStagedReceive(testableServer, sockets(0), serializedBytes) sendRequest(sockets(1), serializedBytes) @@ -781,7 +828,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.cachedCompletedReceives.minPerPoll = 2 testableSelector.addFailure(SelectorOperation.Mute) - sockets.foreach(sendRequest(_, producerRequestBytes)) + sockets.foreach(sendRequest(_, producerRequestBytes())) val requests = sockets.map(_ => receiveRequest(requestChannel)) testableSelector.waitForOperations(SelectorOperation.Mute, 2) testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, 
locallyClosed = true) @@ -905,7 +952,7 @@ class SocketServerTest extends JUnitSuite { // Check new channel behaves as expected val (socket, connectionId) = connectAndProcessRequest(testableServer) - assertArrayEquals(producerRequestBytes, receiveResponse(socket)) + assertArrayEquals(producerRequestBytes(), receiveResponse(socket)) assertNotNull("Channel should not have been closed", selector.channel(connectionId)) assertNull("Channel should not be closing", selector.closingChannel(connectionId)) socket.close() -- To stop receiving notification emails like this one, please contact rsiva...@apache.org.