Repository: kafka
Updated Branches:
  refs/heads/trunk 8bd2a68b5 -> 9ed8bf273


KAFKA-5872; Fix transient failure in SslSelectorTest.testMuteOnOOM

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3836 from rajinisivaram/KAFKA-5872-sslselectortest-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ed8bf27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ed8bf27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ed8bf27

Branch: refs/heads/trunk
Commit: 9ed8bf273c6ee5569ed68b71782ec7ab947f9c93
Parents: 8bd2a68
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Sep 12 17:39:47 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Tue Sep 12 17:39:47 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/common/network/Selector.java |  4 +++-
 .../kafka/common/network/SslSelectorTest.java     | 18 +++++++-----------
 2 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ed8bf27/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1e5d969..23dbc0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -395,6 +395,9 @@ public class Selector implements Selectable, AutoCloseable {
             //poll from channels where the underlying socket has more data
             pollSelectionKeys(readyKeys, false, endSelect);
             pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
+
+            // Clear all selected keys so that they are included in the ready 
count for the next select
+            readyKeys.clear();
         } else {
             madeReadProgressLastPoll = true; //no work is also "progress"
         }
@@ -424,7 +427,6 @@ public class Selector implements Selectable, AutoCloseable {
         Iterator<SelectionKey> iterator = 
determineHandlingOrder(selectionKeys).iterator();
         while (iterator.hasNext()) {
             SelectionKey key = iterator.next();
-            iterator.remove();
             KafkaChannel channel = channel(key);
             long channelStartTimeNanos = recordTimePerConnection ? 
time.nanoseconds() : 0;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ed8bf27/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index bea8084..5683068 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.channels.SelectionKey;
@@ -42,7 +43,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -203,7 +203,7 @@ public class SslSelectorTest extends SelectorTest {
             selector.register("clientX", channelX);
             selector.register("clientY", channelY);
 
-            boolean success = false;
+            boolean handshaked = false;
             NetworkReceive firstReceive = null;
             long deadline = System.currentTimeMillis() + 5000;
             //keep calling poll until:
@@ -224,22 +224,18 @@ public class SslSelectorTest extends SelectorTest {
                     assertTrue("only expecting single request", 
completed.isEmpty());
                 }
 
-                boolean handshaked = sender1.waitForHandshake(1);
-                handshaked = handshaked && sender2.waitForHandshake(1);
+                handshaked = sender1.waitForHandshake(1) && 
sender2.waitForHandshake(1);
 
-                if (handshaked && firstReceive != null) {
-                    success = true;
+                if (handshaked && firstReceive != null && 
selector.isOutOfMemory())
                     break;
-                }
-            }
-            if (!success) {
-                Assert.fail("could not initiate connections within timeout");
             }
+            assertTrue("could not initiate connections within timeout", 
handshaked);
 
             selector.poll(10);
             assertTrue(selector.completedReceives().isEmpty());
             assertEquals(0, pool.availableMemory());
-            assertTrue(selector.isOutOfMemory());
+            assertNotNull("First receive not complete", firstReceive);
+            assertTrue("Selector not out of memory", selector.isOutOfMemory());
 
             firstReceive.close();
             assertEquals(900, pool.availableMemory()); //memory has been 
released back to pool

Reply via email to