sabbey37 commented on a change in pull request #5954:
URL: https://github.com/apache/geode/pull/5954#discussion_r583025069



##########
File path: 
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static 
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import 
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);

Review comment:
       Is there any reason we can't use `RedisClusterStartupRule()` instead of
`ClusterStartupRule()`? If it's just to get the cache and do the rebalance,
maybe we could add a method to `RedisClusterStartupRule()` that returns
the cache or the resource manager for that purpose. It seems like it would
clean up a lot of the `@BeforeClass` setup. Looking at the `HdelDUnitTest`
class, we were able to use `RedisClusterStartupRule()` and still
crash/restart servers. I realize the `CrashAndNoRepeatDUnitTest` does not use
`RedisClusterStartupRule()`, but I'm wondering if that could be refactored
as well (in a different PR).

##########
File path: 
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static 
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import 
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @ClassRule
+  public static GfshCommandRule gfsh = new GfshCommandRule();
+
+  private static RedisCommands<String, String> commands;
+  private RedisClient redisClient;
+  private StatefulRedisConnection<String, String> connection;
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  static final String HASH_KEY = "key";
+  static final String BASE_FIELD = "baseField_";
+  static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+  static int[] redisPorts;
+
+  @BeforeClass
+  public static void classSetup() throws Exception {
+    int locatorPort;
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    locatorPort = locator.getPort();
+    redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+    // note: due to rules around member weighting in split-brain scenarios,
+    // vm1 (server1) should not be crashed or it will cause additional 
(unrelated) failures
+    String redisPort1 = redisPorts[0] + "";
+    server1 =
+        clusterStartUp.startServerVM(1,
+            x -> x.withProperty(REDIS_PORT, redisPort1)
+                .withProperty(REDIS_ENABLED, "true")
+                .withProperty(REDIS_BIND_ADDRESS, "localhost")
+                
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                    "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+                .withConnectionToLocator(locatorPort));
+
+    String redisPort2 = redisPorts[1] + "";
+    server2 = clusterStartUp.startServerVM(2,
+        x -> x.withProperty(REDIS_PORT, redisPort2)
+            .withProperty(REDIS_ENABLED, "true")
+            .withProperty(REDIS_BIND_ADDRESS, "localhost")
+            .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+            .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+            .withConnectionToLocator(locatorPort));
+
+    String redisPort3 = redisPorts[2] + "";
+    server3 = clusterStartUp.startServerVM(3,
+        x -> x.withProperty(REDIS_PORT, redisPort3)
+            .withProperty(REDIS_ENABLED, "true")
+            .withProperty(REDIS_BIND_ADDRESS, "localhost")
+            .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+            .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+            .withConnectionToLocator(locatorPort));
+
+    gfsh.connectAndVerify(locator);
+
+  }
+
+  @Before
+  public void testSetup() {
+    addIgnoredException(FunctionException.class);
+    String[] redisPortsAsStrings = new String[redisPorts.length];
+
+    for (int i = 0; i < redisPorts.length; i++) {
+      redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+    }
+
+    DUnitSocketAddressResolver dnsResolver =
+        new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+    ClientResources resources = ClientResources.builder()
+        .socketAddressResolver(dnsResolver)
+        .build();
+
+    redisClient = RedisClient.create(resources, "redis://localhost");
+    redisClient.setOptions(ClientOptions.builder()
+        .autoReconnect(true)
+        .build());
+
+    connection = redisClient.connect();
+    commands = connection.sync();
+    commands.hset(HASH_KEY, INITIAL_DATA_SET);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    commands.quit();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void 
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+      throws ExecutionException, InterruptedException {
+
+    AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+    AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+    Future<Void> hScanFuture =
+        executor.runAsync(
+            () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+                numberOfTimesServersCrashed));
+
+    Future<Void> crashingVmFuture =
+        executor.runAsync(
+            () -> crashAlternatingServers(keepCrashingVMs, 
numberOfTimesServersCrashed));
+
+
+    hScanFuture.get();
+    crashingVmFuture.get();
+  }
+
+
+  private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean 
keepCrashingServers,
+      AtomicInteger numberOfTimesServersCrashed) {
+    int numberOfAssertionsCompleted = 0;
+
+    ScanCursor scanCursor = new ScanCursor("0", false);
+    List<String> allEntries = new ArrayList<>();
+    MapScanCursor<String, String> result;
+
+    while (numberOfAssertionsCompleted < 3 || 
numberOfTimesServersCrashed.get() < 3) {
+
+      allEntries.clear();
+      scanCursor.setCursor("0");
+      scanCursor.setFinished(false);
+
+      try {
+        do {
+          result = commands.hscan(HASH_KEY, scanCursor);
+          scanCursor.setCursor(result.getCursor());
+          Map<String, String> resultEntries = result.getMap();
+
+          resultEntries
+              .entrySet()
+              .forEach(
+                  entry -> allEntries.add(entry.getKey()));
+
+        } while (!result.isFinished());
+
+        assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+        numberOfAssertionsCompleted++;
+
+      } catch (RedisCommandExecutionException ignore) {
+      } catch (RedisException ex) {
+        if (ex.getMessage().contains("Connection reset by peer")) {// ignore 
error
+        } else {
+          throw ex;
+        }
+      }
+    }
+    keepCrashingServers.set(false);
+  }
+
+
+
+  private void crashAlternatingServers(AtomicBoolean keepCrashingServers,
+      AtomicInteger numberOfTimesServersCrashed) {
+
+    int serverToCrashToggle = 3;
+    MemberVM server = null;

Review comment:
       According to IntelliJ's inspections, the `= null` initialization of
`server` does not seem to be necessary.

##########
File path: 
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -36,58 +38,86 @@
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
+import org.apache.geode.redis.ConcurrentLoopingThreads;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.rules.RedisPortSupplier;
 
 public abstract class AbstractHScanIntegrationTest implements 
RedisPortSupplier {
 
   protected Jedis jedis;
+  private static Jedis jedis2;
+  private static Jedis jedis3;
+
   private static final int REDIS_CLIENT_TIMEOUT =
       Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
 
   @Before
   public void setUp() {
     jedis = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+    jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+    jedis3 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
   }
 
   @After
   public void flushAll() {
     jedis.flushAll();
+    jedis2.flushAll();
+    jedis3.flushAll();
   }
 
   @After
   public void tearDown() {
     jedis.close();
+    jedis2.close();
+    jedis3.close();
   }
 
+  /********* Parameter Checks **************/
+
   @Test
-  public void givenNoKeyArgument_returnsWrongNumberOfArgumentsError() {
+  public void givenLessThanTwoArguments_returnsWrongNumberOfArgumentsError() {
     assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.HSCAN))
         .hasMessageContaining("ERR wrong number of arguments for 'hscan' 
command");
-  }
 
-  @Test
-  public void givenNoCursorArgument_returnsWrongNumberOfArgumentsError() {
     assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.HSCAN, "key!"))
         .hasMessageContaining("ERR wrong number of arguments for 'hscan' 
command");
   }
 
   @Test
-  public void givenArgumentsAreNotOddAndKeyExists_returnsSyntaxError() {
+  public void 
givenMatchArgumentWithoutPatternOnExistingKey_returnsSyntaxError() {
+    jedis.hset("key", "b", "1");
+
+    assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.HSCAN, "key", 
"0", "Match"))
+        .hasMessageContaining(ERROR_SYNTAX);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void 
givenMatchArgumentWithoutPatternOnNonExistentKey_returnsEmptyArray() {
+
+    List<Object> result =
+        (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN, "key1", "0", 
"Match");
+
+    assertThat((List<String>) result.get(1)).isEmpty();
+  }
+
+  @Test
+  public void 
givenCountArgumentWithoutNumberOnExistingKey_returnsSyntaxError() {
     jedis.hset("a", "b", "1");
 
-    assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.HSCAN, "a", 
"0", "a*"))
+    assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.HSCAN, "a", 
"0", "Count"))
         .hasMessageContaining(ERROR_SYNTAX);
   }
 
   @Test
   @SuppressWarnings("unchecked")
