[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517146#comment-16517146
 ] 

ASF GitHub Bot commented on KAFKA-7012:
---------------------------------------

rajinisivaram closed pull request #5237: KAFKA-7012: Don't process SSL channels 
without data to process
URL: https://github.com/apache/kafka/pull/5237
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 334ca79f035..a269f0fd604 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -498,7 +498,9 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                     //this channel has bytes enqueued in intermediary buffers 
that we could not read
                     //(possibly because no memory). it may be the case that 
the underlying socket will
                     //not come up in the next poll() and so we need to 
remember this channel for the
-                    //next poll call otherwise data may be stuck in said 
buffers forever.
+                    //next poll call otherwise data may be stuck in said 
buffers forever. If we attempt
+                    //to process buffered data and no progress is made, the 
channel buffered status is
+                    //cleared to avoid the overhead of checking every time.
                     keysWithBufferedRead.add(key);
                 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 704a19818e2..06e7e937886 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -64,6 +64,7 @@
     private ByteBuffer netReadBuffer;
     private ByteBuffer netWriteBuffer;
     private ByteBuffer appReadBuffer;
+    private boolean hasBytesBuffered;
     private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     public static SslTransportLayer create(String channelId, SelectionKey key, 
SSLEngine sslEngine) throws IOException {
@@ -503,13 +504,17 @@ public int read(ByteBuffer dst) throws IOException {
             read = readFromAppBuffer(dst);
         }
 
+        boolean readFromNetwork = false;
         boolean isClosed = false;
         // Each loop reads at most once from the socket.
         while (dst.remaining() > 0) {
             int netread = 0;
             netReadBuffer = Utils.ensureCapacity(netReadBuffer, 
netReadBufferSize());
-            if (netReadBuffer.remaining() > 0)
+            if (netReadBuffer.remaining() > 0) {
                 netread = readFromSocketChannel();
+                if (netread > 0)
+                    readFromNetwork = true;
+            }
 
             while (netReadBuffer.position() > 0) {
                 netReadBuffer.flip();
@@ -563,6 +568,7 @@ public int read(ByteBuffer dst) throws IOException {
             if (netread <= 0 || isClosed)
                 break;
         }
+        updateBytesBuffered(readFromNetwork || read > 0);
         // If data has been read and unwrapped, return the data even if 
end-of-stream, channel will be closed
         // on a subsequent poll.
         return read;
@@ -793,6 +799,11 @@ protected ByteBuffer netReadBuffer() {
         return netReadBuffer;
     }
 
+    // Visibility for testing
+    protected ByteBuffer appReadBuffer() {
+        return appReadBuffer;
+    }
+
     /**
      * SSL exceptions are propagated as authentication failures so that 
clients can avoid
      * retries and report the failure. If `flush` is true, exceptions are 
propagated after
@@ -826,12 +837,22 @@ public boolean isMute() {
 
     @Override
     public boolean hasBytesBuffered() {
-        return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+        return hasBytesBuffered;
+    }
+
+    // Update `hasBytesBuffered` status. If any bytes were read from the 
network or
+    // if data was returned from read, `hasBytesBuffered` is set to true if 
any buffered
+    // data is still remaining. If not, `hasBytesBuffered` is set to false 
since no progress
+    // can be made until more data is available to read from the network.
+    private void updateBytesBuffered(boolean madeProgress) {
+        if (madeProgress)
+            hasBytesBuffered = netReadBuffer.position() != 0 || 
appReadBuffer.position() != 0;
+        else
+            hasBytesBuffered = false;
     }
 
     @Override
     public long transferFrom(FileChannel fileChannel, long position, long 
count) throws IOException {
         return fileChannel.transferTo(position, count, this);
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index 3673d21dae6..a8a4b873028 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -94,6 +94,7 @@
 
     /**
      * @return true if channel has bytes to be read in any intermediate buffers
+     * which may be processed without reading additional data from the network.
      */
     boolean hasBytesBuffered();
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 1d78e5aa8e1..3bdb07a87c3 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -42,6 +42,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -96,6 +99,13 @@ public void testDisconnectWithIntermediateBufferedBytes() 
throws Exception {
         connect(node, new InetSocketAddress("localhost", server.port));
         selector.send(createSend(node, request));
 
+        waitForBytesBuffered(selector, node);
+
+        selector.close(node);
+        verifySelectorEmpty();
+    }
+
+    private void waitForBytesBuffered(Selector selector, String node) throws 
Exception {
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
@@ -107,8 +117,72 @@ public boolean conditionMet() {
                 }
             }
         }, 2000L, "Failed to reach socket state with bytes buffered");
+    }
 
