DonalEvans commented on a change in pull request #6493:
URL: https://github.com/apache/geode/pull/6493#discussion_r639312347
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
##########
@@ -67,6 +73,33 @@ boolean exists(Object channelOrPattern, Client client) {
.collect(Collectors.toList());
}
+ public List<byte[]> findChannelNames() {
+
+ ObjectOpenCustomHashSet<byte[]> hashSet =
+ new ObjectOpenCustomHashSet<>(ByteArrays.HASH_STRATEGY);
+
+ findChannels()
+ .forEach(channel -> hashSet.add(channel.getSubscriptionName()));
Review comment:
To avoid set resizing here, would it be worth creating a list of byte
arrays, then passing that value to the hash set constructor?
```
List<byte[]> names = findChannels()
.map(ChannelSubscription::getSubscriptionName)
.collect(Collectors.toList());
ObjectOpenCustomHashSet<byte[]> hashSet = new
ObjectOpenCustomHashSet<>(names, ByteArrays.HASH_STRATEGY);
```
I don't know if it's actually better that way though, since I don't know
what the relative cost is of set resizing versus creating the list.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PubSubExecutor.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_PUBSUB_SUBCOMMAND;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.geode.redis.internal.executor.Executor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class PubSubExecutor implements Executor {
+
+ @Override
+ public RedisResponse executeCommand(Command command, ExecutionHandlerContext
context) {
+
+ List<byte[]> response;
Review comment:
This declaration can be moved down to line 50, to be as close to its
first use as possible, since it's possible we return before we need it.
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractSubCommandsIntegrationTest.java
##########
@@ -0,0 +1,203 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtMostNArgsForSubCommand;
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_PUBSUB_SUBCOMMAND;
+import static
org.apache.geode.redis.internal.executor.pubsub.AbstractPubSubIntegrationTest.JEDIS_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.mocks.MockSubscriber;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractSubCommandsIntegrationTest implements
RedisIntegrationTest {
+ private Jedis subscriber;
+ private Jedis introspector;
+ private MockSubscriber mockSubscriber;
+
+ @Before
+ public void setup() {
+ mockSubscriber = new MockSubscriber();
+ subscriber = new Jedis("localhost", getPort());
+ introspector = new Jedis("localhost", getPort());
+ }
+
+ @After
+ public void teardown() {
+ if (mockSubscriber.getSubscribedChannels() > 0) {
+ mockSubscriber.unsubscribe();
+ }
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ }
+
+ @Test
+ public void pubsub_shouldError_givenTooFewArguments() {
+ assertAtLeastNArgs(introspector, Protocol.Command.PUBSUB, 1);
+ }
+
+ @Test
+ public void channels_shouldError_givenTooManyArguments() {
+ assertAtMostNArgsForSubCommand(introspector,
+ Protocol.Command.PUBSUB,
+ "channels".getBytes(),
+ 1);
+ }
+
+ @Test
+ public void pubsub_shouldReturnError_givenUnknownSubcommand() {
+ String expected = String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND,
"nonesuch");
+
+ assertThatThrownBy(() -> introspector.sendCommand(Protocol.Command.PUBSUB,
"nonesuch"))
+ .hasMessageContaining(expected);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
channels_shouldReturnListOfAllChannels_withActiveChannelSubscribers_whenCalledWithoutPattern()
{
+ ArrayList<byte[]> expectedChannels = new ArrayList<>();
Review comment:
This (and other uses of concrete types instead of interfaces in
declarations in this class) would be better as `List<byte[]> expectedChannels =
new ArrayList<>();`
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractSubCommandsIntegrationTest.java
##########
@@ -0,0 +1,203 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtMostNArgsForSubCommand;
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_PUBSUB_SUBCOMMAND;
+import static
org.apache.geode.redis.internal.executor.pubsub.AbstractPubSubIntegrationTest.JEDIS_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.mocks.MockSubscriber;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractSubCommandsIntegrationTest implements
RedisIntegrationTest {
+ private Jedis subscriber;
+ private Jedis introspector;
+ private MockSubscriber mockSubscriber;
+
+ @Before
+ public void setup() {
+ mockSubscriber = new MockSubscriber();
+ subscriber = new Jedis("localhost", getPort());
+ introspector = new Jedis("localhost", getPort());
+ }
+
+ @After
+ public void teardown() {
+ if (mockSubscriber.getSubscribedChannels() > 0) {
+ mockSubscriber.unsubscribe();
+ }
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ }
+
+ @Test
+ public void pubsub_shouldError_givenTooFewArguments() {
+ assertAtLeastNArgs(introspector, Protocol.Command.PUBSUB, 1);
+ }
+
+ @Test
+ public void channels_shouldError_givenTooManyArguments() {
+ assertAtMostNArgsForSubCommand(introspector,
+ Protocol.Command.PUBSUB,
+ "channels".getBytes(),
+ 1);
+ }
+
+ @Test
+ public void pubsub_shouldReturnError_givenUnknownSubcommand() {
+ String expected = String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND,
"nonesuch");
+
+ assertThatThrownBy(() -> introspector.sendCommand(Protocol.Command.PUBSUB,
"nonesuch"))
+ .hasMessageContaining(expected);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
channels_shouldReturnListOfAllChannels_withActiveChannelSubscribers_whenCalledWithoutPattern()
{
+ ArrayList<byte[]> expectedChannels = new ArrayList<>();
+ expectedChannels.add("foo".getBytes());
+ expectedChannels.add("bar".getBytes());
+ Runnable runnable =
+ () -> subscriber.subscribe(mockSubscriber, "foo", "bar");
+ Thread subscriberThread = new Thread(runnable);
+
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+ ArrayList<byte[]> result = (ArrayList<byte[]>) introspector
+ .sendCommand(Protocol.Command.PUBSUB, "channels");
Review comment:
Another use of a concrete type instead of an interface here. Also, the
unchecked cast warnings in this class can be resolved by using the
`uncheckedCast()` static method:
```
List<byte[]> result =
uncheckedCast(introspector.sendCommand(Protocol.Command.PUBSUB, "channels"));
```
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
##########
@@ -195,4 +197,119 @@ public void removeByClientAndPattern() {
channelSubscriberOne,
channelSubscriberTwo);
}
+
+ @Test
+ public void
findChannelNames_shouldReturnAllChannelNames_whenCalledWithoutParameter() {
+ Subscriptions subject = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
+ Client client = new Client(channel);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+
+ Subscription subscriptionFoo =
+ new ChannelSubscription(client, "foo".getBytes(), context, subject);
Review comment:
Could the calls to `.getBytes()` in this class be replaced by calls to
`Coder.stringToBytes()`? There's a plan to make this consistent throughout
the module at some point, so avoiding new call sites that bypass `Coder`
will save time in the long run.
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractSubCommandsIntegrationTest.java
##########
@@ -0,0 +1,203 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtMostNArgsForSubCommand;
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_PUBSUB_SUBCOMMAND;
+import static
org.apache.geode.redis.internal.executor.pubsub.AbstractPubSubIntegrationTest.JEDIS_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.mocks.MockSubscriber;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractSubCommandsIntegrationTest implements
RedisIntegrationTest {
+ private Jedis subscriber;
+ private Jedis introspector;
+ private MockSubscriber mockSubscriber;
+
+ @Before
+ public void setup() {
+ mockSubscriber = new MockSubscriber();
+ subscriber = new Jedis("localhost", getPort());
+ introspector = new Jedis("localhost", getPort());
+ }
+
+ @After
+ public void teardown() {
+ if (mockSubscriber.getSubscribedChannels() > 0) {
+ mockSubscriber.unsubscribe();
+ }
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ }
+
+ @Test
+ public void pubsub_shouldError_givenTooFewArguments() {
+ assertAtLeastNArgs(introspector, Protocol.Command.PUBSUB, 1);
+ }
+
+ @Test
+ public void channels_shouldError_givenTooManyArguments() {
+ assertAtMostNArgsForSubCommand(introspector,
+ Protocol.Command.PUBSUB,
+ "channels".getBytes(),
Review comment:
Could uses of `.getBytes()` in this class be replaced with calls to
`Coder.stringToBytes()` please? Also, instead of hard-coding "channels"
throughout, you could use the `PUBSUB_CHANNELS` constant.
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractSubCommandsIntegrationTest.java
##########
@@ -0,0 +1,203 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtMostNArgsForSubCommand;
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_PUBSUB_SUBCOMMAND;
+import static
org.apache.geode.redis.internal.executor.pubsub.AbstractPubSubIntegrationTest.JEDIS_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.mocks.MockSubscriber;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractSubCommandsIntegrationTest implements
RedisIntegrationTest {
+ private Jedis subscriber;
+ private Jedis introspector;
+ private MockSubscriber mockSubscriber;
+
+ @Before
+ public void setup() {
+ mockSubscriber = new MockSubscriber();
+ subscriber = new Jedis("localhost", getPort());
+ introspector = new Jedis("localhost", getPort());
+ }
+
+ @After
+ public void teardown() {
+ if (mockSubscriber.getSubscribedChannels() > 0) {
+ mockSubscriber.unsubscribe();
+ }
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ }
+
+ @Test
+ public void pubsub_shouldError_givenTooFewArguments() {
+ assertAtLeastNArgs(introspector, Protocol.Command.PUBSUB, 1);
+ }
+
+ @Test
+ public void channels_shouldError_givenTooManyArguments() {
+ assertAtMostNArgsForSubCommand(introspector,
+ Protocol.Command.PUBSUB,
+ "channels".getBytes(),
+ 1);
+ }
+
+ @Test
+ public void pubsub_shouldReturnError_givenUnknownSubcommand() {
+ String expected = String.format(ERROR_UNKNOWN_PUBSUB_SUBCOMMAND,
"nonesuch");
+
+ assertThatThrownBy(() -> introspector.sendCommand(Protocol.Command.PUBSUB,
"nonesuch"))
+ .hasMessageContaining(expected);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
channels_shouldReturnListOfAllChannels_withActiveChannelSubscribers_whenCalledWithoutPattern()
{
+ ArrayList<byte[]> expectedChannels = new ArrayList<>();
+ expectedChannels.add("foo".getBytes());
+ expectedChannels.add("bar".getBytes());
+ Runnable runnable =
+ () -> subscriber.subscribe(mockSubscriber, "foo", "bar");
+ Thread subscriberThread = new Thread(runnable);
+
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+ ArrayList<byte[]> result = (ArrayList<byte[]>) introspector
+ .sendCommand(Protocol.Command.PUBSUB, "channels");
+
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectedChannels);
+
+ }
+
+ @Test
+ public void
channels_shouldNeverReturnPsubscribedChannels_givenNoActiveChannelSubscribers()
{
+
+ Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "f*");
+ Thread patternSubscriberThread = new Thread(runnable);
+
+ patternSubscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ ArrayList<String> result =
+ (ArrayList<String>) introspector.pubsubChannels("f*");
+
+ assertThat(result).isEmpty();
+
+ mockSubscriber.punsubscribe();
+ }
+
+
+ @Test
+ public void
channels_shouldReturnListOfMatchingChannels_withActiveChannelSubscribers_whenCalledWithPattern()
{
+
+ ArrayList<String> expectedChannels = new ArrayList<>();
+ expectedChannels.add("foo");
+
+ Runnable runnable =
+ () -> subscriber.subscribe(mockSubscriber, "foo", "bar");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+ ArrayList<String> result =
+ (ArrayList<String>) introspector.pubsubChannels("fo*");
+
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectedChannels);
+ }
+
+ @Test
+ public void channels_should_returnEmptyArry_GivenPatternWithNoMatches() {
Review comment:
A couple of typos here: it should be "returnEmptyArray" and, for consistency,
"given" with a lower-case g.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org