nonbinaryprogrammer commented on a change in pull request #5954:
URL: https://github.com/apache/geode/pull/5954#discussion_r583073077
##########
File path:
geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/hash/HScanNativeRedisAcceptanceTest.java
##########
@@ -27,5 +27,4 @@
public int getPort() {
return redis.getPort();
}
Review comment:
Can you reset this file to where it was before? When people come back
and look at the changes, this will unnecessarily show up in the list of files changed.
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HMsetDUnitTest.java
##########
@@ -243,6 +243,7 @@ public void
shouldDistributeDataAmongMultipleServers_givenMultipleClientsOnSameS
jedis2B.disconnect();
}
+
Review comment:
please revert this too
##########
File path:
geode-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
##########
@@ -63,13 +65,19 @@ private ConcurrentLoopingThreads start(boolean lockstep) {
* operations.
*/
public void await() {
- loopingFutures.forEach(loopingThread -> {
- try {
- loopingThread.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
+ boolean timeOutExceptionThrown;
+ do {
+ timeOutExceptionThrown = false;
Review comment:
why add a timeout here when there's a get() without a timeout?
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+ @ClassRule
+ public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 =
+ clusterStartUp.startServerVM(1,
+ x -> x.withProperty(REDIS_PORT, redisPort1)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = clusterStartUp.startServerVM(2,
+ x -> x.withProperty(REDIS_PORT, redisPort2)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = clusterStartUp.startServerVM(3,
+ x -> x.withProperty(REDIS_PORT, redisPort3)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ gfsh.connectAndVerify(locator);
+
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+ throws ExecutionException, InterruptedException {
+
+ AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+ AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+ Future<Void> hScanFuture =
Review comment:
Could you please go through places like this where there are extra line
breaks? I believe our coding convention is to put things on the same line
except when the line overflows, at which point it is appropriate to pick a
logical breakpoint.
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+ @ClassRule
+ public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 =
+ clusterStartUp.startServerVM(1,
+ x -> x.withProperty(REDIS_PORT, redisPort1)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = clusterStartUp.startServerVM(2,
+ x -> x.withProperty(REDIS_PORT, redisPort2)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = clusterStartUp.startServerVM(3,
+ x -> x.withProperty(REDIS_PORT, redisPort3)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ gfsh.connectAndVerify(locator);
+
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+ throws ExecutionException, InterruptedException {
+
+ AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+ AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+ Future<Void> hScanFuture =
+ executor.runAsync(
+ () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+ numberOfTimesServersCrashed));
+
+ Future<Void> crashingVmFuture =
+ executor.runAsync(
+ () -> crashAlternatingServers(keepCrashingVMs,
numberOfTimesServersCrashed));
+
+
+ hScanFuture.get();
+ crashingVmFuture.get();
+ }
+
+
+ private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean
keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+ int numberOfAssertionsCompleted = 0;
+
+ ScanCursor scanCursor = new ScanCursor("0", false);
+ List<String> allEntries = new ArrayList<>();
+ MapScanCursor<String, String> result;
+
+ while (numberOfAssertionsCompleted < 3 ||
numberOfTimesServersCrashed.get() < 3) {
Review comment:
Why not use a for loop? It seems like you would always want to run the
assertions three times, even once the servers have been crashed three times,
because you would want to continually check that hscans work. If you do switch
this to a for loop, make sure to get rid of those variables, as they will be
effectively unused even though IntelliJ will think they are used.
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+ @ClassRule
+ public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 =
+ clusterStartUp.startServerVM(1,
+ x -> x.withProperty(REDIS_PORT, redisPort1)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = clusterStartUp.startServerVM(2,
+ x -> x.withProperty(REDIS_PORT, redisPort2)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = clusterStartUp.startServerVM(3,
+ x -> x.withProperty(REDIS_PORT, redisPort3)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ gfsh.connectAndVerify(locator);
+
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+ throws ExecutionException, InterruptedException {
+
+ AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+ AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+ Future<Void> hScanFuture =
+ executor.runAsync(
+ () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+ numberOfTimesServersCrashed));
+
+ Future<Void> crashingVmFuture =
+ executor.runAsync(
+ () -> crashAlternatingServers(keepCrashingVMs,
numberOfTimesServersCrashed));
+
+
+ hScanFuture.get();
+ crashingVmFuture.get();
+ }
+
+
+ private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean
keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+ int numberOfAssertionsCompleted = 0;
+
+ ScanCursor scanCursor = new ScanCursor("0", false);
+ List<String> allEntries = new ArrayList<>();
+ MapScanCursor<String, String> result;
+
+ while (numberOfAssertionsCompleted < 3 ||
numberOfTimesServersCrashed.get() < 3) {
+
+ allEntries.clear();
+ scanCursor.setCursor("0");
+ scanCursor.setFinished(false);
+
+ try {
+ do {
+ result = commands.hscan(HASH_KEY, scanCursor);
+ scanCursor.setCursor(result.getCursor());
+ Map<String, String> resultEntries = result.getMap();
+
+ resultEntries
+ .entrySet()
+ .forEach(
+ entry -> allEntries.add(entry.getKey()));
+
+ } while (!result.isFinished());
+
+ assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+ numberOfAssertionsCompleted++;
+
+ } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisException ex) {
+ if (ex.getMessage().contains("Connection reset by peer")) {// ignore
error
+ } else {
+ throw ex;
+ }
+ }
+ }
+ keepCrashingServers.set(false);
+ }
+
+
+
+ private void crashAlternatingServers(AtomicBoolean keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+
+ int serverToCrashToggle = 3;
Review comment:
this name is confusing. `vmToCrash` might be better
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+ @ClassRule
+ public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 =
+ clusterStartUp.startServerVM(1,
+ x -> x.withProperty(REDIS_PORT, redisPort1)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = clusterStartUp.startServerVM(2,
+ x -> x.withProperty(REDIS_PORT, redisPort2)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = clusterStartUp.startServerVM(3,
+ x -> x.withProperty(REDIS_PORT, redisPort3)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ gfsh.connectAndVerify(locator);
+
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+ throws ExecutionException, InterruptedException {
+
+ AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+ AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+ Future<Void> hScanFuture =
+ executor.runAsync(
+ () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+ numberOfTimesServersCrashed));
+
+ Future<Void> crashingVmFuture =
+ executor.runAsync(
+ () -> crashAlternatingServers(keepCrashingVMs,
numberOfTimesServersCrashed));
+
+
+ hScanFuture.get();
+ crashingVmFuture.get();
+ }
+
+
+ private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean
keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+ int numberOfAssertionsCompleted = 0;
+
+ ScanCursor scanCursor = new ScanCursor("0", false);
+ List<String> allEntries = new ArrayList<>();
+ MapScanCursor<String, String> result;
+
+ while (numberOfAssertionsCompleted < 3 ||
numberOfTimesServersCrashed.get() < 3) {
+
+ allEntries.clear();
+ scanCursor.setCursor("0");
+ scanCursor.setFinished(false);
+
+ try {
+ do {
+ result = commands.hscan(HASH_KEY, scanCursor);
+ scanCursor.setCursor(result.getCursor());
+ Map<String, String> resultEntries = result.getMap();
+
+ resultEntries
+ .entrySet()
+ .forEach(
+ entry -> allEntries.add(entry.getKey()));
+
+ } while (!result.isFinished());
+
+ assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+ numberOfAssertionsCompleted++;
+
+ } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisException ex) {
+ if (ex.getMessage().contains("Connection reset by peer")) {// ignore
error
+ } else {
+ throw ex;
+ }
+ }
+ }
+ keepCrashingServers.set(false);
+ }
+
+
+
+ private void crashAlternatingServers(AtomicBoolean keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+
+ int serverToCrashToggle = 3;
+ MemberVM server = null;
+ int redisPort;
+
+ do {
+ redisPort = redisPorts[serverToCrashToggle - 1];
+ if (serverToCrashToggle == 3) {
+ server = server3;
Review comment:
These assignments to `server` are not used; `server` is reassigned on line 256.
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -203,49 +279,122 @@ public int hstrlen(ByteArrayWrapper field) {
return new ArrayList<>(hash.keySet());
}
- public Pair<BigInteger, List<Object>> hscan(Pattern matchPattern, int count,
BigInteger cursor) {
- List<Object> returnList = new ArrayList<Object>();
- int size = hash.size();
- BigInteger beforeCursor = new BigInteger("0");
- int numElements = 0;
- int i = -1;
- for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry :
hash.entrySet()) {
- ByteArrayWrapper key = entry.getKey();
- ByteArrayWrapper value = entry.getValue();
- i++;
- if (beforeCursor.compareTo(cursor) < 0) {
- beforeCursor = beforeCursor.add(new BigInteger("1"));
+ public ImmutablePair<BigInteger, List<Object>> hscan(UUID clientID, Pattern
matchPattern,
+ int count,
+ BigInteger cursorParameter) {
+
+ int startCursor = cursorParameter.intValue();
+
+ List<ByteArrayWrapper> keysToScan = getSnapShotOfKeySet(clientID);
+
+ Pair<Integer, List<Object>> resultsPair =
+ getResultsPair(keysToScan, startCursor, count, matchPattern);
+
+ List<Object> resultList = resultsPair.getRight();
+
+ Integer numberOfIterationsCompleted = resultsPair.getLeft();
+
+ int returnCursorValueAsInt =
+ getCursorValueToReturn(startCursor, numberOfIterationsCompleted,
keysToScan);
+
+ if (returnCursorValueAsInt == 0) {
+ removeHSCANSnapshot(clientID);
+ }
+
+ return new ImmutablePair<>(BigInteger.valueOf(returnCursorValueAsInt),
+ resultList);
+ }
+
+ private void removeHSCANSnapshot(UUID clientID) {
+ this.hScanSnapShots.remove(clientID);
+ this.hScanSnapShotCreationTimes.remove(clientID);
+
+ if (this.hScanSnapShots.isEmpty()) {
+ shutDownHscanSnapshotScheduledRemoval();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Pair<Integer, List<Object>> getResultsPair(List<ByteArrayWrapper>
keysSnapShot,
+ int startCursor,
+ int count,
+ Pattern matchPattern) {
+
+ int indexOfKeys = startCursor;
+
+ List<ByteArrayWrapper> resultList = new ArrayList<>();
+
+ for (int index = startCursor; index < keysSnapShot.size(); index++) {
+
+ if ((index - startCursor) == count) {
+ break;
+ }
+
+ ByteArrayWrapper key = keysSnapShot.get(index);
+ indexOfKeys++;
+
+ ByteArrayWrapper value = hash.get(key);
+ if (value == null) {
continue;
}
if (matchPattern != null) {
if (matchPattern.matcher(key.toString()).matches()) {
- returnList.add(key);
- returnList.add(value);
- numElements++;
+ resultList.add(key);
+ resultList.add(value);
}
} else {
- returnList.add(key);
- returnList.add(value);
- numElements++;
+ resultList.add(key);
+ resultList.add(value);
}
+ }
- if (numElements == count) {
- break;
- }
+ Integer numberOfIterationsCompleted = indexOfKeys - startCursor;
+
+ Pair resultsPair = new ImmutablePair(numberOfIterationsCompleted,
resultList);
Review comment:
you could return this directly instead of having this extra assignment
##########
File path:
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +453,120 @@ public void
givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
}
@Test
- public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
- Map<String, String> entryMap = new HashMap<>();
- entryMap.put("1", "yellow");
- entryMap.put("2", "green");
- entryMap.put("3", "orange");
- jedis.hmset("colors", entryMap);
+ public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
- String cursor = "-100";
- ScanResult<Map.Entry<String, String>> result;
- List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+ Map<String, String> originalData = new HashMap<>();
+ originalData.put("field_1", "yellow");
+ originalData.put("field_2", "green");
+ originalData.put("field_3", "grey");
+ jedis.hmset("colors", originalData);
- do {
- result = jedis.hscan("colors", cursor);
- allEntries.addAll(result.getResult());
- cursor = result.getCursor();
- } while (!result.isCompleteIteration());
+ jedis.hdel("colors", "field_3");
+ originalData.remove("field_3");
- assertThat(allEntries).hasSize(3);
- assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+ GeodeAwaitility.await().untilAsserted(
+ () -> assertThat(jedis.hget("colors", "field_3")).isNull());
+
+ ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "0");
+
+ assertThat(new HashSet<>(result.getResult()))
+ .containsExactlyInAnyOrderElementsOf(originalData.entrySet());
}
+
+ /**** Concurrency ***/
+
+ private final int SIZE_OF_INITIAL_HASH_DATA = 100;
+ private Map<String, String> INITIAL_HASH_DATA =
makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+ final String HASH_KEY = "key";
+ final String BASE_FIELD = "baseField_";
+
@Test
- public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
- assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
- .hasMessageContaining(ERROR_CURSOR);
+ public void
should_notLoseFields_givenConcurrentThreadsDoingHScansAndChangingValues() {
+ jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+ final int ITERATION_COUNT = 500;
+
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis),
+ (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2),
+ (i) -> {
+ int fieldSuffix = i % SIZE_OF_INITIAL_HASH_DATA;
+ jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+ }).run();
Review comment:
why did you add a whole new test instead of just adding one line here?
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
##########
@@ -130,11 +131,15 @@ public ExecutionHandlerContext(Channel channel,
}
public ChannelFuture writeToChannel(RedisResponse response) {
- return channel.writeAndFlush(response.encode(byteBufAllocator),
channel.newPromise())
- .addListener((ChannelFutureListener) f -> {
- response.afterWrite();
- logResponse(response, channel.remoteAddress().toString(), f.cause());
- });
+ return channel
Review comment:
I don't see any code changes in this block. please revert it
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
##########
@@ -101,20 +105,24 @@ public RedisResponse executeCommand(Command command,
return RedisResponse.error(ERROR_SYNTAX);
}
}
-
try {
matchPattern = convertGlobToRegex(globPattern);
} catch (PatternSyntaxException e) {
+
LogService.getLogger().warn(
- "Could not compile the pattern: '{}' due to the following exception:
'{}'. HSCAN will return an empty list.",
+ "Could not compile the pattern: '{}'"
Review comment:
please change back this formatting
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+ @ClassRule
+ public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 =
+ clusterStartUp.startServerVM(1,
+ x -> x.withProperty(REDIS_PORT, redisPort1)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = clusterStartUp.startServerVM(2,
+ x -> x.withProperty(REDIS_PORT, redisPort2)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = clusterStartUp.startServerVM(3,
+ x -> x.withProperty(REDIS_PORT, redisPort3)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ gfsh.connectAndVerify(locator);
+
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+ throws ExecutionException, InterruptedException {
+
+ AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+ AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+ Future<Void> hScanFuture =
+ executor.runAsync(
+ () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+ numberOfTimesServersCrashed));
+
+ Future<Void> crashingVmFuture =
+ executor.runAsync(
+ () -> crashAlternatingServers(keepCrashingVMs,
numberOfTimesServersCrashed));
+
+
+ hScanFuture.get();
+ crashingVmFuture.get();
+ }
+
+
+ private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean
keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+ int numberOfAssertionsCompleted = 0;
+
+ ScanCursor scanCursor = new ScanCursor("0", false);
+ List<String> allEntries = new ArrayList<>();
+ MapScanCursor<String, String> result;
+
+ while (numberOfAssertionsCompleted < 3 ||
numberOfTimesServersCrashed.get() < 3) {
+
+ allEntries.clear();
+ scanCursor.setCursor("0");
+ scanCursor.setFinished(false);
+
+ try {
+ do {
+ result = commands.hscan(HASH_KEY, scanCursor);
+ scanCursor.setCursor(result.getCursor());
+ Map<String, String> resultEntries = result.getMap();
+
+ resultEntries
+ .entrySet()
+ .forEach(
+ entry -> allEntries.add(entry.getKey()));
+
+ } while (!result.isFinished());
+
+ assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+ numberOfAssertionsCompleted++;
+
+ } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisException ex) {
+ if (ex.getMessage().contains("Connection reset by peer")) {// ignore
error
+ } else {
+ throw ex;
+ }
+ }
+ }
+ keepCrashingServers.set(false);
Review comment:
it seems like this also may not be necessary. it looks like this could
cause us to potentially not crash the server three times, or even to not crash
it at all, if the hscans were to finish very quickly. as far as I can tell
there isn't a need to run each of these threads fewer than three times
##########
File path:
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -219,13 +298,13 @@ public void
givenCount_returnsAllEntriesWithoutDuplicates() {
cursor = result.getCursor();
} while (!result.isCompleteIteration());
- assertThat(allEntries).hasSize(3);
- assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+ assertThat(allEntries.size()).isCloseTo(3, Offset.offset(1));
Review comment:
why is an off-by-one acceptable here?
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+ @ClassRule
+ public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 =
+ clusterStartUp.startServerVM(1,
+ x -> x.withProperty(REDIS_PORT, redisPort1)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = clusterStartUp.startServerVM(2,
+ x -> x.withProperty(REDIS_PORT, redisPort2)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = clusterStartUp.startServerVM(3,
+ x -> x.withProperty(REDIS_PORT, redisPort3)
+ .withProperty(REDIS_ENABLED, "true")
+ .withProperty(REDIS_BIND_ADDRESS, "localhost")
+ .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+ "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+ .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM,
"true")
+ .withConnectionToLocator(locatorPort));
+
+ gfsh.connectAndVerify(locator);
+
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+ throws ExecutionException, InterruptedException {
+
+ AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+ AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+ Future<Void> hScanFuture =
+ executor.runAsync(
+ () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+ numberOfTimesServersCrashed));
+
+ Future<Void> crashingVmFuture =
+ executor.runAsync(
+ () -> crashAlternatingServers(keepCrashingVMs,
numberOfTimesServersCrashed));
+
+
+ hScanFuture.get();
+ crashingVmFuture.get();
+ }
+
+
+ private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean
keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+ int numberOfAssertionsCompleted = 0;
+
+ ScanCursor scanCursor = new ScanCursor("0", false);
+ List<String> allEntries = new ArrayList<>();
+ MapScanCursor<String, String> result;
+
+ while (numberOfAssertionsCompleted < 3 ||
numberOfTimesServersCrashed.get() < 3) {
+
+ allEntries.clear();
+ scanCursor.setCursor("0");
+ scanCursor.setFinished(false);
+
+ try {
+ do {
+ result = commands.hscan(HASH_KEY, scanCursor);
+ scanCursor.setCursor(result.getCursor());
+ Map<String, String> resultEntries = result.getMap();
+
+ resultEntries
+ .entrySet()
+ .forEach(
+ entry -> allEntries.add(entry.getKey()));
+
+ } while (!result.isFinished());
+
+ assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+ numberOfAssertionsCompleted++;
+
+ } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisException ex) {
+ if (ex.getMessage().contains("Connection reset by peer")) {// ignore
error
+ } else {
+ throw ex;
+ }
+ }
+ }
+ keepCrashingServers.set(false);
+ }
+
+
+
+ private void crashAlternatingServers(AtomicBoolean keepCrashingServers,
+ AtomicInteger numberOfTimesServersCrashed) {
+
+ int serverToCrashToggle = 3;
+ MemberVM server = null;
+ int redisPort;
+
+ do {
+ redisPort = redisPorts[serverToCrashToggle - 1];
+ if (serverToCrashToggle == 3) {
+ server = server3;
+ } else if (serverToCrashToggle == 2) {
+ server = server2;
+ } else {
+ // blow up if unexpected number
+ assertThat(true).isFalse()
Review comment:
a more straightforward way to do this is `throw new
AssertionError("something is wrong with this test setup");`
##########
File path:
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +441,136 @@ public void
givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
}
@Test
- public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
- Map<String, String> entryMap = new HashMap<>();
- entryMap.put("1", "yellow");
- entryMap.put("2", "green");
- entryMap.put("3", "orange");
- jedis.hmset("colors", entryMap);
+ public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
- String cursor = "-100";
- ScanResult<Map.Entry<String, String>> result;
- List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+ Map<String, String> data = new HashMap<>();
+ data.put("field_1", "yellow");
+ data.put("field_2", "green");
+ data.put("field_3", "grey");
+ jedis.hmset("colors", data);
- do {
- result = jedis.hscan("colors", cursor);
- allEntries.addAll(result.getResult());
- cursor = result.getCursor();
- } while (!result.isCompleteIteration());
+ jedis.hdel("colors", "field_3");
Review comment:
why is this field being set and then immediately deleted? I understand
that that's the point of the test, but I don't understand why this test is needed
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -203,49 +279,122 @@ public int hstrlen(ByteArrayWrapper field) {
return new ArrayList<>(hash.keySet());
}
- public Pair<BigInteger, List<Object>> hscan(Pattern matchPattern, int count,
BigInteger cursor) {
- List<Object> returnList = new ArrayList<Object>();
- int size = hash.size();
- BigInteger beforeCursor = new BigInteger("0");
- int numElements = 0;
- int i = -1;
- for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry :
hash.entrySet()) {
- ByteArrayWrapper key = entry.getKey();
- ByteArrayWrapper value = entry.getValue();
- i++;
- if (beforeCursor.compareTo(cursor) < 0) {
- beforeCursor = beforeCursor.add(new BigInteger("1"));
+ public ImmutablePair<BigInteger, List<Object>> hscan(UUID clientID, Pattern
matchPattern,
+ int count,
+ BigInteger cursorParameter) {
+
+ int startCursor = cursorParameter.intValue();
+
+ List<ByteArrayWrapper> keysToScan = getSnapShotOfKeySet(clientID);
+
+ Pair<Integer, List<Object>> resultsPair =
+ getResultsPair(keysToScan, startCursor, count, matchPattern);
+
+ List<Object> resultList = resultsPair.getRight();
+
+ Integer numberOfIterationsCompleted = resultsPair.getLeft();
+
+ int returnCursorValueAsInt =
+ getCursorValueToReturn(startCursor, numberOfIterationsCompleted,
keysToScan);
+
+ if (returnCursorValueAsInt == 0) {
+ removeHSCANSnapshot(clientID);
+ }
+
+ return new ImmutablePair<>(BigInteger.valueOf(returnCursorValueAsInt),
+ resultList);
+ }
+
+ private void removeHSCANSnapshot(UUID clientID) {
+ this.hScanSnapShots.remove(clientID);
+ this.hScanSnapShotCreationTimes.remove(clientID);
+
+ if (this.hScanSnapShots.isEmpty()) {
+ shutDownHscanSnapshotScheduledRemoval();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Pair<Integer, List<Object>> getResultsPair(List<ByteArrayWrapper>
keysSnapShot,
+ int startCursor,
+ int count,
+ Pattern matchPattern) {
+
+ int indexOfKeys = startCursor;
+
+ List<ByteArrayWrapper> resultList = new ArrayList<>();
+
+ for (int index = startCursor; index < keysSnapShot.size(); index++) {
+
+ if ((index - startCursor) == count) {
+ break;
+ }
+
+ ByteArrayWrapper key = keysSnapShot.get(index);
+ indexOfKeys++;
+
+ ByteArrayWrapper value = hash.get(key);
+ if (value == null) {
continue;
}
if (matchPattern != null) {
if (matchPattern.matcher(key.toString()).matches()) {
- returnList.add(key);
- returnList.add(value);
- numElements++;
+ resultList.add(key);
+ resultList.add(value);
}
} else {
- returnList.add(key);
- returnList.add(value);
- numElements++;
+ resultList.add(key);
+ resultList.add(value);
}
+ }
- if (numElements == count) {
- break;
- }
+ Integer numberOfIterationsCompleted = indexOfKeys - startCursor;
+
+ Pair resultsPair = new ImmutablePair(numberOfIterationsCompleted,
resultList);
+
+ return resultsPair;
+ }
+
+ private int getCursorValueToReturn(int startCursor,
+ int numberOfIterationsCompleted,
+ List<ByteArrayWrapper> keySnapshot) {
+
+ if (startCursor + numberOfIterationsCompleted >= keySnapshot.size()) {
+ return 0;
}
- Pair<BigInteger, List<Object>> scanResult;
- if (i >= size - 1) {
- scanResult = new ImmutablePair<>(new BigInteger("0"), returnList);
- } else {
- scanResult = new ImmutablePair<>(new BigInteger(String.valueOf(i + 1)),
returnList);
+ return (startCursor + numberOfIterationsCompleted);
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<ByteArrayWrapper> getSnapShotOfKeySet(UUID clientID) {
+ List<ByteArrayWrapper> keySnapShot = this.hScanSnapShots.get(clientID);
+
+ if (keySnapShot == null) {
+ if (this.hScanSnapShots.isEmpty()) {
+ startHscanSnapshotScheduledRemoval();
+ }
+ keySnapShot = createKeySnapShot(clientID);
}
- return scanResult;
+ return keySnapShot;
+ }
+
+
+ private List<ByteArrayWrapper> createKeySnapShot(UUID clientID) {
+ List<ByteArrayWrapper> keySnapShot = new ArrayList<>();
+ keySnapShot.addAll(hash.keySet());
+
+ this.hScanSnapShots.put(clientID, keySnapShot);
+ this.hScanSnapShotCreationTimes.put(clientID, currentTimeMillis());
+
+ return keySnapShot;
}
+
Review comment:
please put back the formatting here through line 397, as well as your
change to the formatting on lines 432&433
##########
File path:
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -36,58 +38,86 @@
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.RedisPortSupplier;
public abstract class AbstractHScanIntegrationTest implements
RedisPortSupplier {
protected Jedis jedis;
+ private static Jedis jedis2;
+ private static Jedis jedis3;
+
private static final int REDIS_CLIENT_TIMEOUT =
Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
@Before
public void setUp() {
jedis = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+ jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+ jedis3 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
}
@After
public void flushAll() {
jedis.flushAll();
+ jedis2.flushAll();
+ jedis3.flushAll();
}
@After
public void tearDown() {
jedis.close();
+ jedis2.close();
+ jedis3.close();
}
+ /********* Parameter Checks **************/
+
@Test
- public void givenNoKeyArgument_returnsWrongNumberOfArgumentsError() {
+ public void givenLessThanTwoArguments_returnsWrongNumberOfArgumentsError() {
Review comment:
please refactor this to use the new helper method. you can find examples
of it in any command's integration test
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]