-        selector.close(node);
+    @Test
+    public void testBytesBufferedChannelWithNoIncomingBytes() throws Exception 
{
+        verifyNoUnnecessaryPollWithBytesBuffered(key ->
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ));
+    }
+
+    @Test
+    public void testBytesBufferedChannelAfterMute() throws Exception {
+        verifyNoUnnecessaryPollWithBytesBuffered(key -> ((KafkaChannel) 
key.attachment()).mute());
+    }
+
+    private void 
verifyNoUnnecessaryPollWithBytesBuffered(Consumer<SelectionKey> disableRead)
+            throws Exception {
+        this.selector.close();
+
+        String node1 = "1";
+        String node2 = "2";
+        final AtomicInteger node1Polls = new AtomicInteger();
+
+        this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", 
channelBuilder, new LogContext()) {
+            @Override
+            void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean 
isImmediatelyConnected, long currentTimeNanos) {
+                for (SelectionKey key : selectionKeys) {
+                    KafkaChannel channel = (KafkaChannel) key.attachment();
+                    if (channel != null && channel.id().equals(node1))
+                        node1Polls.incrementAndGet();
+                }
+                super.pollSelectionKeys(selectionKeys, isImmediatelyConnected, 
currentTimeNanos);
+            }
+        };
+
+        // Get node1 into bytes buffered state and then disable read on the 
socket.
+        // Truncate the read buffers to ensure that there is buffered data, 
but not enough to make progress.
+        int largeRequestSize = 100 * 1024;
+        connect(node1, new InetSocketAddress("localhost", server.port));
+        selector.send(createSend(node1,  
TestUtils.randomString(largeRequestSize)));
+        waitForBytesBuffered(selector, node1);
+        
TestSslChannelBuilder.TestSslTransportLayer.transportLayers.get(node1).truncateReadBuffer();
+        disableRead.accept(selector.channel(node1).selectionKey());
+
+        // Clear poll count and count the polls from now on
+        node1Polls.set(0);
+
+        // Process sends and receives on node2. Test verifies that we don't 
process node1
+        // unnecessarily on each of these polls.
+        connect(node2, new InetSocketAddress("localhost", server.port));
+        int received = 0;
+        String request = TestUtils.randomString(10);
+        selector.send(createSend(node2, request));
+        while (received < 100) {
+            received += selector.completedReceives().size();
+            if (!selector.completedSends().isEmpty()) {
+                selector.send(createSend(node2, request));
+            }
+            selector.poll(5);
+        }
+
+        // Verify that pollSelectionKeys was invoked once to process buffered 
data
+        // but not again since there isn't sufficient data to process.
+        assertEquals(1, node1Polls.get());
+        selector.close(node1);
+        selector.close(node2);
         verifySelectorEmpty();
     }
 
@@ -252,22 +326,33 @@ protected SslTransportLayer 
buildTransportLayer(SslFactory sslFactory, String id
          * TestSslTransportLayer will read from socket once every two tries. 
This increases
          * the chance that there will be bytes buffered in the transport layer 
after read().
          */
-        class TestSslTransportLayer extends SslTransportLayer {
+        static class TestSslTransportLayer extends SslTransportLayer {
+            static Map<String, TestSslTransportLayer> transportLayers = new 
HashMap<>();
             boolean muteSocket = false;
 
             public TestSslTransportLayer(String channelId, SelectionKey key, 
SSLEngine sslEngine) throws IOException {
                 super(channelId, key, sslEngine);
+                transportLayers.put(channelId, this);
             }
 
             @Override
             protected int readFromSocketChannel() throws IOException {
                 if (muteSocket) {
-                    muteSocket = false;
+                    if ((selectionKey().interestOps() & SelectionKey.OP_READ) 
!= 0)
+                        muteSocket = false;
                     return 0;
                 }
                 muteSocket = true;
                 return super.readFromSocketChannel();
             }
+
+            // Leave one byte in network read buffer so that some buffered 
bytes are present,
+            // but not enough to make progress on a read.
+            void truncateReadBuffer() throws Exception {
+                netReadBuffer().position(1);
+                appReadBuffer().position(0);
+                muteSocket = true;
+            }
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Performance issue upgrading to kafka 1.0.1 or 1.1
> -------------------------------------------------
>
>                 Key: KAFKA-7012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7012
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: rajadayalan perumalsamy
>            Assignee: praveen
>            Priority: Critical
>              Labels: regression
>             Fix For: 2.0.0
>
>         Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade our Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If network threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect, a couple of builds and deploys, and traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to