This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 5e2baea Feature/expand pubsub support (#5284) 5e2baea is described below commit 5e2baea7516b5e900f2699afddb63c59a20a1ab3 Author: Darrel Schneider <dschnei...@pivotal.io> AuthorDate: Thu Jun 25 05:47:48 2020 -0700 Feature/expand pubsub support (#5284) Co-authored-by: Jens Deppe <jde...@pivotal.io> Co-authored-by: Sarah <sab...@pivotal.io> Co-authored-by: Darrel Schneider <dschnei...@pivotal.io> --- geode-redis/build.gradle | 1 + .../executor/pubsub/PubSubIntegrationTest.java | 329 +++++++++++++++++++++ .../geode/redis/mocks/DummySubscription.java | 5 + .../apache/geode/redis/mocks/MockSubscriber.java | 53 ++++ .../geode/redis/internal/GeodeRedisServer.java | 4 +- .../geode/redis/internal/RedisCommandType.java | 4 +- .../executor/pubsub/PunsubscribeExecutor.java | 56 +++- .../executor/pubsub/UnsubscribeExecutor.java | 55 +++- .../apache/geode/redis/internal/netty/Coder.java | 2 +- .../internal/netty/ExecutionHandlerContext.java | 7 + .../redis/internal/pubsub/ChannelSubscription.java | 4 + .../redis/internal/pubsub/PatternSubscription.java | 4 + .../apache/geode/redis/internal/pubsub/PubSub.java | 11 + .../geode/redis/internal/pubsub/PubSubImpl.java | 29 +- .../geode/redis/internal/pubsub/Subscription.java | 6 + .../geode/redis/internal/pubsub/Subscriptions.java | 39 ++- geode-redis/src/test/resources/expected-pom.xml | 11 + 17 files changed, 569 insertions(+), 51 deletions(-) diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle index 6932b29..bd58ce7 100644 --- a/geode-redis/build.gradle +++ b/geode-redis/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation('io.netty:netty-all') implementation('org.apache.logging.log4j:log4j-api') implementation('commons-codec:commons-codec') + implementation('org.apache.commons:commons-lang3') testImplementation(project(':geode-junit')) testImplementation('org.mockito:mockito-core') diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java index 7389d5b..20c4a60 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -29,6 +30,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import redis.clients.jedis.Jedis; +import redis.clients.jedis.Protocol; import org.apache.geode.redis.GeodeRedisServerRule; import org.apache.geode.redis.mocks.MockBinarySubscriber; @@ -66,6 +68,27 @@ public class PubSubIntegrationTest { } @Test + @SuppressWarnings("unchecked") + public void punsubscribe_whenNonexistent() { + assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE, "Nonexistent")) + .containsExactly("punsubscribe".getBytes(), "Nonexistent".getBytes(), 0L); + } + + @Test + @SuppressWarnings("unchecked") + public void unsubscribe_whenNoSubscriptionsExist_shouldNotHang() { + assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.UNSUBSCRIBE)) + .containsExactly("unsubscribe".getBytes(), null, 0L); + } + + @Test + @SuppressWarnings("unchecked") + public void punsubscribe_whenNoSubscriptionsExist_shouldNotHang() { + assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE)) + .containsExactly("punsubscribe".getBytes(), null, 0L); + } + + @Test public void testOneSubscriberOneChannel() { List<String> expectedMessages = Arrays.asList("hello"); @@ -90,6 +113,170 @@ public class PubSubIntegrationTest { } @Test + public void punsubscribe_givenSubscribe_doesNotReduceSubscriptions() { + MockSubscriber mockSubscriber = new MockSubscriber(); + + Runnable runnable = () -> { + subscriber.subscribe(mockSubscriber, "salutations"); + }; + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + try { + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + + mockSubscriber.punsubscribe("salutations"); + waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1); + + assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("salutations"); + assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1); + assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1); + } finally { + // now cleanup the actual subscription + mockSubscriber.unsubscribe("salutations"); + waitFor(() -> !subscriberThread.isAlive()); + } + } + + @Test + public void unsubscribe_givenPsubscribe_doesNotReduceSubscriptions() { + MockSubscriber mockSubscriber = new MockSubscriber(); + + Runnable runnable = () -> { + subscriber.psubscribe(mockSubscriber, "salutations"); + }; + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + try { + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + + mockSubscriber.unsubscribe("salutations"); + waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1); + + assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations"); + assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1); + assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1); + } finally { + // now cleanup the actual subscription + mockSubscriber.punsubscribe("salutations"); + waitFor(() -> !subscriberThread.isAlive()); + } + } + + @Test + public void unsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() { + MockSubscriber mockSubscriber = new MockSubscriber(); + Runnable runnable = () -> { + subscriber.subscribe(mockSubscriber, "salutations"); + }; + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + try { + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + mockSubscriber.unsubscribe("NonExistent"); + waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1); + + assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("NonExistent"); + assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1); + assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1); + } finally { + // now cleanup the actual subscription + mockSubscriber.unsubscribe("salutations"); + waitFor(() -> !subscriberThread.isAlive()); + } + } + + @Test + public void unsubscribe_whenGivenAnEmptyString() { + MockSubscriber mockSubscriber = new MockSubscriber(); + Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations"); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + try { + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + mockSubscriber.unsubscribe(""); + waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1); + + assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo(""); + assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1); + assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1); + } finally { + // now cleanup the actual subscription + mockSubscriber.unsubscribe(); + waitFor(() -> !subscriberThread.isAlive()); + } + } + + @Test + public void unsubscribeWithEmptyChannel_doesNotUnsubscribeExistingChannels() { + MockSubscriber mockSubscriber = new MockSubscriber(); + Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations"); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + try { + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + mockSubscriber.unsubscribe(""); + waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1); + + Long result = publisher.publish("salutations", "heyho"); + waitFor(() -> mockSubscriber.getReceivedMessages().size() == 1); + + assertThat(result).isEqualTo(1); + assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo("heyho"); + } finally { + // now cleanup the actual subscription + mockSubscriber.unsubscribe(); + waitFor(() -> !subscriberThread.isAlive()); + } + } + + @Test + public void canSubscribeToAnEmptyString() { + MockSubscriber mockSubscriber = new MockSubscriber(); + Runnable runnable = () -> subscriber.subscribe(mockSubscriber, ""); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + Long result = publisher.publish("", "blank"); + assertThat(result).isEqualTo(1); + + mockSubscriber.unsubscribe(""); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + waitFor(() -> !subscriberThread.isAlive()); + + assertThat(mockSubscriber.getReceivedMessages()).containsExactly("blank"); + } + + @Test + public void punsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() { + MockSubscriber mockSubscriber = new MockSubscriber(); + Runnable runnable = () -> { + subscriber.psubscribe(mockSubscriber, "salutations"); + }; + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + try { + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + mockSubscriber.punsubscribe("NonExistent"); + waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1); + + assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("NonExistent"); + assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1); + assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1); + } finally { + // now cleanup the actual subscription + mockSubscriber.punsubscribe("salutations"); + waitFor(() -> !subscriberThread.isAlive()); + } + } + + @Test public void testPublishBinaryData() { byte[] expectedMessage = new byte[256]; for (int i = 0; i < 256; i++) { @@ -117,6 +304,33 @@ public class PubSubIntegrationTest { } @Test + public void testSubscribeAndPublishUsingBinaryData() { + byte[] binaryBlob = new byte[256]; + for (int i = 0; i < 256; i++) { + binaryBlob[i] = (byte) i; + } + + MockBinarySubscriber mockSubscriber = new MockBinarySubscriber(); + + Runnable runnable = () -> { + subscriber.subscribe(mockSubscriber, binaryBlob); + }; + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + + Long result = publisher.publish(binaryBlob, binaryBlob); + assertThat(result).isEqualTo(1); + + mockSubscriber.unsubscribe(binaryBlob); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + waitFor(() -> !subscriberThread.isAlive()); + + assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob); + } + + @Test public void testOneSubscriberSubscribingToTwoChannels() { List<String> expectedMessages = Arrays.asList("hello", "howdy"); MockSubscriber mockSubscriber = new MockSubscriber(); @@ -135,14 +349,111 @@ public class PubSubIntegrationTest { assertThat(result).isEqualTo(1); mockSubscriber.unsubscribe("salutations"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + assertThat(mockSubscriber.unsubscribeInfos).hasSize(1); + assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations"); + assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1); + mockSubscriber.unsubscribeInfos.clear(); mockSubscriber.unsubscribe("yuletide"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + assertThat(mockSubscriber.unsubscribeInfos).hasSize(1); + assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("yuletide"); + assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0); waitFor(() -> !subscriberThread.isAlive()); assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages); } @Test + public void testSubscribingAndUnsubscribingFromMultipleChannels() { + MockSubscriber mockSubscriber = new MockSubscriber(); + + Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide"); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + + waitFor(() -> mockSubscriber.getSubscribedChannels() == 2); + + mockSubscriber.unsubscribe("yuletide", "salutations"); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + waitFor(() -> !subscriberThread.isAlive()); + + List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream() + .map(x -> x.channel).collect(Collectors.toList()); + assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide"); + + List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream() + .map(x -> x.count).collect(Collectors.toList()); + assertThat(channelCounts).containsExactlyInAnyOrder(1, 0); + + } + + @Test + public void testUnsubscribingImplicitlyFromAllChannels() { + MockSubscriber mockSubscriber = new MockSubscriber(); + + Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide"); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + + waitFor(() -> mockSubscriber.getSubscribedChannels() == 2); + + mockSubscriber.unsubscribe(); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + waitFor(() -> !subscriberThread.isAlive()); + + List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream() + .map(x -> x.channel).collect(Collectors.toList()); + assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide"); + + List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream() + .map(x -> x.count).collect(Collectors.toList()); + assertThat(channelCounts).containsExactlyInAnyOrder(1, 0); + + Long result = publisher.publish("salutations", "greetings"); + assertThat(result).isEqualTo(0); + } + + @Test + public void testPsubscribingAndPunsubscribingFromMultipleChannels() { + MockSubscriber mockSubscriber = new MockSubscriber(); + + Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*"); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + + waitFor(() -> mockSubscriber.getSubscribedChannels() == 2); + + mockSubscriber.punsubscribe("yul*", "sal*"); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + waitFor(() -> !subscriberThread.isAlive()); + assertThat(mockSubscriber.punsubscribeInfos).containsExactly( + new MockSubscriber.UnsubscribeInfo("yul*", 1), + new MockSubscriber.UnsubscribeInfo("sal*", 0)); + } + + @Test + public void testPunsubscribingImplicitlyFromAllChannels() { + MockSubscriber mockSubscriber = new MockSubscriber(); + + Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*"); + + Thread subscriberThread = new Thread(runnable); + subscriberThread.start(); + + waitFor(() -> mockSubscriber.getSubscribedChannels() == 2); + + mockSubscriber.punsubscribe(); + waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); + waitFor(() -> !subscriberThread.isAlive()); + assertThat(mockSubscriber.punsubscribeInfos).containsExactly( + new MockSubscriber.UnsubscribeInfo("sal*", 1), + new MockSubscriber.UnsubscribeInfo("yul*", 0)); + } + + @Test public void testTwoSubscribersOneChannel() { Jedis subscriber2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); MockSubscriber mockSubscriber1 = new MockSubscriber(); @@ -164,12 +475,18 @@ public class PubSubIntegrationTest { mockSubscriber1.unsubscribe("salutations"); waitFor(() -> mockSubscriber1.getSubscribedChannels() == 0); waitFor(() -> !subscriber1Thread.isAlive()); + assertThat(mockSubscriber1.unsubscribeInfos).hasSize(1); + assertThat(mockSubscriber1.unsubscribeInfos.get(0).channel).isEqualTo("salutations"); + assertThat(mockSubscriber1.unsubscribeInfos.get(0).count).isEqualTo(0); result = publisher.publish("salutations", "goodbye"); assertThat(result).isEqualTo(1); mockSubscriber2.unsubscribe("salutations"); waitFor(() -> mockSubscriber2.getSubscribedChannels() == 0); waitFor(() -> !subscriber2Thread.isAlive()); + assertThat(mockSubscriber2.unsubscribeInfos).hasSize(1); + assertThat(mockSubscriber2.unsubscribeInfos.get(0).channel).isEqualTo("salutations"); + assertThat(mockSubscriber2.unsubscribeInfos.get(0).count).isEqualTo(0); assertThat(mockSubscriber1.getReceivedMessages()).isEqualTo(Collections.singletonList("hello")); assertThat(mockSubscriber2.getReceivedMessages()).isEqualTo(Arrays.asList("hello", "goodbye")); @@ -201,6 +518,8 @@ public class PubSubIntegrationTest { waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); waitFor(() -> !subscriberThread.isAlive()); + assertThat(mockSubscriber.unsubscribeInfos) + .containsExactly(new MockSubscriber.UnsubscribeInfo("salutations", 0)); assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello")); } @@ -252,6 +571,8 @@ public class PubSubIntegrationTest { mockSubscriber.punsubscribe("sal*s"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); waitFor(() -> !subscriberThread.isAlive()); + assertThat(mockSubscriber.punsubscribeInfos) + .containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0)); } @Test @@ -283,6 +604,8 @@ public class PubSubIntegrationTest { mockSubscriber.punsubscribe("sal*s"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); waitFor(() -> !subscriberThread.isAlive()); + assertThat(mockSubscriber.punsubscribeInfos) + .containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0)); } @Test @@ -305,11 +628,17 @@ public class PubSubIntegrationTest { mockSubscriber.punsubscribe("sal*s"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 1); + assertThat(mockSubscriber.punsubscribeInfos).hasSize(1); + assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("sal*s"); + assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1); mockSubscriber.unsubscribe("salutations"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); waitFor(() -> !subscriberThread.isAlive()); + assertThat(mockSubscriber.unsubscribeInfos).hasSize(1); + assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations"); + assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0); assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello"); assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello"); diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java index 52db20e..d3ba73b 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java @@ -47,4 +47,9 @@ public class DummySubscription implements Subscription { public List<Object> createResponse(String channel, byte[] message) { return null; } + + @Override + public String getChannelName() { + return null; + } } diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java index 5ba976e..e0ecb43 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java @@ -19,6 +19,7 @@ package org.apache.geode.redis.mocks; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import redis.clients.jedis.JedisPubSub; @@ -34,6 +35,58 @@ public class MockSubscriber extends JedisPubSub { return new ArrayList<>(receivedPMessages); } + public final List<UnsubscribeInfo> unsubscribeInfos = + Collections.synchronizedList(new ArrayList<>()); + public final List<UnsubscribeInfo> punsubscribeInfos = + Collections.synchronizedList(new ArrayList<>()); + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + unsubscribeInfos.add(new UnsubscribeInfo(channel, subscribedChannels)); + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels)); + } + + public static class UnsubscribeInfo { + public final String channel; + public final int count; + + public UnsubscribeInfo(String channel, int count) { + this.channel = channel; + this.count = count; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof UnsubscribeInfo)) { + return false; + } + UnsubscribeInfo that = (UnsubscribeInfo) o; + return count == that.count && + Objects.equals(channel, that.channel); + } + + @Override + public int hashCode() { + return Objects.hash(channel, count); + } + + @Override + public String toString() { + return "UnsubscribeInfo{" + + "channel='" + channel + '\'' + + ", count=" + count + + '}'; + } + } + + @Override public void onMessage(String channel, String message) { receivedMessages.add(message); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java index 0b66d64..9a860bf 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java @@ -46,6 +46,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.util.concurrent.Future; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; import org.apache.geode.annotations.Experimental; @@ -416,7 +417,8 @@ public class GeodeRedisServer { InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem(); String redisPassword = system.getConfig().getRedisPassword(); - final byte[] redisPasswordBytes = Coder.stringToBytes(redisPassword); + final byte[] redisPasswordBytes = + StringUtils.isBlank(redisPassword) ? null : Coder.stringToBytes(redisPassword); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(socketClass) diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java index 4ec0ac6..2c7044b 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java @@ -164,9 +164,9 @@ public enum RedisCommandType { SUBSCRIBE(new SubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)), PUBLISH(new PublishExecutor(), SUPPORTED, new ExactParameterRequirements(3)), - UNSUBSCRIBE(new UnsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)), + UNSUBSCRIBE(new UnsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(1)), PSUBSCRIBE(new PsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)), - PUNSUBSCRIBE(new PunsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)), + PUNSUBSCRIBE(new PunsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(1)), UNKNOWN(new UnknownExecutor(), SUPPORTED), diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java index 9b49127..b71ca68 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java @@ -17,10 +17,14 @@ package org.apache.geode.redis.internal.executor.pubsub; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.redis.internal.data.ByteArrayWrapper; import org.apache.geode.redis.internal.executor.AbstractExecutor; import org.apache.geode.redis.internal.executor.GlobPattern; import org.apache.geode.redis.internal.executor.RedisResponse; @@ -33,17 +37,45 @@ public class PunsubscribeExecutor extends AbstractExecutor { @Override public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { - byte[] pattern = command.getProcessedCommand().get(1); - long subscriptionCount = - context - .getPubSub() - .punsubscribe(new GlobPattern(new String(pattern)), context.getClient()); - - ArrayList<Object> items = new ArrayList<>(); - items.add("punsubscribe"); - items.add(pattern); - items.add(subscriptionCount); - - return RedisResponse.array(items); + + List<String> channelNames = extractChannelNames(command); + if (channelNames.isEmpty()) { + channelNames = context.getPubSub().findSubscribedChannels(context.getClient()); + } + + Collection<Collection<?>> response = punsubscribe(context, channelNames); + + return RedisResponse.flattenedArray(response); + } + + private List<String> extractChannelNames(Command command) { + return command.getProcessedCommandWrappers().stream() + .skip(1) + .map(ByteArrayWrapper::toString) + .collect(Collectors.toList()); + } + + private Collection<Collection<?>> punsubscribe(ExecutionHandlerContext context, + List<String> channelNames) { + Collection<Collection<?>> response = new ArrayList<>(); + + if (channelNames.isEmpty()) { + response.add(createItem(null, 0)); + } else { + for (String channel : channelNames) { + long subscriptionCount = + context.getPubSub().punsubscribe(new GlobPattern(channel), context.getClient()); + response.add(createItem(channel, subscriptionCount)); + } + } + return response; + } + + private ArrayList<Object> createItem(String channel, long subscriptionCount) { + ArrayList<Object> oneItem = new ArrayList<>(); + oneItem.add("punsubscribe"); + oneItem.add(channel); + oneItem.add(subscriptionCount); + return oneItem; } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java index 853a29e..09d1a9d 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java @@ -16,34 +16,63 @@ package org.apache.geode.redis.internal.executor.pubsub; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.redis.internal.data.ByteArrayWrapper; import org.apache.geode.redis.internal.executor.AbstractExecutor; import org.apache.geode.redis.internal.executor.RedisResponse; import org.apache.geode.redis.internal.netty.Command; import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; public class UnsubscribeExecutor extends AbstractExecutor { - private static final Logger logger = LogService.getLogger(); @Override public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { - List<byte[]> commandElems = command.getProcessedCommand(); - byte[] channelName = commandElems.get(1); - long subscriptionCount = - context.getPubSub().unsubscribe(new String(channelName), context.getClient()); + List<String> channelNames = extractChannelNames(command); + if (channelNames.isEmpty()) { + channelNames = context.getPubSub().findSubscribedChannels(context.getClient()); + } + + Collection<Collection<?>> response = unsubscribe(context, channelNames); + + return RedisResponse.flattenedArray(response); + } + + private List<String> extractChannelNames(Command command) { + return command.getProcessedCommandWrappers().stream() + .skip(1) + .map(ByteArrayWrapper::toString) + .collect(Collectors.toList()); + } + + private Collection<Collection<?>> unsubscribe(ExecutionHandlerContext context, + List<String> channelNames) { + Collection<Collection<?>> response = new ArrayList<>(); + + if (channelNames.isEmpty()) { + response.add(createItem(null, 0)); + } else { + for (String channel : channelNames) { + long subscriptionCount = + context.getPubSub().unsubscribe(channel, context.getClient()); - ArrayList<Object> items = new ArrayList<>(); - items.add("unsubscribe"); - items.add(channelName); - items.add(subscriptionCount); + response.add(createItem(channel, subscriptionCount)); + } + } + + return response; + } - return RedisResponse.array(items); + private ArrayList<Object> createItem(String channelName, long subscriptionCount) { + ArrayList<Object> oneItem = new ArrayList<>(); + oneItem.add("unsubscribe"); + oneItem.add(channelName); + oneItem.add(subscriptionCount); + return oneItem; } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java index 0f02a7c..6c06c08 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java @@ -352,7 +352,7 @@ public class Coder { } public static byte[] stringToBytes(String string) { - if (string == null || string.equals("")) { + if (string == null) { return null; } try { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java index 297abb3..7ce9528 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java @@ -182,6 +182,13 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { return; } + if (command.isOfType(RedisCommandType.SELECT) + || command.isOfType(RedisCommandType.CONFIG) + || command.isOfType(RedisCommandType.PUBSUB)) { + writeToChannel(RedisResponse.ok()); + return; + } + if (command.isUnsupported() && !allowUnsupportedCommands()) { writeToChannel( RedisResponse.error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND)); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java index dcf2e50..9e0e4b8 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java @@ -53,4 +53,8 @@ class ChannelSubscription extends AbstractSubscription { return this.channel.equals(channel); } + @Override + public String getChannelName() { + return channel; + } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java index d19c870..2fbcdfe 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java @@ -54,4 +54,8 @@ class PatternSubscription extends AbstractSubscription { return pattern.matches(channel); } + @Override + public String getChannelName() { + return pattern.globPattern(); + } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java index 22b9e40..69aebc4 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java @@ -16,6 +16,8 @@ package org.apache.geode.redis.internal.pubsub; +import java.util.List; + import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.data.ByteArrayWrapper; import org.apache.geode.redis.internal.data.RedisData; @@ -77,4 +79,13 @@ public interface PubSub { * @return the number of channels still subscribed to by the client */ long punsubscribe(GlobPattern pattern, Client client); + + /** + * Return a list of channel names that a client has subscribed to + * + * @param client the Client which is to be queried + * @return the list of channels + */ + List<String> findSubscribedChannels(Client client); + } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java index 71deb07..d6b5788 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java @@ -19,6 +19,7 @@ package org.apache.geode.redis.internal.pubsub; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -86,23 +87,12 @@ public class PubSubImpl implements PubSub { @Override public long subscribe(String channel, ExecutionHandlerContext context, Client client) { - if (subscriptions.exists(channel, client)) { - return subscriptions.findSubscriptions(client).size(); - } - Subscription subscription = new ChannelSubscription(client, channel, context); - subscriptions.add(subscription); - return subscriptions.findSubscriptions(client).size(); + return subscriptions.subscribe(channel, context, client); } @Override public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client) { - if (subscriptions.exists(pattern, client)) { - return subscriptions.findSubscriptions(client).size(); - } - Subscription subscription = new PatternSubscription(client, pattern, context); - subscriptions.add(subscription); - - return subscriptions.findSubscriptions(client).size(); + return subscriptions.psubscribe(pattern, context, client); } private void registerPublishFunction() { @@ -135,14 +125,19 @@ public class PubSubImpl implements PubSub { @Override public long unsubscribe(String channel, Client client) { - subscriptions.remove(channel, client); - return subscriptions.findSubscriptions(client).size(); + return subscriptions.unsubscribe(channel, client); } @Override public long punsubscribe(GlobPattern pattern, Client client) { - subscriptions.remove(pattern, client); - return subscriptions.findSubscriptions(client).size(); + return subscriptions.unsubscribe(pattern, client); + } + + @Override + public List<String> findSubscribedChannels(Client client) { + return subscriptions.findSubscriptions(client).stream() + .map(Subscription::getChannelName) + .collect(Collectors.toList()); } @VisibleForTesting diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java index 4490917..7695a17 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java @@ -51,4 +51,10 @@ public interface Subscription { * The response dependent on the type of the subscription */ List<Object> createResponse(String channel, byte[] message); + + /** + * Return the subscription name. In the case of a pattern the string representation of the + * pattern is returned. + */ + String getChannelName(); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java index fc73a79..8f7e732 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java @@ -20,7 +20,10 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.redis.internal.executor.GlobPattern; import org.apache.geode.redis.internal.netty.Client; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; /** * Class that manages both channel and pattern subscriptions. @@ -34,7 +37,8 @@ public class Subscriptions { * @param channelOrPattern channel or pattern * @param client a client connection */ - public boolean exists(Object channelOrPattern, Client client) { + @VisibleForTesting + boolean exists(Object channelOrPattern, Client client) { return subscriptions.stream() .anyMatch(subscription -> subscription.isEqualTo(channelOrPattern, client)); } @@ -66,7 +70,8 @@ public class Subscriptions { /** * Add a new subscription */ - public void add(Subscription subscription) { + @VisibleForTesting + void add(Subscription subscription) { subscriptions.add(subscription); } @@ -80,14 +85,38 @@ public class Subscriptions { /** * Remove a single subscription */ - public void remove(Object channel, Client client) { - subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client)); + @VisibleForTesting + boolean remove(Object channel, Client client) { + return subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client)); } /** * @return the total number of all local subscriptions */ - public int size() { + @VisibleForTesting + int size() { return subscriptions.size(); } + + public synchronized long subscribe(String channel, ExecutionHandlerContext context, + Client client) { + if (!exists(channel, client)) { + add(new ChannelSubscription(client, channel, context)); + } + return findSubscriptions(client).size(); + } + + public synchronized long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, + Client client) { + if (!exists(pattern, client)) { + add(new PatternSubscription(client, pattern, context)); + } + return findSubscriptions(client).size(); + } + + public synchronized long unsubscribe(Object channelOrPattern, Client client) { + remove(channelOrPattern, client); + return findSubscriptions(client).size(); + } + } diff --git a/geode-redis/src/test/resources/expected-pom.xml b/geode-redis/src/test/resources/expected-pom.xml index b3f8d0b..c17b109 100644 --- a/geode-redis/src/test/resources/expected-pom.xml +++ b/geode-redis/src/test/resources/expected-pom.xml @@ -134,5 +134,16 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>runtime</scope> + <exclusions> + <exclusion> + <artifactId>spring-boot-starter-logging</artifactId> + <groupId>org.springframework.boot</groupId> + </exclusion> + </exclusions> + </dependency> </dependencies> </project>