Repository: kafka
Updated Branches:
  refs/heads/trunk 8bd2a68b5 -> 9ed8bf273


KAFKA-5872; Fix transient failure in SslSelectorTest.testMuteOnOOM

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3836 from rajinisivaram/KAFKA-5872-sslselectortest-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ed8bf27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ed8bf27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ed8bf27

Branch: refs/heads/trunk
Commit: 9ed8bf273c6ee5569ed68b71782ec7ab947f9c93
Parents: 8bd2a68
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Sep 12 17:39:47 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Tue Sep 12 17:39:47 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/common/network/Selector.java |  4 +++-
 .../kafka/common/network/SslSelectorTest.java     | 18 +++++++-----------
 2 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ed8bf27/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1e5d969..23dbc0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -395,6 +395,9 @@ public class Selector implements Selectable, AutoCloseable {
             //poll from channels where the underlying socket has more data
             pollSelectionKeys(readyKeys, false, endSelect);
             pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
+
+            // Clear all selected keys so that they are included in the ready count for the next select
+            readyKeys.clear();
         } else {
             madeReadProgressLastPoll = true; //no work is also "progress"
         }
@@ -424,7 +427,6 @@ public class Selector implements Selectable, AutoCloseable {
         Iterator<SelectionKey> iterator = determineHandlingOrder(selectionKeys).iterator();
         while (iterator.hasNext()) {
             SelectionKey key = iterator.next();
-            iterator.remove();
             KafkaChannel channel = channel(key);
             long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ed8bf27/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index bea8084..5683068 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;

 import java.nio.channels.SelectionKey;
@@ -42,7 +43,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

@@ -203,7 +203,7 @@ public class SslSelectorTest extends SelectorTest {
         selector.register("clientX", channelX);
         selector.register("clientY", channelY);

-        boolean success = false;
+        boolean handshaked = false;
         NetworkReceive firstReceive = null;
         long deadline = System.currentTimeMillis() + 5000;
         //keep calling poll until:
@@ -224,22 +224,18 @@ public class SslSelectorTest extends SelectorTest {
                 assertTrue("only expecting single request", completed.isEmpty());
             }

-            boolean handshaked = sender1.waitForHandshake(1);
-            handshaked = handshaked && sender2.waitForHandshake(1);
+            handshaked = sender1.waitForHandshake(1) && sender2.waitForHandshake(1);

-            if (handshaked && firstReceive != null) {
-                success = true;
+            if (handshaked && firstReceive != null && selector.isOutOfMemory())
                 break;
-            }
-        }
-        if (!success) {
-            Assert.fail("could not initiate connections within timeout");
         }
+        assertTrue("could not initiate connections within timeout", handshaked);

         selector.poll(10);
         assertTrue(selector.completedReceives().isEmpty());
         assertEquals(0, pool.availableMemory());
-        assertTrue(selector.isOutOfMemory());
+        assertNotNull("First receive not complete", firstReceive);
+        assertTrue("Selector not out of memory", selector.isOutOfMemory());
         firstReceive.close();
         assertEquals(900, pool.availableMemory()); //memory has been released back to pool