-  public void givenArgumentsAreNotOddAndKeyDoesNotExist_returnsEmptyArray() {
+  public void 
givenCountArgumentWithoutNumberOnNonExistentKey_returnsEmptyArray() {
+    jedis.hset("a", "b", "1");

Review comment:
       Somehow this `hset` got added back in? This test is meant to cover a
non-existent key.

##########
File path: 
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -237,15 +316,40 @@ public void 
givenMultipleCounts_returnsAllEntriesWithoutDuplicates() {
     String cursor = "0";
 
     do {
-      result = (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN, 
"colors", cursor, "COUNT",
-          "2", "COUNT", "1");
+      result = (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN,
+          "colors",
+          cursor,
+          "COUNT", "2",
+          "COUNT", "1");
+
       allEntries.addAll((List<byte[]>) result.get(1));
       cursor = new String((byte[]) result.get(0));
     } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
 
     assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(), 
"yellow".getBytes(),
-        "12".getBytes(), "green".getBytes(), "3".getBytes(), 
"grey".getBytes());

Review comment:
       I know this assertion was removed entirely, but maybe we could instead
change it to check that all the expected elements are present
(`.containsAll()`). There may be duplicates, but it is guaranteed that every
element present for the full iteration will be returned at least once.

##########
File path: 
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static 
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import 
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @ClassRule
+  public static GfshCommandRule gfsh = new GfshCommandRule();
+
+  private static RedisCommands<String, String> commands;
+  private RedisClient redisClient;
+  private StatefulRedisConnection<String, String> connection;
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  static final String HASH_KEY = "key";
+  static final String BASE_FIELD = "baseField_";
+  static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+  static int[] redisPorts;
+
+  @BeforeClass
+  public static void classSetup() throws Exception {
+    int locatorPort;
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    locatorPort = locator.getPort();
+    redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+    // note: due to rules around member weighting in split-brain scenarios,
+    // vm1 (server1) should not be crashed or it will cause additional 
(unrelated) failures
+    String redisPort1 = redisPorts[0] + "";
+    server1 =
+        clusterStartUp.startServerVM(1,
+            x -> x.withProperty(REDIS_PORT, redisPort1)
+                .withProperty(REDIS_ENABLED, "true")
+                .withProperty(REDIS_BIND_ADDRESS, "localhost")
+                
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                    "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+                .withConnectionToLocator(locatorPort));
+
+    String redisPort2 = redisPorts[1] + "";
+    server2 = clusterStartUp.startServerVM(2,
+        x -> x.withProperty(REDIS_PORT, redisPort2)
+            .withProperty(REDIS_ENABLED, "true")
+            .withProperty(REDIS_BIND_ADDRESS, "localhost")
+            .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+            .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+            .withConnectionToLocator(locatorPort));
+
+    String redisPort3 = redisPorts[2] + "";
+    server3 = clusterStartUp.startServerVM(3,
+        x -> x.withProperty(REDIS_PORT, redisPort3)
+            .withProperty(REDIS_ENABLED, "true")
+            .withProperty(REDIS_BIND_ADDRESS, "localhost")
+            .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+            .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+            .withConnectionToLocator(locatorPort));
+
+    gfsh.connectAndVerify(locator);
+
+  }
+
+  @Before
+  public void testSetup() {
+    addIgnoredException(FunctionException.class);
+    String[] redisPortsAsStrings = new String[redisPorts.length];
+
+    for (int i = 0; i < redisPorts.length; i++) {
+      redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+    }
+
+    DUnitSocketAddressResolver dnsResolver =
+        new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+    ClientResources resources = ClientResources.builder()
+        .socketAddressResolver(dnsResolver)
+        .build();
+
+    redisClient = RedisClient.create(resources, "redis://localhost");
+    redisClient.setOptions(ClientOptions.builder()
+        .autoReconnect(true)
+        .build());
+
+    connection = redisClient.connect();
+    commands = connection.sync();
+    commands.hset(HASH_KEY, INITIAL_DATA_SET);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    commands.quit();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void 
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+      throws ExecutionException, InterruptedException {
+
+    AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+    AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+    Future<Void> hScanFuture =
+        executor.runAsync(
+            () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+                numberOfTimesServersCrashed));
+
+    Future<Void> crashingVmFuture =
+        executor.runAsync(
+            () -> crashAlternatingServers(keepCrashingVMs, 
numberOfTimesServersCrashed));
+
+
+    hScanFuture.get();
+    crashingVmFuture.get();
+  }
+
+
+  private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean 
keepCrashingServers,
+      AtomicInteger numberOfTimesServersCrashed) {
+    int numberOfAssertionsCompleted = 0;
+
+    ScanCursor scanCursor = new ScanCursor("0", false);
+    List<String> allEntries = new ArrayList<>();
+    MapScanCursor<String, String> result;
+
+    while (numberOfAssertionsCompleted < 3 || 
numberOfTimesServersCrashed.get() < 3) {
+
+      allEntries.clear();
+      scanCursor.setCursor("0");
+      scanCursor.setFinished(false);
+
+      try {
+        do {
+          result = commands.hscan(HASH_KEY, scanCursor);
+          scanCursor.setCursor(result.getCursor());
+          Map<String, String> resultEntries = result.getMap();
+
+          resultEntries
+              .entrySet()
+              .forEach(
+                  entry -> allEntries.add(entry.getKey()));
+
+        } while (!result.isFinished());
+
+        assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+        numberOfAssertionsCompleted++;
+
+      } catch (RedisCommandExecutionException ignore) {
+      } catch (RedisException ex) {
+        if (ex.getMessage().contains("Connection reset by peer")) {// ignore 
error
+        } else {
+          throw ex;
+        }
+      }
+    }
+    keepCrashingServers.set(false);
+  }
+
+
+
+  private void crashAlternatingServers(AtomicBoolean keepCrashingServers,

Review comment:
       Could we rename these variables to be consistent (further up we call the 
`keepCrashingServers` variable `keepCrashingVMs`)?

##########
File path: 
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -306,8 +410,9 @@ public void 
givenMatchAndCount_returnsAllMatchingKeysWithoutDuplicates() {
     } while (!result.isCompleteIteration());
 
     entryMap.remove("3");
-    assertThat(allEntries).hasSize(2);
-    assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+
+    assertThat(new HashSet<>(allEntries))
+        .containsExactlyInAnyOrderElementsOf(entryMap.entrySet());

Review comment:
       Reiterating the above comment, we should probably change this to
`containsAll()` or something similar, since
`containsExactlyInAnyOrderElementsOf()` will fail if there are duplicates.

##########
File path: 
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_BIND_ADDRESS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDIS_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
+import static 
org.apache.geode.redis.internal.GeodeRedisServer.ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import 
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static ClusterStartupRule clusterStartUp = new ClusterStartupRule(4);
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @ClassRule
+  public static GfshCommandRule gfsh = new GfshCommandRule();
+
+  private static RedisCommands<String, String> commands;
+  private RedisClient redisClient;
+  private StatefulRedisConnection<String, String> connection;
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  static final String HASH_KEY = "key";
+  static final String BASE_FIELD = "baseField_";
+  static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+  static int[] redisPorts;
+
+  @BeforeClass
+  public static void classSetup() throws Exception {
+    int locatorPort;
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    locatorPort = locator.getPort();
+    redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+    // note: due to rules around member weighting in split-brain scenarios,
+    // vm1 (server1) should not be crashed or it will cause additional 
(unrelated) failures
+    String redisPort1 = redisPorts[0] + "";
+    server1 =
+        clusterStartUp.startServerVM(1,
+            x -> x.withProperty(REDIS_PORT, redisPort1)
+                .withProperty(REDIS_ENABLED, "true")
+                .withProperty(REDIS_BIND_ADDRESS, "localhost")
+                
.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                    "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+                .withConnectionToLocator(locatorPort));
+
+    String redisPort2 = redisPorts[1] + "";
+    server2 = clusterStartUp.startServerVM(2,
+        x -> x.withProperty(REDIS_PORT, redisPort2)
+            .withProperty(REDIS_ENABLED, "true")
+            .withProperty(REDIS_BIND_ADDRESS, "localhost")
+            .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+            .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+            .withConnectionToLocator(locatorPort));
+
+    String redisPort3 = redisPorts[2] + "";
+    server3 = clusterStartUp.startServerVM(3,
+        x -> x.withProperty(REDIS_PORT, redisPort3)
+            .withProperty(REDIS_ENABLED, "true")
+            .withProperty(REDIS_BIND_ADDRESS, "localhost")
+            .withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+            .withSystemProperty(ENABLE_REDIS_UNSUPPORTED_COMMANDS_PARAM, 
"true")
+            .withConnectionToLocator(locatorPort));
+
+    gfsh.connectAndVerify(locator);
+
+  }
+
+  @Before
+  public void testSetup() {
+    addIgnoredException(FunctionException.class);
+    String[] redisPortsAsStrings = new String[redisPorts.length];
+
+    for (int i = 0; i < redisPorts.length; i++) {
+      redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+    }
+
+    DUnitSocketAddressResolver dnsResolver =
+        new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+    ClientResources resources = ClientResources.builder()
+        .socketAddressResolver(dnsResolver)
+        .build();
+
+    redisClient = RedisClient.create(resources, "redis://localhost");
+    redisClient.setOptions(ClientOptions.builder()
+        .autoReconnect(true)
+        .build());
+
+    connection = redisClient.connect();
+    commands = connection.sync();
+    commands.hset(HASH_KEY, INITIAL_DATA_SET);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    commands.quit();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void 
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
+      throws ExecutionException, InterruptedException {
+
+    AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
+    AtomicInteger numberOfTimesServersCrashed = new AtomicInteger(0);
+
+    Future<Void> hScanFuture =
+        executor.runAsync(
+            () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs,
+                numberOfTimesServersCrashed));
+
+    Future<Void> crashingVmFuture =
+        executor.runAsync(
+            () -> crashAlternatingServers(keepCrashingVMs, 
numberOfTimesServersCrashed));
+
+
+    hScanFuture.get();
+    crashingVmFuture.get();
+  }
+
+
+  private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean 
keepCrashingServers,
+      AtomicInteger numberOfTimesServersCrashed) {
+    int numberOfAssertionsCompleted = 0;
+
+    ScanCursor scanCursor = new ScanCursor("0", false);
+    List<String> allEntries = new ArrayList<>();
+    MapScanCursor<String, String> result;
+
+    while (numberOfAssertionsCompleted < 3 || 
numberOfTimesServersCrashed.get() < 3) {
+
+      allEntries.clear();
+      scanCursor.setCursor("0");
+      scanCursor.setFinished(false);
+
+      try {
+        do {
+          result = commands.hscan(HASH_KEY, scanCursor);
+          scanCursor.setCursor(result.getCursor());
+          Map<String, String> resultEntries = result.getMap();
+
+          resultEntries
+              .entrySet()
+              .forEach(
+                  entry -> allEntries.add(entry.getKey()));
+
+        } while (!result.isFinished());
+
+        assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
+        numberOfAssertionsCompleted++;
+
+      } catch (RedisCommandExecutionException ignore) {
+      } catch (RedisException ex) {
+        if (ex.getMessage().contains("Connection reset by peer")) {// ignore 
error
+        } else {
+          throw ex;
+        }
+      }
+    }
+    keepCrashingServers.set(false);
+  }
+
+
+
+  private void crashAlternatingServers(AtomicBoolean keepCrashingServers,
+      AtomicInteger numberOfTimesServersCrashed) {
+
+    int serverToCrashToggle = 3;
+    MemberVM server = null;
+    int redisPort;
+
+    do {
+      redisPort = redisPorts[serverToCrashToggle - 1];
+      if (serverToCrashToggle == 3) {
+        server = server3;
+      } else if (serverToCrashToggle == 2) {
+        server = server2;
+      } else {
+        // blow up if unexpected number
+        assertThat(true).isFalse()
+            .withFailMessage("something is wrong with this test setup");
+      }
+
+      clusterStartUp.crashVM(serverToCrashToggle);
+      server = startRedisVM(serverToCrashToggle, redisPort);

Review comment:
       After setting the server variable to `server3` or `server2`, it seems 
like we set it again to the returned value of 
`startRedisVM(serverToCrashToggle, redisPort)` before we use it.  Is the 
initial setting to `server3` or `server2` necessary for some reason? It seems 
like we could remove the conditionals altogether and just have an assertion 
like the following:
   ```
   assertThat(serverToCrashToggle)
       .withFailMessage("serverToCrashToggle is not 2 or 3, please check test setup.")
       .isBetween(2, 3);
   ```

##########
File path: 
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -237,15 +316,40 @@ public void 
givenMultipleCounts_returnsAllEntriesWithoutDuplicates() {
     String cursor = "0";
 
     do {
-      result = (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN, 
"colors", cursor, "COUNT",
-          "2", "COUNT", "1");
+      result = (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN,
+          "colors",
+          cursor,
+          "COUNT", "2",
+          "COUNT", "1");
+
       allEntries.addAll((List<byte[]>) result.get(1));
       cursor = new String((byte[]) result.get(0));
     } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
 
     assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(), 
"yellow".getBytes(),
-        "12".getBytes(), "green".getBytes(), "3".getBytes(), 
"grey".getBytes());
+  }
+
+  @Test
+  public void givenCompleteIteration_shouldReturnCursorWithValueOfZero() {
+    Map<String, String> entryMap = new HashMap<>();
+    entryMap.put("1", "yellow");
+    entryMap.put("2", "green");
+    entryMap.put("3", "orange");
+    jedis.hmset("colors", entryMap);
+
+    ScanParams scanParams = new ScanParams();
+    scanParams.count(1);
+    ScanResult<Map.Entry<String, String>> result;
+    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    String cursor = "0";
+
+    do {
+      result = jedis.hscan("colors", cursor, scanParams);
+      allEntries.addAll(result.getResult());
+      cursor = result.getCursor();
+    } while (!result.isCompleteIteration());
+
+    assertThat(result.getCursor()).isEqualTo("0");
   }
 
   @Test

Review comment:
       There are some other tests (that Darrel and I added) that check exact 
matches between what's expected and what's returned.  Given previous 
comments/changes (and knowing that native Redis doesn't guarantee an element 
will only be returned once), I'm wondering if it makes sense to change these as 
well to use `containsAll()` instead of `containsExactlyInAnyOrder()`, 
`equals()`, etc.

##########
File path: 
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +441,136 @@ public void 
givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
   }
 
   @Test
-  public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
-    Map<String, String> entryMap = new HashMap<>();
-    entryMap.put("1", "yellow");
-    entryMap.put("2", "green");
-    entryMap.put("3", "orange");
-    jedis.hmset("colors", entryMap);
+  public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
 
-    String cursor = "-100";
-    ScanResult<Map.Entry<String, String>> result;
-    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    Map<String, String> data = new HashMap<>();
+    data.put("field_1", "yellow");
+    data.put("field_2", "green");
+    data.put("field_3", "grey");
+    jedis.hmset("colors", data);
 
-    do {
-      result = jedis.hscan("colors", cursor);
-      allEntries.addAll(result.getResult());
-      cursor = result.getCursor();
-    } while (!result.isCompleteIteration());
+    jedis.hdel("colors", "field_3");
+    data.remove("field_3");
 
-    assertThat(allEntries).hasSize(3);
-    assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(jedis.hget("colors", "field_3")).isNull());
+
+    ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "0");
+
+    assertThat(new HashSet<>(result.getResult()))
+        .containsExactlyInAnyOrderElementsOf(data.entrySet());
   }
 
   @Test
