Repository: kafka
Updated Branches:
  refs/heads/trunk ae4100f82 -> 3728f4cd9

MINOR: Fix transient failure in SelectorTest.testCloseConnectionInClosingState

`SelectorTest.testCloseConnectionInClosingState` creates a channel with some staging receives and moves time forward to expire the channel. To ensure that the channel will be expired on the next poll, the channel must be muted to avoid expiry time being updated if more data is available for read.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3823 from rajinisivaram/MINOR-SelectorTest-closingChannel

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3728f4cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3728f4cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3728f4cd

Branch: refs/heads/trunk
Commit: 3728f4cd9623ce028f1720b03e15d444856fd7fc
Parents: ae4100f
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sun Sep 10 20:37:00 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Sun Sep 10 20:37:00 2017 +0100

----------------------------------------------------------------------
 .../test/java/org/apache/kafka/common/network/SelectorTest.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/3728f4cd/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index e88a4ee..ececff6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -317,10 +317,11 @@ public class SelectorTest {
     public void testCloseConnectionInClosingState() throws Exception {
         KafkaChannel channel = createConnectionWithStagedReceives(5);
         String id = channel.id();
-        time.sleep(6000); // The max idle time is 5000ms
+        selector.mute(id); // Mute to allow channel to be expired even if more data is available for read
+        time.sleep(6000); // The max idle time is 5000ms
         selector.poll(0);
-        assertEquals(channel, selector.closingChannel(id));
         assertNull("Channel not expired", selector.channel(id));
+        assertEquals(channel, selector.closingChannel(id));
         assertEquals(ChannelState.EXPIRED, channel.state());
         selector.close(id);
         assertNull("Channel not removed from channels", selector.channel(id));