DonalEvans commented on a change in pull request #6862:
URL: https://github.com/apache/geode/pull/6862#discussion_r711245979



##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/common/UnsupportedCommandsIntegrationTest.java
##########
@@ -66,11 +66,11 @@ public void 
shouldReturnUnknownCommandMessage_givenCallToUnsupportedCommand_when
     final String KEY = "key";
     final String NEW_VALUE = "changed value";
     final String EXPECTED_ERROR_MSG =
-        String.format(ERROR_UNKNOWN_COMMAND, "MSET", "`" + KEY + "`", 
NEW_VALUE);
+        String.format(ERROR_UNKNOWN_COMMAND, "BITCOUNT", "`" + KEY + "`", 
NEW_VALUE);

Review comment:
       The first test in this class also needs to be updated to use an 
unsupported command, since MSET is now supported. Perhaps it would make 
sense to extract the unsupported command used by both tests into a field, so 
that both can be updated together if this happens again in the future?

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetExecutor.java
##########
@@ -32,14 +35,25 @@ public RedisResponse executeCommand(Command command, 
ExecutionHandlerContext con
     List<byte[]> commandElems = command.getProcessedCommand();
     RedisStringCommands stringCommands = context.getStringCommands();
 
-    // TODO: make this atomic
+    int numElements = (commandElems.size() - 1) / 2;

Review comment:
       Rather than getting all the command elements and then having to 
skip/ignore the first one, you can use `command.getCommandArguments();` to get 
only the elements you're actually interested in, which makes iteration and 
sizing more straightforward.

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractMSetIntegrationTest.java
##########
@@ -98,76 +97,37 @@ public void testMSet_setsKeysAndReturnsCorrectValues() {
   }
 
   @Test
-  @Ignore("GEODE-8192")
-  public void testMSet_concurrentInstances_mustBeAtomic()
-      throws InterruptedException, ExecutionException {
-    String keyBaseName = "MSETBASE";
-    String val1BaseName = "FIRSTVALBASE";
-    String val2BaseName = "SECONDVALBASE";
-    String[] keysAndVals1 = new String[(ITERATION_COUNT * 2)];
-    String[] keysAndVals2 = new String[(ITERATION_COUNT * 2)];
-    String[] keys = new String[ITERATION_COUNT];
-    String[] vals1 = new String[ITERATION_COUNT];
-    String[] vals2 = new String[ITERATION_COUNT];
-    String[] expectedVals;
-
-    SetUpArraysForConcurrentMSet(keyBaseName,
-        val1BaseName, val2BaseName,
-        keysAndVals1, keysAndVals2,
-        keys,
-        vals1, vals2);
-
-    RunTwoMSetsInParallelThreadsAndVerifyReturnValue(keysAndVals1, 
keysAndVals2);
-
-    List<String> actualVals = jedis.mget(keys);
-    expectedVals = DetermineWhichMSetWonTheRace(vals1, vals2, actualVals);
-
-    assertThat(actualVals.toArray(new String[] {})).contains(expectedVals);
-  }
+  public void testMSet_concurrentInstances_mustBeAtomic() {
+    int KEY_COUNT = 5000;
+    String[] keys = new String[KEY_COUNT];
 
-  private void SetUpArraysForConcurrentMSet(String keyBaseName, String 
val1BaseName,
-      String val2BaseName, String[] keysAndVals1,
-      String[] keysAndVals2, String[] keys, String[] vals1,
-      String[] vals2) {
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      String key = keyBaseName + i + hashTag;
-      String value1 = val1BaseName + i;
-      String value2 = val2BaseName + i;
-      keysAndVals1[2 * i] = key;
-      keysAndVals1[2 * i + 1] = value1;
-      keysAndVals2[2 * i] = key;
-      keysAndVals2[2 * i + 1] = value2;
-      keys[i] = key;
-      vals1[i] = value1;
-      vals2[i] = value2;
+    for (int i = 0; i < keys.length; i++) {
+      keys[i] = HASHTAG + "key" + i;
     }
+    String[] keysAndValues1 = makeKeysAndValues(keys, "valueOne");
+    String[] keysAndValues2 = makeKeysAndValues(keys, "valueTwo");
+
+    new ConcurrentLoopingThreads(1000,
+        i -> jedis.mset(keysAndValues1),
+        i -> jedis.mset(keysAndValues2))
+            .runWithAction(() -> {
+              int count = 0;
+              List<String> values = jedis.mget(keys);
+              for (String v : values) {
+                count += v.startsWith("valueOne") ? 1 : -1;
+              }
+              assertThat(Math.abs(count)).isEqualTo(KEY_COUNT);

Review comment:
       Just personal preference, but I found this a little difficult to 
understand at first, so a clearer way to assert the same thing could be:
   ```
   assertThat(jedis.mget(keys)).satisfiesAnyOf(
       values -> assertThat(values).allSatisfy(value -> 
assertThat(value).startsWith("valueOne")),
       values -> assertThat(values).allSatisfy(value -> 
assertThat(value).startsWith("valueTwo"))
   );
   ```

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/services/LockingStripedCoordinatorIntegrationTest.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class LockingStripedCoordinatorIntegrationTest {
+
+  @Test
+  public void concurrentLockRequestsDoNotDeadlock() {
+    int keysToLock = 1000;
+    StripedCoordinator coordinator = new LockingStripedCoordinator();
+    List<RedisKey> keyList = new ArrayList<>(keysToLock);
+    List<RedisKey> reversedKeyList = new ArrayList<>(keysToLock);
+
+    for (int i = 0; i < keysToLock; i++) {
+      RedisKey k = new RedisKey(("" + i).getBytes());
+      keyList.add(k);
+    }
+    for (int i = keysToLock - 1; i >= 0; i--) {
+      reversedKeyList.add(keyList.get(i));
+    }
+
+    new ConcurrentLoopingThreads(10000,
+        i -> {
+          List<RedisKey> keys = new ArrayList<>(keyList);
+          coordinator.execute(keys, () -> "OK");
+        },
+        i -> {
+          List<RedisKey> keys = new ArrayList<>(reversedKeyList);
+          coordinator.execute(keys, () -> "OK");
+        }).run();

Review comment:
       Is it worth changing the runnable here to something that can be used to 
verify that it's actually executed?

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetExecutor.java
##########
@@ -32,14 +35,25 @@ public RedisResponse executeCommand(Command command, 
ExecutionHandlerContext con
     List<byte[]> commandElems = command.getProcessedCommand();
     RedisStringCommands stringCommands = context.getStringCommands();
 
-    // TODO: make this atomic
+    int numElements = (commandElems.size() - 1) / 2;
+    List<RedisKey> keys = new ArrayList<>(numElements);
+    List<byte[]> values = new ArrayList<>(numElements);
+
+    RedisKey previousKey = null;
     for (int i = 1; i < commandElems.size(); i += 2) {
-      byte[] keyArray = commandElems.get(i);
-      RedisKey key = new RedisKey(keyArray);
-      byte[] valueArray = commandElems.get(i + 1);
-      stringCommands.set(key, valueArray, null);
+      RedisKey key = new RedisKey(commandElems.get(i));
+
+      if (previousKey != null && key.getBucketId() != 
previousKey.getBucketId()) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);
+      }
+      keys.add(key);
+      values.add(commandElems.get(i + 1));
+
+      previousKey = key;
     }
 
+    stringCommands.mset(keys, values);
+
     return RedisResponse.string(SUCCESS);

Review comment:
       This can be replaced with `return RedisResponse.ok();` and the constant 
can be removed.

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/MSetDUnitTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.string;
+
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class MSetDUnitTest {
+
+  private static final Logger logger = LogService.getLogger();
+
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static final String HASHTAG = "{tag}";
+  private static JedisCluster jedis;
+  private static MemberVM server1;
+  private static int locatorPort;
+
+  private static MemberVM locator;
+
+  @BeforeClass
+  public static void classSetup() {
+    locator = clusterStartUp.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    server1 = clusterStartUp.startRedisVM(1, locatorPort);
+    clusterStartUp.startRedisVM(2, locatorPort);
+    clusterStartUp.startRedisVM(3, locatorPort);
+
+    int redisServerPort1 = clusterStartUp.getRedisPort(1);
+    // Ensure that buckets are created using a connection with a fairly high 
timeout since
+    // clusterNodes does not get retried.
+    new Jedis(BIND_ADDRESS, redisServerPort1, 
REDIS_CLIENT_TIMEOUT).clusterNodes();
+
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort1), 
5000, 20);
+  }
+
+  @After
+  public void after() {
+    clusterStartUp.flushAll();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis.close();
+  }
+
+  @Test
+  public void testMSet_concurrentInstancesHandleBucketMovement() {
+    int KEY_COUNT = 5000;
+    String[] keys = new String[KEY_COUNT];
+
+    for (int i = 0; i < keys.length; i++) {
+      keys[i] = HASHTAG + "key" + i;
+    }
+    String[] keysAndValues1 = makeKeysAndValues(keys, "valueOne");
+    String[] keysAndValues2 = makeKeysAndValues(keys, "valueTwo");
+
+    new ConcurrentLoopingThreads(100,
+        i -> jedis.mset(keysAndValues1),
+        i -> jedis.mset(keysAndValues2),
+        i -> clusterStartUp.moveBucketForKey(keys[0]))
+            .runWithAction(() -> {
+              int count = 0;
+              List<String> values = jedis.mget(keys);
+              for (String v : values) {
+                if (v == null) {
+                  continue;
+                }
+                count += v.startsWith("valueOne") ? 1 : -1;
+              }
+              assertThat(Math.abs(count)).isEqualTo(KEY_COUNT);

Review comment:
       An assertion similar to the one suggested in the review comment on 
`AbstractMSetIntegrationTest` (using `satisfiesAnyOf` with `allSatisfy`) could 
be used here for added clarity.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to