DonalEvans commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640912873



##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {
+    for (int i = 0; i < memberCounts; i++) {
+      long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void addToSortedSet(Map<String, Double> map) {
+    Set<String> keys = map.keySet();
+    Long count = 0L;
+
+    for (String member : keys) {
+      Double score = map.get(member);
+      Long res = jedis.zadd(SORTED_SET_KEY, score, member);
+      count += res;
+    }
+    assertThat(count).isEqualTo(keys.size());
+  }
+
+  private Map<String, Double> makeMemberScoreMap(int membersCount) {
+    int baseScore = 0;

Review comment:
       The `baseScore` variable is redundant and can probably be removed.

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();

Review comment:
       It should not be necessary to call `stop()` on members, as this is 
handled by the `RedisClusterStartupRule`.

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {

Review comment:
       Could these methods be named a little more descriptively? Perhaps 
something like "doZRemOnAllKeysInMap" and "doMultipleZRem"? Also, the argument 
`memberCounts` could be better named as something like "numToRemove".

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, 
memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {

Review comment:
       Could these methods be renamed to something a little more descriptive? 
Maybe "doZRemOnAllKeysInMap" and "doZRemOnAllMembers"?

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());

Review comment:
       This constant is defined in `RedisIntegrationTest` with the same value, 
so it doesn't need to be redefined here. It seems like this is present in a lot 
of classes that implement `RedisIntegrationTest`, so maybe a ticket could be 
filed to clean up all those uses.

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, 
memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {
+    for (int i = 0; i < setSize; i++) {
+      long count = jedis.zrem(sortedSetKey, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  @Test
+  public void zRemRemovesMembersFromSortedSetAfterPrimaryShutsDown() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    stopNodeWithPrimaryBucketOfTheKey(false);
+
+    doZRemWithRetries(memberScoreMap);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRemWithRetries(Map<String, Double> map) {
+    int maxRetryAttempts = 10;
+    int retryAttempts = 0;
+    while (!zRemWithRetries(map, retryAttempts, maxRetryAttempts)) {
+      retryAttempts++;
+    }
+  }
+
+  private boolean zRemWithRetries(Map<String, Double> map, int retries, int 
maxRetries) {
+    long removed;
+    try {
+      removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] 
{}));
+    } catch (JedisClusterMaxAttemptsException e) {
+      if (retries < maxRetries) {
+        return false;
+      }
+      throw e;
+    }
+    assertThat(removed).isEqualTo(map.size());
+    return true;
+  }
+
+  private void doZRem(Map<String, Double> map) {
+    long removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] 
{}));
+    assertThat(removed).isEqualTo(map.size());
+  }
+
+  @Test
+  @Ignore("Fails due to GEODE-9310")
+  public void zRemCanRemoveMembersFromSortedSetDuringPrimaryIsCrashed() throws 
Exception {
+    int mapSize = 300;
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(mapSize);
+
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    int number = 10;
+    String memberNotRemoved = baseName + number;
+    memberScoreMap.remove(memberNotRemoved);
+
+    Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap));
+    Future<Void> future2 = executor.submit(() -> 
stopNodeWithPrimaryBucketOfTheKey(true));
+
+    future1.get();
+    future2.get();
+
+    GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap));
+    assertThat(jedis.exists(sortedSetKey)).isTrue();
+  }
+
+  private void verifyDataExist(Map<String, Double> memberScoreMap) {
+    for (String member : memberScoreMap.keySet()) {
+      Double score = jedis.zscore(sortedSetKey, member);
+      assertThat(score).isEqualTo(memberScoreMap.get(member));
+    }
+  }
+
+  private boolean verifyDataNotExist(Map<String, Double> memberScoreMap) {
+    try {
+      for (String member : memberScoreMap.keySet()) {
+        Double score = jedis.zscore(sortedSetKey, member);
+        assertThat(score).isNull();
+      }
+    } catch (JedisClusterMaxAttemptsException e) {
+      return false;
+    }
+    return true;
+  }
+
+  private void stopNodeWithPrimaryBucketOfTheKey(boolean isCrash) {
+    int numOfServers = 4;

Review comment:
       Rather than hard-coding this value, it might be better to add the 
servers to a `List<MemberVM>` in the `setup()` method and iterate through that 
here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to