[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517146#comment-16517146 ]
ASF GitHub Bot commented on KAFKA-7012: --------------------------------------- rajinisivaram closed pull request #5237: KAFKA-7012: Don't process SSL channels without data to process URL: https://github.com/apache/kafka/pull/5237 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 334ca79f035..a269f0fd604 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -498,7 +498,9 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys, //this channel has bytes enqueued in intermediary buffers that we could not read //(possibly because no memory). it may be the case that the underlying socket will //not come up in the next poll() and so we need to remember this channel for the - //next poll call otherwise data may be stuck in said buffers forever. + //next poll call otherwise data may be stuck in said buffers forever. If we attempt + //to process buffered data and no progress is made, the channel buffered status is + //cleared to avoid the overhead of checking every time. 
keysWithBufferedRead.add(key); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 704a19818e2..06e7e937886 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -64,6 +64,7 @@ private ByteBuffer netReadBuffer; private ByteBuffer netWriteBuffer; private ByteBuffer appReadBuffer; + private boolean hasBytesBuffered; private ByteBuffer emptyBuf = ByteBuffer.allocate(0); public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { @@ -503,13 +504,17 @@ public int read(ByteBuffer dst) throws IOException { read = readFromAppBuffer(dst); } + boolean readFromNetwork = false; boolean isClosed = false; // Each loop reads at most once from the socket. while (dst.remaining() > 0) { int netread = 0; netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize()); - if (netReadBuffer.remaining() > 0) + if (netReadBuffer.remaining() > 0) { netread = readFromSocketChannel(); + if (netread > 0) + readFromNetwork = true; + } while (netReadBuffer.position() > 0) { netReadBuffer.flip(); @@ -563,6 +568,7 @@ public int read(ByteBuffer dst) throws IOException { if (netread <= 0 || isClosed) break; } + updateBytesBuffered(readFromNetwork || read > 0); // If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed // on a subsequent poll. return read; @@ -793,6 +799,11 @@ protected ByteBuffer netReadBuffer() { return netReadBuffer; } + // Visibility for testing + protected ByteBuffer appReadBuffer() { + return appReadBuffer; + } + /** * SSL exceptions are propagated as authentication failures so that clients can avoid * retries and report the failure. 
If `flush` is true, exceptions are propagated after @@ -826,12 +837,22 @@ public boolean isMute() { @Override public boolean hasBytesBuffered() { - return netReadBuffer.position() != 0 || appReadBuffer.position() != 0; + return hasBytesBuffered; + } + + // Update `hasBytesBuffered` status. If any bytes were read from the network or + // if data was returned from read, `hasBytesBuffered` is set to true if any buffered + // data is still remaining. If not, `hasBytesBuffered` is set to false since no progress + // can be made until more data is available to read from the network. + private void updateBytesBuffered(boolean madeProgress) { + if (madeProgress) + hasBytesBuffered = netReadBuffer.position() != 0 || appReadBuffer.position() != 0; + else + hasBytesBuffered = false; } @Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, this); } - } diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java index 3673d21dae6..a8a4b873028 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -94,6 +94,7 @@ /** * @return true if channel has bytes to be read in any intermediate buffers + * which may be processed without reading additional data from the network. 
*/ boolean hasBytesBuffered(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 1d78e5aa8e1..3bdb07a87c3 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -42,6 +42,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -96,6 +99,13 @@ public void testDisconnectWithIntermediateBufferedBytes() throws Exception { connect(node, new InetSocketAddress("localhost", server.port)); selector.send(createSend(node, request)); + waitForBytesBuffered(selector, node); + + selector.close(node); + verifySelectorEmpty(); + } + + private void waitForBytesBuffered(Selector selector, String node) throws Exception { TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { @@ -107,8 +117,72 @@ public boolean conditionMet() { } } }, 2000L, "Failed to reach socket state with bytes buffered"); + } - selector.close(node); + @Test + public void testBytesBufferedChannelWithNoIncomingBytes() throws Exception { + verifyNoUnnecessaryPollWithBytesBuffered(key -> + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)); + } + + @Test + public void testBytesBufferedChannelAfterMute() throws Exception { + verifyNoUnnecessaryPollWithBytesBuffered(key -> ((KafkaChannel) key.attachment()).mute()); + } + + private void verifyNoUnnecessaryPollWithBytesBuffered(Consumer<SelectionKey> disableRead) + throws Exception { + this.selector.close(); + + String node1 = "1"; + String node2 = "2"; + final AtomicInteger node1Polls = new AtomicInteger(); + + this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT); + 
this.channelBuilder.configure(sslClientConfigs); + this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) { + @Override + void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { + for (SelectionKey key : selectionKeys) { + KafkaChannel channel = (KafkaChannel) key.attachment(); + if (channel != null && channel.id().equals(node1)) + node1Polls.incrementAndGet(); + } + super.pollSelectionKeys(selectionKeys, isImmediatelyConnected, currentTimeNanos); + } + }; + + // Get node1 into bytes buffered state and then disable read on the socket. + // Truncate the read buffers to ensure that there is buffered data, but not enough to make progress. + int largeRequestSize = 100 * 1024; + connect(node1, new InetSocketAddress("localhost", server.port)); + selector.send(createSend(node1, TestUtils.randomString(largeRequestSize))); + waitForBytesBuffered(selector, node1); + TestSslChannelBuilder.TestSslTransportLayer.transportLayers.get(node1).truncateReadBuffer(); + disableRead.accept(selector.channel(node1).selectionKey()); + + // Clear poll count and count the polls from now on + node1Polls.set(0); + + // Process sends and receives on node2. Test verifies that we don't process node1 + // unnecessarily on each of these polls. + connect(node2, new InetSocketAddress("localhost", server.port)); + int received = 0; + String request = TestUtils.randomString(10); + selector.send(createSend(node2, request)); + while (received < 100) { + received += selector.completedReceives().size(); + if (!selector.completedSends().isEmpty()) { + selector.send(createSend(node2, request)); + } + selector.poll(5); + } + + // Verify that pollSelectionKeys was invoked once to process buffered data + // but not again since there isn't sufficient data to process. 
+ assertEquals(1, node1Polls.get()); + selector.close(node1); + selector.close(node2); verifySelectorEmpty(); } @@ -252,22 +326,33 @@ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id * TestSslTransportLayer will read from socket once every two tries. This increases * the chance that there will be bytes buffered in the transport layer after read(). */ - class TestSslTransportLayer extends SslTransportLayer { + static class TestSslTransportLayer extends SslTransportLayer { + static Map<String, TestSslTransportLayer> transportLayers = new HashMap<>(); boolean muteSocket = false; public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { super(channelId, key, sslEngine); + transportLayers.put(channelId, this); } @Override protected int readFromSocketChannel() throws IOException { if (muteSocket) { - muteSocket = false; + if ((selectionKey().interestOps() & SelectionKey.OP_READ) != 0) + muteSocket = false; return 0; } muteSocket = true; return super.readFromSocketChannel(); } + + // Leave one byte in network read buffer so that some buffered bytes are present, + // but not enough to make progress on a read. + void truncateReadBuffer() throws Exception { + netReadBuffer().position(1); + appReadBuffer().position(0); + muteSocket = true; + } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Performance issue upgrading to kafka 1.0.1 or 1.1 > ------------------------------------------------- > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.1.0, 1.0.1 > Reporter: rajadayalan perumalsamy > Assignee: praveen > Priority: Critical > Labels: regression > Fix For: 2.0.0 > > Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, > Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, > Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, > Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png > > > We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. > After upgrading 1 node on the cluster, we notice that network threads use > most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. > With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% > vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is > high depending on the number of network threads used. If network threads is > set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 > then the cpu usage is around 450%(5 vcpus). Using the same kafka > server.properties for both. > Did further analysis with git bisect, a couple of builds and deploys, traced the > issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine > for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit > 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have > attached screenshots of profiling done with both the commits. Screenshot > Commit-f15cdbc91b-profile shows less cpu usage by network threads and > Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show > higher cpu usage(almost entire cpu usage) by network threads. 
Also noticed > that the kafka.network.Processor.poll() method is invoked 10 times more often with > commit 47ee8e954df62b9a79099e944ec4be29afe046f6. > We need the issue to be resolved to upgrade the cluster. Please let us know > if you need any additional information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)