DonalEvans commented on a change in pull request #6604:
URL: https://github.com/apache/geode/pull/6604#discussion_r649618574



##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/publishAndSubscribe/AbstractPubSubIntegrationTest.java
##########
@@ -161,7 +163,8 @@ public void 
channels_shouldOnlyReturnChannelsWithActiveSubscribers() {
   @Test
   public void 
channels_shouldNotReturnDuplicates_givenMultipleSubscribersToSameChannel_whenCalledWithoutPattern()
 {
 
-    Jedis subscriber2 = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
+    Jedis subscriber2 =
+        new Jedis("localhost", getPort(), 
AbstractPublishAndSubscribeIntegrationTest.JEDIS_TIMEOUT);

Review comment:
       This would be better as 
   ```
   Jedis subscriber2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
   ```
   using the `BIND_ADDRESS` constant from `RedisClusterStartupRule`.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PubSubExecutor.java
##########
@@ -33,28 +33,57 @@
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
 
-    byte[] subCommand = command.getProcessedCommand().get(1);
+    List<byte[]> processedCommand = command.getProcessedCommand();
+    String subCommand = 
Coder.bytesToString(processedCommand.get(1)).toLowerCase();
 
-    if (!Arrays.equals(subCommand, Coder.stringToBytes("channels"))) {
-      return RedisResponse
-          .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, new 
String(subCommand)));
-    }
+    List<Object> response;
+
+    switch (subCommand) {
+      case "channels":
+        // in a subsequent story, a new parameter requirement class
+        // specific to subCommands might be a better way to do this
+        if (processedCommand.size() > 3) {
+          return RedisResponse
+              .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, 
subCommand));
+        }
+
+        response = doChannels(processedCommand, context);
+        break;
+
+      case "numsub":
+        response = doNumsub(processedCommand, context);
+        break;
+
+      default:
+        return RedisResponse
+            .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, subCommand));
 
-    // in a subsequent story, a new parameter requirement class
-    // specific to subCommands might be a better way to do this
-    if (command.getProcessedCommand().size() > 3) {
-      return RedisResponse
-          .error(String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND, new 
String(subCommand)));
     }
 
-    List<byte[]> response;
-    if (command.getProcessedCommand().size() > 2) {
-      byte[] pattern = command.getProcessedCommand().get(2);
+    return RedisResponse.array(response);

Review comment:
       To avoid having to change the return type of the `findChannelNames()` 
methods, you could return directly from each switch case, which would mean that 
you wouldn't need to define `List<Object> response` above.

##########
File path: 
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/publishAndSubscribe/SubscriptionsJUnitTest.java
##########
@@ -318,8 +319,74 @@ public void 
findChannelNames_shouldNotReturnDuplicates_givenMultipleSubscription
     subject.add(subscriptionFoo1);
     subject.add(subscriptionFoo2);
 
-    List<byte[]> result = subject.findChannelNames(Coder.stringToBytes("f*"));
+    List<Object> result = subject.findChannelNames(Coder.stringToBytes("f*"));
 
     assertThat(result).containsExactlyInAnyOrder(Coder.stringToBytes("foo"));
   }
