DonalEvans commented on a change in pull request #6604:
URL: https://github.com/apache/geode/pull/6604#discussion_r649618574
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/publishAndSubscribe/AbstractPubSubIntegrationTest.java
##########
@@ -161,7 +163,8 @@ public void
channels_shouldOnlyReturnChannelsWithActiveSubscribers() {
@Test
public void
channels_shouldNotReturnDuplicates_givenMultipleSubscribersToSameChannel_whenCalledWithoutPattern()
{
- Jedis subscriber2 = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
+ Jedis subscriber2 =
+ new Jedis("localhost", getPort(),
AbstractPublishAndSubscribeIntegrationTest.JEDIS_TIMEOUT);
Review comment:
This would be better as
```
Jedis subscriber2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
```
using the `BIND_ADDRESS` constant from `RedisClusterStartupRule`.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PubSubExecutor.java
##########
@@ -33,28 +33,57 @@
@Override
public RedisResponse executeCommand(Command command, ExecutionHandlerContext
context) {
- byte[] subCommand = command.getProcessedCommand().get(1);
+ List<byte[]> processedCommand = command.getProcessedCommand();
+ String subCommand =
Coder.bytesToString(processedCommand.get(1)).toLowerCase();
- if (!Arrays.equals(subCommand, Coder.stringToBytes("channels"))) {
- return RedisResponse
- .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, new
String(subCommand)));
- }
+ List<Object> response;
+
+ switch (subCommand) {
+ case "channels":
+ // in a subsequent story, a new parameter requirement class
+ // specific to subCommands might be a better way to do this
+ if (processedCommand.size() > 3) {
+ return RedisResponse
+ .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND,
subCommand));
+ }
+
+ response = doChannels(processedCommand, context);
+ break;
+
+ case "numsub":
+ response = doNumsub(processedCommand, context);
+ break;
+
+ default:
+ return RedisResponse
+ .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, subCommand));
- // in a subsequent story, a new parameter requirement class
- // specific to subCommands might be a better way to do this
- if (command.getProcessedCommand().size() > 3) {
- return RedisResponse
- .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, new
String(subCommand)));
}
- List<byte[]> response;
- if (command.getProcessedCommand().size() > 2) {
- byte[] pattern = command.getProcessedCommand().get(2);
+ return RedisResponse.array(response);
Review comment:
To avoid having to change the return type of the `findChannelNames()`
methods, you could return directly from each switch case, which would mean that
you wouldn't need to define `List<Object> response` above.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/publishAndSubscribe/SubscriptionsJUnitTest.java
##########
@@ -318,8 +319,74 @@ public void
findChannelNames_shouldNotReturnDuplicates_givenMultipleSubscription
subject.add(subscriptionFoo1);
subject.add(subscriptionFoo2);
- List<byte[]> result = subject.findChannelNames(Coder.stringToBytes("f*"));
+ List<Object> result = subject.findChannelNames(Coder.stringToBytes("f*"));
assertThat(result).containsExactlyInAnyOrder(Coder.stringToBytes("foo"));
}
+
+ @Test
+ public void
findNumberOfSubscribersByChannel_shouldReturnListOfChannelsAndSubscriberCount_givenListOfActiveChannels()
{
+ Subscriptions subject = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+ Client client = new Client(channel);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+ Subscription subscriptionFoo1 =
+ new ChannelSubscription(client, Coder.stringToBytes("foo"), context,
subject);
+
+ Subscription subscriptionFoo2 =
+ new ChannelSubscription(client, Coder.stringToBytes("foo"), context,
subject);
+
+ subject.add(subscriptionFoo1);
+ subject.add(subscriptionFoo2);
+
+ List<byte[]> channels = new ArrayList<>();
+ channels.add(Coder.stringToBytes("foo"));
+
+ List<Object> actual = subject.findNumberOfSubscribersForChannel(channels);
+
+ assertThat(actual.get(0)).isEqualTo(Coder.stringToBytes("foo"));
+
+ assertThat(actual.get(1)).isEqualTo(2L);
+ }
+
+ @Test
+ public void
findNumberOfSubscribersByChannel_shouldReturnChannelNameAndZero_givenInactiveActiveChannel()
{
+ Subscriptions subject = new Subscriptions();
+
+ List<byte[]> channels = new ArrayList<>();
+ channels.add(Coder.stringToBytes("bar"));
+
+ ArrayList<Object> result =
+ (ArrayList<Object>)
subject.findNumberOfSubscribersForChannel(channels);
+
+ assertThat(result.get(0)).isEqualTo(Coder.stringToBytes("bar"));
+ assertThat(result.get(1)).isEqualTo(0L);
+ }
+
+
+ @Test
+ public void
findNumberOfSubscribersByChannel_shouldPatterAndZero_givenPatternSubscription()
{
Review comment:
It seems like there's a typo in this test name.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/publishAndSubscribe/PubSub.java
##########
@@ -96,7 +96,9 @@ long publish(
*/
List<byte[]> findSubscriptionNames(Client client);
- List<byte[]> findChannelNames();
+ List<Object> findChannelNames();
- List<byte[]> findChannelNames(byte[] pattern);
+ List<Object> findChannelNames(byte[] pattern);
Review comment:
These can (and should, I think) go back to `List<byte[]>` if the change
in `PubSubExecutor` is applied.
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/publishAndSubscribe/AbstractPubSubIntegrationTest.java
##########
@@ -191,6 +194,91 @@ public void
channels_shouldNotReturnDuplicates_givenMultipleSubscribersToSameCha
subscriber2.close();
}
+ /** -- NUMSUB-- **/
+
+ @Test
+ public void numsub_shouldReturnEmptyList_whenCalledWithOutChannelNames() {
+ Runnable fooRunnable =
+ () -> subscriber.subscribe(mockSubscriber, "foo");
+ Thread fooSubscriberThread = new Thread(fooRunnable);
+ fooSubscriberThread.start();
+
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ List<Object> result =
+ uncheckedCast(
+ introspector.sendCommand(Protocol.Command.PUBSUB, PUBSUB_NUMSUB));
+
+ assertThat(result).isEmpty();
+ }
+
+
+ @Test
+ public void
numsub_shouldReturnListOfChannelsWithSubscriberCount_whenCalledWithActiveChannels()
{
+
+ Jedis subscriber2 = new Jedis("localhost", getPort());
Review comment:
Similar to above, could this be:
```
Jedis subscriber2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
```
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/publishAndSubscribe/SubscriptionsJUnitTest.java
##########
@@ -318,8 +319,74 @@ public void
findChannelNames_shouldNotReturnDuplicates_givenMultipleSubscription
subject.add(subscriptionFoo1);
subject.add(subscriptionFoo2);
- List<byte[]> result = subject.findChannelNames(Coder.stringToBytes("f*"));
+ List<Object> result = subject.findChannelNames(Coder.stringToBytes("f*"));
assertThat(result).containsExactlyInAnyOrder(Coder.stringToBytes("foo"));
}
+
+ @Test
+ public void
findNumberOfSubscribersByChannel_shouldReturnListOfChannelsAndSubscriberCount_givenListOfActiveChannels()
{
+ Subscriptions subject = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+ Client client = new Client(channel);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+ Subscription subscriptionFoo1 =
+ new ChannelSubscription(client, Coder.stringToBytes("foo"), context,
subject);
Review comment:
Can the byte arrays/strings in these tests be extracted to variables
(when they're referred to in multiple places)?
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/publishAndSubscribe/SubscriptionsJUnitTest.java
##########
@@ -318,8 +319,74 @@ public void
findChannelNames_shouldNotReturnDuplicates_givenMultipleSubscription
subject.add(subscriptionFoo1);
subject.add(subscriptionFoo2);
- List<byte[]> result = subject.findChannelNames(Coder.stringToBytes("f*"));
+ List<Object> result = subject.findChannelNames(Coder.stringToBytes("f*"));
assertThat(result).containsExactlyInAnyOrder(Coder.stringToBytes("foo"));
}
+
+ @Test
+ public void
findNumberOfSubscribersByChannel_shouldReturnListOfChannelsAndSubscriberCount_givenListOfActiveChannels()
{
+ Subscriptions subject = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+ Client client = new Client(channel);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+ Subscription subscriptionFoo1 =
+ new ChannelSubscription(client, Coder.stringToBytes("foo"), context,
subject);
+
+ Subscription subscriptionFoo2 =
+ new ChannelSubscription(client, Coder.stringToBytes("foo"), context,
subject);
+
+ subject.add(subscriptionFoo1);
+ subject.add(subscriptionFoo2);
+
+ List<byte[]> channels = new ArrayList<>();
+ channels.add(Coder.stringToBytes("foo"));
+
+ List<Object> actual = subject.findNumberOfSubscribersForChannel(channels);
+
+ assertThat(actual.get(0)).isEqualTo(Coder.stringToBytes("foo"));
+
+ assertThat(actual.get(1)).isEqualTo(2L);
+ }
+
+ @Test
+ public void
findNumberOfSubscribersByChannel_shouldReturnChannelNameAndZero_givenInactiveActiveChannel()
{
+ Subscriptions subject = new Subscriptions();
+
+ List<byte[]> channels = new ArrayList<>();
+ channels.add(Coder.stringToBytes("bar"));
+
+ ArrayList<Object> result =
+ (ArrayList<Object>)
subject.findNumberOfSubscribersForChannel(channels);
Review comment:
The cast here can be removed if this is changed to:
```
List<Object> result =
subject.findNumberOfSubscribersForChannel(channels);
```
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/publishAndSubscribe/Subscriptions.java
##########
@@ -73,28 +73,52 @@ boolean exists(Object channelOrPattern, Client client) {
.collect(Collectors.toList());
}
- public List<byte[]> findChannelNames() {
+ public List<Object> findChannelNames() {
ObjectOpenCustomHashSet<byte[]> hashSet =
new ObjectOpenCustomHashSet<>(ByteArrays.HASH_STRATEGY);
- findChannels()
+ findChannelSubscriptions()
.forEach(channel -> hashSet.add(channel.getSubscriptionName()));
return new ArrayList<>(hashSet);
}
- public List<byte[]> findChannelNames(byte[] pattern) {
+ public List<Object> findChannelNames(byte[] pattern) {
GlobPattern globPattern = new GlobPattern(Coder.bytesToString(pattern));
return findChannelNames()
.stream()
- .filter(name -> globPattern.matches(Coder.bytesToString(name)))
+ .filter(name -> globPattern.matches(Coder.bytesToString((byte[])
name)))
.collect(Collectors.toList());
}
- private Stream<ChannelSubscription> findChannels() {
+ public List<Object> findNumberOfSubscribersForChannel(List<byte[]> names) {
+
+ List<Object> result = new ArrayList<>();
+
+ names.forEach(name -> {
+
+ List<Subscription> subscriptions =
+ findSubscriptions(name)
+ .stream()
+ .filter(subscription -> subscription instanceof
ChannelSubscription)
+ .collect(Collectors.toList());
+
+ if (subscriptions.isEmpty()) {
+ result.add(name);
+ result.add(0L);
+ } else {
+ result.add(name);
+ result.add((long) subscriptions.size());
+ }
Review comment:
This can be simplified to:
```
Long subscriptions =
findSubscriptions(name)
.stream()
.filter(subscription -> subscription instanceof
ChannelSubscription)
.count();
result.add(name);
result.add(subscriptions);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]