-  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  public void should_retun_not_error_given_non_zero_cursor_on_first_call() {

Review comment:
       I noticed this pattern in the DUnit tests as well... not sure why we 
suddenly changed to snake case (underscores) vs. camel case... and the naming 
pattern for the tests seems to have changed as well.  Maybe we could update it 
to be more consistent with the other test names?

##########
File path: 
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +441,136 @@ public void 
givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
   }
 
   @Test
-  public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
-    Map<String, String> entryMap = new HashMap<>();
-    entryMap.put("1", "yellow");
-    entryMap.put("2", "green");
-    entryMap.put("3", "orange");
-    jedis.hmset("colors", entryMap);
+  public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
 
-    String cursor = "-100";
-    ScanResult<Map.Entry<String, String>> result;
-    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    Map<String, String> data = new HashMap<>();
+    data.put("field_1", "yellow");
+    data.put("field_2", "green");
+    data.put("field_3", "grey");
+    jedis.hmset("colors", data);
 
-    do {
-      result = jedis.hscan("colors", cursor);
-      allEntries.addAll(result.getResult());
-      cursor = result.getCursor();
-    } while (!result.isCompleteIteration());
+    jedis.hdel("colors", "field_3");
+    data.remove("field_3");
 
-    assertThat(allEntries).hasSize(3);
-    assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(jedis.hget("colors", "field_3")).isNull());
+
+    ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "0");
+
+    assertThat(new HashSet<>(result.getResult()))
+        .containsExactlyInAnyOrderElementsOf(data.entrySet());
   }
 
   @Test
