Repository: mina-sshd Updated Branches: refs/heads/master bfa9419c2 -> 4914247b8
Fixed race condition in ServerTest's usage of the TestChannelListener Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/b26b027b Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/b26b027b Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/b26b027b Branch: refs/heads/master Commit: b26b027b8800874ba5a9ec58887c4251f7a3bd67 Parents: bfa9419 Author: Lyor Goldstein <lyor.goldst...@gmail.com> Authored: Wed Oct 12 10:25:11 2016 +0300 Committer: Lyor Goldstein <lyor.goldst...@gmail.com> Committed: Wed Oct 12 10:25:11 2016 +0300 ---------------------------------------------------------------------- .../common/channel/TestChannelListener.java | 32 +++++++++++++++--- .../java/org/apache/sshd/server/ServerTest.java | 34 ++++++++------------ 2 files changed, 41 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b26b027b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java index 22e3b18..6f8a128 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java @@ -36,14 +36,15 @@ import org.junit.Assert; public class TestChannelListener extends AbstractLoggingBean implements ChannelListener, NamedResource { private final String name; private final Collection<Channel> activeChannels = new CopyOnWriteArraySet<>(); + private final Semaphore activeChannelsCounter = new Semaphore(0); private final Collection<Channel> openChannels = new CopyOnWriteArraySet<>(); + private final Semaphore openChannelsCounter = new Semaphore(0); private final Collection<Channel> failedChannels = new CopyOnWriteArraySet<>(); + private final Semaphore failedChannelsCounter = new Semaphore(0); private final Map<Channel, Collection<String>> channelStateHints = new ConcurrentHashMap<>(); + private final Semaphore chanelStateCounter = new Semaphore(0); private final Semaphore modificationsCounter = new Semaphore(0); - - public TestChannelListener() { - this(""); - } + private final Semaphore closedChannelsCounter = new Semaphore(0); public TestChannelListener(String discriminator) { super(discriminator); @@ -66,10 +67,15 @@ public class TestChannelListener extends AbstractLoggingBean implements ChannelL @Override public void channelInitialized(Channel channel) { Assert.assertTrue("Same channel instance re-initialized: " + channel, activeChannels.add(channel)); + activeChannelsCounter.release(); modificationsCounter.release(); log.info("channelInitialized({})", channel); } + public boolean waitForActiveChannelsChange(long timeout, TimeUnit unit) throws InterruptedException { + return activeChannelsCounter.tryAcquire(timeout, unit); + } + public Collection<Channel> getOpenChannels() { return openChannels; } @@ -78,10 +84,15 @@ public class TestChannelListener extends AbstractLoggingBean implements ChannelL public void channelOpenSuccess(Channel channel) { Assert.assertTrue("Open channel not activated: " + channel, activeChannels.contains(channel)); Assert.assertTrue("Same channel instance re-opened: " + channel, openChannels.add(channel)); + openChannelsCounter.release(); modificationsCounter.release(); log.info("channelOpenSuccess({})", channel); } + public boolean waitForOpenChannelsChange(long timeout, TimeUnit unit) throws InterruptedException { + return openChannelsCounter.tryAcquire(timeout, unit); + } + public Collection<Channel> getFailedChannels() { return failedChannels; } @@ -90,6 +101,7 @@ public class TestChannelListener extends AbstractLoggingBean implements ChannelL public void channelOpenFailure(Channel channel, Throwable reason) { Assert.assertTrue("Failed channel not activated: " + channel, activeChannels.contains(channel)); Assert.assertTrue("Same channel instance re-failed: " + channel, failedChannels.add(channel)); + failedChannelsCounter.release(); modificationsCounter.release(); log.warn("channelOpenFailure({}) {} : {}", channel, reason.getClass().getSimpleName(), reason.getMessage()); if (log.isDebugEnabled()) { @@ -97,13 +109,23 @@ public class TestChannelListener extends AbstractLoggingBean implements ChannelL } } + public boolean waitForFailedChannelsChange(long timeout, TimeUnit unit) throws InterruptedException { + return failedChannelsCounter.tryAcquire(timeout, unit); + } + @Override public void channelClosed(Channel channel, Throwable reason) { Assert.assertTrue("Unknown closed channel instance: " + channel, activeChannels.remove(channel)); + activeChannelsCounter.release(); + closedChannelsCounter.release(); modificationsCounter.release(); log.info("channelClosed({})", channel); } + public boolean waitForClosedChannelsChange(long timeout, TimeUnit unit) throws InterruptedException { + return closedChannelsCounter.tryAcquire(timeout, unit); + } + public Map<Channel, Collection<String>> getChannelStateHints() { return channelStateHints; } @@ -120,6 +142,8 @@ public class TestChannelListener extends AbstractLoggingBean implements ChannelL } hints.add(hint); + chanelStateCounter.release(); + modificationsCounter.release(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/b26b027b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java index e398b0d..04d36e5 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java @@ -226,7 +226,7 @@ public class ServerTest extends BaseTestSupport { } }); - TestChannelListener channelListener = new TestChannelListener(); + TestChannelListener channelListener = new TestChannelListener(getCurrentTestName()); sshd.addChannelListener(channelListener); sshd.start(); @@ -239,10 +239,8 @@ public class ServerTest extends BaseTestSupport { shell.setErr(err); shell.open().verify(9L, TimeUnit.SECONDS); - assertTrue("No changes in activated channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); - assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0); - assertTrue("No changes in open channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); - assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0); + assertTrue("No changes in activated channels", channelListener.waitForActiveChannelsChange(5L, TimeUnit.SECONDS)); + assertTrue("No changes in open channels", channelListener.waitForOpenChannelsChange(5L, TimeUnit.SECONDS)); Collection<ClientSession.ClientSessionEvent> res = s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 2L * testIdleTimeout); @@ -298,7 +296,7 @@ public class ServerTest extends BaseTestSupport { } }); - TestChannelListener channelListener = new TestChannelListener(); + TestChannelListener channelListener = new TestChannelListener(getCurrentTestName()); sshd.addChannelListener(channelListener); sshd.start(); @@ -312,10 +310,8 @@ public class ServerTest extends BaseTestSupport { shell.setOut(pos); shell.open().verify(5L, TimeUnit.SECONDS); - assertTrue("No changes in activated channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); - assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0); - assertTrue("No changes in open channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); - assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0); + assertTrue("No changes in activated channels", channelListener.waitForActiveChannelsChange(5L, TimeUnit.SECONDS)); + assertTrue("No changes in open channels", channelListener.waitForOpenChannelsChange(5L, TimeUnit.SECONDS)); try (AbstractSession serverSession = sshd.getActiveSessions().iterator().next()) { AbstractConnectionService<?> service = serverSession.getService(AbstractConnectionService.class); @@ -629,11 +625,10 @@ public class ServerTest extends BaseTestSupport { }; }); - TestChannelListener channelListener = new TestChannelListener(); + TestChannelListener channelListener = new TestChannelListener(getCurrentTestName()); sshd.addChannelListener(channelListener); sshd.start(); - @SuppressWarnings("synthetic-access") Map<String, String> expected = GenericUtils.<String, String>mapBuilder(String.CASE_INSENSITIVE_ORDER) .put("test", getCurrentTestName()) @@ -651,10 +646,8 @@ public class ServerTest extends BaseTestSupport { shell.open().verify(5L, TimeUnit.SECONDS); - assertTrue("No changes in activated channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); - assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0); - assertTrue("No changes in open channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); - assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0); + assertTrue("No changes in activated channels", channelListener.waitForActiveChannelsChange(5L, TimeUnit.SECONDS)); + assertTrue("No changes in open channels", channelListener.waitForOpenChannelsChange(5L, TimeUnit.SECONDS)); Collection<ClientChannelEvent> result = shell.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), TimeUnit.SECONDS.toMillis(17L)); @@ -667,7 +660,7 @@ public class ServerTest extends BaseTestSupport { client.stop(); } - assertTrue("No changes in closed channels", channelListener.waitForModification(5L, TimeUnit.SECONDS)); + assertTrue("No changes in closed channels", channelListener.waitForClosedChannelsChange(5L, TimeUnit.SECONDS)); assertTrue("Still activated server side channels", GenericUtils.isEmpty(channelListener.getActiveChannels())); Environment cmdEnv = envHolder.get(); @@ -787,8 +780,7 @@ public class ServerTest extends BaseTestSupport { String serverIdent = getCurrentTestName() + "-server"; PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.SERVER_IDENTIFICATION, serverIdent); - final String expServerIdent = Session.DEFAULT_SSH_VERSION_PREFIX + serverIdent; - + String expServerIdent = Session.DEFAULT_SSH_VERSION_PREFIX + serverIdent; SessionListener listener = new SessionListener() { @Override public void sessionException(Session session, Throwable t) { @@ -840,8 +832,8 @@ public class ServerTest extends BaseTestSupport { GenericUtils.join(expected, ServerFactoryManager.SERVER_EXTRA_IDENT_LINES_SEPARATOR)); sshd.start(); - final AtomicReference<List<String>> actualHolder = new AtomicReference<>(); - final Semaphore signal = new Semaphore(0); + AtomicReference<List<String>> actualHolder = new AtomicReference<>(); + Semaphore signal = new Semaphore(0); client.setUserInteraction(new UserInteraction() { @Override public void serverVersionInfo(ClientSession session, List<String> lines) {