+
+  @Test
+  public void 
findNumberOfSubscribersByChannel_shouldReturnListOfChannelsAndSubscriberCount_givenListOfActiveChannels()
 {
+    Subscriptions subject = new Subscriptions();
+
+    Channel channel = mock(Channel.class);
+    when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+    Client client = new Client(channel);
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+    Subscription subscriptionFoo1 =
+        new ChannelSubscription(client, Coder.stringToBytes("foo"), context, 
subject);
+
+    Subscription subscriptionFoo2 =
+        new ChannelSubscription(client, Coder.stringToBytes("foo"), context, 
subject);
+
+    subject.add(subscriptionFoo1);
+    subject.add(subscriptionFoo2);
+
+    List<byte[]> channels = new ArrayList<>();
+    channels.add(Coder.stringToBytes("foo"));
+
+    List<Object> actual = subject.findNumberOfSubscribersForChannel(channels);
+
+    assertThat(actual.get(0)).isEqualTo(Coder.stringToBytes("foo"));
+
+    assertThat(actual.get(1)).isEqualTo(2L);
+  }
+
+  @Test
+  public void 
findNumberOfSubscribersByChannel_shouldReturnChannelNameAndZero_givenInactiveActiveChannel()
 {
+    Subscriptions subject = new Subscriptions();
+
+    List<byte[]> channels = new ArrayList<>();
+    channels.add(Coder.stringToBytes("bar"));
+
+    ArrayList<Object> result =
+        (ArrayList<Object>) 
subject.findNumberOfSubscribersForChannel(channels);
+
+    assertThat(result.get(0)).isEqualTo(Coder.stringToBytes("bar"));
+    assertThat(result.get(1)).isEqualTo(0L);
+  }
+
+
+  @Test
+  public void 
findNumberOfSubscribersByChannel_shouldPatterAndZero_givenPatternSubscription() 
{

Review comment:
       It seems like there's a typo in this test name: `shouldPatterAndZero` 
looks like it is missing a word (perhaps `shouldReturnPatternAndZero`).

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/publishAndSubscribe/PubSub.java
##########
@@ -96,7 +96,9 @@ long publish(
    */
   List<byte[]> findSubscriptionNames(Client client);
 
-  List<byte[]> findChannelNames();
+  List<Object> findChannelNames();
 
-  List<byte[]> findChannelNames(byte[] pattern);
+  List<Object> findChannelNames(byte[] pattern);

Review comment:
       These can (and should, I think) go back to `List<byte[]>` if the change 
in `PubSubExecutor` is applied.

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/publishAndSubscribe/AbstractPubSubIntegrationTest.java
##########
@@ -191,6 +194,91 @@ public void 
channels_shouldNotReturnDuplicates_givenMultipleSubscribersToSameCha
     subscriber2.close();
   }
 
+  /** -- NUMSUB-- **/
+
+  @Test
+  public void numsub_shouldReturnEmptyList_whenCalledWithOutChannelNames() {
+    Runnable fooRunnable =
+        () -> subscriber.subscribe(mockSubscriber, "foo");
+    Thread fooSubscriberThread = new Thread(fooRunnable);
+    fooSubscriberThread.start();
+
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+    List<Object> result =
+        uncheckedCast(
+            introspector.sendCommand(Protocol.Command.PUBSUB, PUBSUB_NUMSUB));
+
+    assertThat(result).isEmpty();
+  }
+
+
+  @Test
+  public void 
numsub_shouldReturnListOfChannelsWithSubscriberCount_whenCalledWithActiveChannels()
 {
+
+    Jedis subscriber2 = new Jedis("localhost", getPort());

Review comment:
       Similar to above, could this be:
   ```
   Jedis subscriber2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
   ```

##########
File path: 
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/publishAndSubscribe/SubscriptionsJUnitTest.java
##########
@@ -318,8 +319,74 @@ public void 
findChannelNames_shouldNotReturnDuplicates_givenMultipleSubscription
     subject.add(subscriptionFoo1);
     subject.add(subscriptionFoo2);
 
-    List<byte[]> result = subject.findChannelNames(Coder.stringToBytes("f*"));
+    List<Object> result = subject.findChannelNames(Coder.stringToBytes("f*"));
 
     assertThat(result).containsExactlyInAnyOrder(Coder.stringToBytes("foo"));
   }
+
+  @Test
+  public void 
findNumberOfSubscribersByChannel_shouldReturnListOfChannelsAndSubscriberCount_givenListOfActiveChannels()
 {
+    Subscriptions subject = new Subscriptions();
+
+    Channel channel = mock(Channel.class);
+    when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+    Client client = new Client(channel);
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+    Subscription subscriptionFoo1 =
+        new ChannelSubscription(client, Coder.stringToBytes("foo"), context, 
subject);

Review comment:
       Can the byte arrays/strings in these tests be extracted to variables 
(when they're referred to in multiple places)?

##########
File path: 
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/publishAndSubscribe/SubscriptionsJUnitTest.java
##########
@@ -318,8 +319,74 @@ public void 
findChannelNames_shouldNotReturnDuplicates_givenMultipleSubscription
     subject.add(subscriptionFoo1);
     subject.add(subscriptionFoo2);
 
-    List<byte[]> result = subject.findChannelNames(Coder.stringToBytes("f*"));
+    List<Object> result = subject.findChannelNames(Coder.stringToBytes("f*"));
 
     assertThat(result).containsExactlyInAnyOrder(Coder.stringToBytes("foo"));
   }
+
+  @Test
+  public void 
findNumberOfSubscribersByChannel_shouldReturnListOfChannelsAndSubscriberCount_givenListOfActiveChannels()
 {
+    Subscriptions subject = new Subscriptions();
+
+    Channel channel = mock(Channel.class);
+    when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+    Client client = new Client(channel);
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+    Subscription subscriptionFoo1 =
+        new ChannelSubscription(client, Coder.stringToBytes("foo"), context, 
subject);
+
+    Subscription subscriptionFoo2 =
+        new ChannelSubscription(client, Coder.stringToBytes("foo"), context, 
subject);
+
+    subject.add(subscriptionFoo1);
+    subject.add(subscriptionFoo2);
+
+    List<byte[]> channels = new ArrayList<>();
+    channels.add(Coder.stringToBytes("foo"));
+
+    List<Object> actual = subject.findNumberOfSubscribersForChannel(channels);
+
+    assertThat(actual.get(0)).isEqualTo(Coder.stringToBytes("foo"));
+
+    assertThat(actual.get(1)).isEqualTo(2L);
+  }
+
+  @Test
+  public void 
findNumberOfSubscribersByChannel_shouldReturnChannelNameAndZero_givenInactiveActiveChannel()
 {
+    Subscriptions subject = new Subscriptions();
+
+    List<byte[]> channels = new ArrayList<>();
+    channels.add(Coder.stringToBytes("bar"));
+
+    ArrayList<Object> result =
+        (ArrayList<Object>) 
subject.findNumberOfSubscribersForChannel(channels);

Review comment:
       The cast here can be removed if this is changed to:
   ```
   List<Object> result = subject.findNumberOfSubscribersForChannel(channels);
   ```

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/publishAndSubscribe/Subscriptions.java
##########
@@ -73,28 +73,52 @@ boolean exists(Object channelOrPattern, Client client) {
         .collect(Collectors.toList());
   }
 
-  public List<byte[]> findChannelNames() {
+  public List<Object> findChannelNames() {
 
     ObjectOpenCustomHashSet<byte[]> hashSet =
         new ObjectOpenCustomHashSet<>(ByteArrays.HASH_STRATEGY);
 
-    findChannels()
+    findChannelSubscriptions()
         .forEach(channel -> hashSet.add(channel.getSubscriptionName()));
 
     return new ArrayList<>(hashSet);
   }
 
-  public List<byte[]> findChannelNames(byte[] pattern) {
+  public List<Object> findChannelNames(byte[] pattern) {
 
     GlobPattern globPattern = new GlobPattern(Coder.bytesToString(pattern));
 
     return findChannelNames()
         .stream()
-        .filter(name -> globPattern.matches(Coder.bytesToString(name)))
+        .filter(name -> globPattern.matches(Coder.bytesToString((byte[]) 
name)))
         .collect(Collectors.toList());
   }
 
-  private Stream<ChannelSubscription> findChannels() {
+  public List<Object> findNumberOfSubscribersForChannel(List<byte[]> names) {
+
+    List<Object> result = new ArrayList<>();
+
+    names.forEach(name -> {
+
+      List<Subscription> subscriptions =
+          findSubscriptions(name)
+              .stream()
+              .filter(subscription -> subscription instanceof 
ChannelSubscription)
+              .collect(Collectors.toList());
+
+      if (subscriptions.isEmpty()) {
+        result.add(name);
+        result.add(0L);
+      } else {
+        result.add(name);
+        result.add((long) subscriptions.size());
+      }

Review comment:
       Both branches add `name` followed by the count, and an empty result 
already gives a count of 0, so this can be simplified to:
   ```
         Long subscriptions =
             findSubscriptions(name)
                 .stream()
                 .filter(subscription -> subscription instanceof ChannelSubscription)
                 .count();

         result.add(name);
         result.add(subscriptions);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to