-  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  public void should_retun_not_error_given_non_zero_cursor_on_first_call() {
+
+    Map<String, String> data = new HashMap<>();
+    data.put("field_1", "yellow");
+    data.put("field_2", "green");
+    data.put("field_3", "grey");
+    jedis.hmset("colors", data);
+
+
+    ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "5");
+
+    assertThat(new HashSet<>(result.getResult()))
+        .containsExactlyInAnyOrderElementsOf(data.entrySet());
   }
 
+  /**** Concurrency ***/
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 100;
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
   @Test
-  public void 
givenNegativeCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.hscan("a", "-18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  public void 
should_notLoseFields_givenConcurrentThreadsDoingHScansAndChangingValues() {
+    final Map<String, String> INITIAL_HASH_DATA = 
makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+    jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis, 
INITIAL_HASH_DATA),
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2, 
INITIAL_HASH_DATA),
+        (i) -> {
+          int fieldSuffix = i % SIZE_OF_INITIAL_HASH_DATA;
+          jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+        }).run();
+
   }
 
   @Test
-  public void givenInvalidRegexSyntax_returnsEmptyArray() {
-    jedis.hset("a", "1", "green");
-    ScanParams scanParams = new ScanParams();
-    scanParams.count(1);
-    scanParams.match("\\p");
+  public void 
should_notLoseKeysForConsistentlyPresentFields_givenConcurrentThreadsAddingAndRemovingFields()
 {
+    final Map<String, String> INITIAL_HASH_DATA = 
makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+    jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis, 
INITIAL_HASH_DATA),
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis2, 
INITIAL_HASH_DATA),
+        (i) -> {
+          String field = "new_" + BASE_FIELD + i;
+          jedis3.hset(HASH_KEY, field, "whatever");
+          jedis3.hdel(HASH_KEY, field);
+        }).run();
 
-    ScanResult<Map.Entry<String, String>> result = jedis.hscan("a", "0", 
scanParams);
+  }
 
-    assertThat(result.getResult()).isEmpty();
+  @Test
+  public void should_notAlterUnderlyingData_givenMultipleConcurrentHscans() {
+    final Map<String, String> INITIAL_HASH_DATA = 
makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+    jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis, 
INITIAL_HASH_DATA),
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis2, 
INITIAL_HASH_DATA));
+
+    INITIAL_HASH_DATA.forEach((field, value) -> {
+      assertThat(jedis3.hget(HASH_KEY, field).equals(value));
+    });
+
+  }
+
+  private void multipleHScanAndAssertOnContentOfResultSet(Jedis jedis,
+      final Map<String, String> intialHashData) {
+
+    List<String> allEntries = new ArrayList<>();
+    ScanResult<Map.Entry<String, String>> result;
+    String cursor = "0";
+
+    do {
+      result = jedis.hscan(HASH_KEY, cursor);
+      cursor = result.getCursor();
+      List<Map.Entry<String, String>> resultEntries = result.getResult();
+      resultEntries
+          .forEach((entry) -> allEntries.add(entry.getKey()));
+    } while (!result.isCompleteIteration());
+
+    assertThat(allEntries).containsAll(intialHashData.keySet());
+  }
+
+  private void multipleHScanAndAssertOnSizeOfResultSet(Jedis jedis,
+      final Map<String, String> intialHashData) {
+    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    ScanResult<Map.Entry<String, String>> result;
+    String cursor = "0";
+
+    do {
+      result = jedis.hscan(HASH_KEY, cursor);
+      cursor = result.getCursor();
+      allEntries.addAll(result.getResult());
+    } while (!result.isCompleteIteration());
+
+    assertThat(allEntries.size())
+        .isEqualTo(intialHashData.size());

Review comment:
       Since there is a possibility that native Redis will return duplicates, 
maybe we should change this to `isGreaterThanOrEqualTo()`?

##########
File path: 
geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -46,19 +53,88 @@
 public class RedisHash extends AbstractRedisData {
   public static final RedisHash NULL_REDIS_HASH = new NullRedisHash();
   private HashMap<ByteArrayWrapper, ByteArrayWrapper> hash;
+  private ConcurrentHashMap<UUID, List<ByteArrayWrapper>> hScanSnapShots;
+  private ConcurrentHashMap<UUID, Long> hScanSnapShotCreationTimes;
+  private ScheduledExecutorService HSCANSnapshotExpirationExecutor = null;
+
+  private static int default_hscan_snapshots_expire_check_frequency =
+      Integer.getInteger("redis.hscan-snapshot-cleanup-interval", 30000);
+
+  private static int default_hscan_snapshots_milliseconds_to_live =
+      Integer.getInteger("redis.hscan-snapshot-expiry", 30000);

Review comment:
       Why did we switch from camel case to snake case for these variable 
names?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to