sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640667051



##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";

Review comment:
       `sortedSetKey` could be declared `final`.

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, 
memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {
+    for (int i = 0; i < setSize; i++) {
+      long count = jedis.zrem(sortedSetKey, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  @Test
+  public void zRemRemovesMembersFromSortedSetAfterPrimaryShutsDown() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    stopNodeWithPrimaryBucketOfTheKey(false);
+
+    doZRemWithRetries(memberScoreMap);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRemWithRetries(Map<String, Double> map) {
+    int maxRetryAttempts = 10;
+    int retryAttempts = 0;
+    while (!zRemWithRetries(map, retryAttempts, maxRetryAttempts)) {
+      retryAttempts++;
+    }
+  }
+
+  private boolean zRemWithRetries(Map<String, Double> map, int retries, int 
maxRetries) {
+    long removed;
+    try {
+      removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] 
{}));
+    } catch (JedisClusterMaxAttemptsException e) {
+      if (retries < maxRetries) {
+        return false;
+      }
+      throw e;
+    }
+    assertThat(removed).isEqualTo(map.size());
+    return true;
+  }
+
+  private void doZRem(Map<String, Double> map) {
+    long removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] 
{}));
+    assertThat(removed).isEqualTo(map.size());
+  }
+
+  @Test
+  @Ignore("Fails due to GEODE-9310")
+  public void zRemCanRemoveMembersFromSortedSetDuringPrimaryIsCrashed() throws 
Exception {
+    int mapSize = 300;
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(mapSize);
+
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    int number = 10;
+    String memberNotRemoved = baseName + number;
+    memberScoreMap.remove(memberNotRemoved);
+
+    Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap));
+    Future<Void> future2 = executor.submit(() -> 
stopNodeWithPrimaryBucketOfTheKey(true));
+
+    future1.get();
+    future2.get();
+
+    GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap));
+    assertThat(jedis.exists(sortedSetKey)).isTrue();
+  }
+
+  private void verifyDataExist(Map<String, Double> memberScoreMap) {
+    for (String member : memberScoreMap.keySet()) {
+      Double score = jedis.zscore(sortedSetKey, member);
+      assertThat(score).isEqualTo(memberScoreMap.get(member));
+    }
+  }
+
+  private boolean verifyDataNotExist(Map<String, Double> memberScoreMap) {

Review comment:
       Could this be `verifyDataDoesNotExist`?

##########
File path: 
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -232,6 +237,45 @@ public void 
should_calculateSize_closeToROSSize_ofIndividualInstanceWithMultiple
     assertThat(actual).isCloseTo(expected, offset);
   }
 
+  private final String member1 = "member1";
+  private final String member2 = "member2";
+  private final String score1 = "5.55555";
+  private final String score2 = "209030.31";
+
+  @Test
+  public void zremCanRemoveMembersToBeRemoved() {
+    String member3 = "member3";
+    String score3 = "998955255.66361191";
+    RedisSortedSet sortedSet =
+        spy(createRedisSortedSet(score1, member1, score2, member2, score3, 
member3));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    RedisKey key = new RedisKey();
+    ArrayList<byte[]> membersToRemove = new ArrayList<>();
+    membersToRemove.add(Coder.stringToBytes("nonExisting"));
+    membersToRemove.add(Coder.stringToBytes(member1));
+    membersToRemove.add(Coder.stringToBytes(member3));
+
+    long removed = sortedSet.zrem(region, key, membersToRemove);
+
+    assertThat(removed).isEqualTo(2);
+    verify(sortedSet).storeChanges(eq(region), eq(key), 
any(RemsDeltaInfo.class));
+  }

Review comment:
       This is a cool test! I'm not sure it's necessary, though, since all of 
this should already have been validated by the DUnit tests, but it is kind of 
cool to verify all of this at a lower level.

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, 
memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {

Review comment:
       Could this be `zRemCanRemove...`?

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, 
memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {
+    for (int i = 0; i < setSize; i++) {
+      long count = jedis.zrem(sortedSetKey, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  @Test
+  public void zRemRemovesMembersFromSortedSetAfterPrimaryShutsDown() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    stopNodeWithPrimaryBucketOfTheKey(false);
+
+    doZRemWithRetries(memberScoreMap);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRemWithRetries(Map<String, Double> map) {
+    int maxRetryAttempts = 10;
+    int retryAttempts = 0;
+    while (!zRemWithRetries(map, retryAttempts, maxRetryAttempts)) {
+      retryAttempts++;
+    }
+  }
+
+  private boolean zRemWithRetries(Map<String, Double> map, int retries, int 
maxRetries) {
+    long removed;
+    try {
+      removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] 
{}));
+    } catch (JedisClusterMaxAttemptsException e) {
+      if (retries < maxRetries) {
+        return false;
+      }
+      throw e;
+    }
+    assertThat(removed).isEqualTo(map.size());
+    return true;
+  }
+
+  private void doZRem(Map<String, Double> map) {
+    long removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] 
{}));
+    assertThat(removed).isEqualTo(map.size());
+  }
+
+  @Test
+  @Ignore("Fails due to GEODE-9310")
+  public void zRemCanRemoveMembersFromSortedSetDuringPrimaryIsCrashed() throws 
Exception {
+    int mapSize = 300;
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(mapSize);
+
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    int number = 10;
+    String memberNotRemoved = baseName + number;
+    memberScoreMap.remove(memberNotRemoved);
+
+    Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap));
+    Future<Void> future2 = executor.submit(() -> 
stopNodeWithPrimaryBucketOfTheKey(true));
+
+    future1.get();
+    future2.get();
+
+    GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap));
+    assertThat(jedis.exists(sortedSetKey)).isTrue();
+  }
+
+  private void verifyDataExist(Map<String, Double> memberScoreMap) {

Review comment:
       Could we call this `verifyDataExists`?

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       Since `ConcurrentLoopingThreads` already loops for us, maybe we could 
loop over the full `membersCount` range and issue each `zrem` individually:
   ```
   new ConcurrentLoopingThreads(membersCount,
       (i) -> doZRem(i, totalRemoved),
       (i) -> doZRem(i, totalRemoved)).run();
   ```
   
   ```
   private void doZRem(int i, AtomicInteger total) {
     long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
     total.addAndGet((int) count);
   }
   ```

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {

Review comment:
       Could this be `zRemRemovesKeyIfAllMembersInASortedSetAreRemoved`?

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }

Review comment:
       This test is unnecessary because the same behavior is already verified by the 
`zRemThrowsIfTooFewArguments` test above it.

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);

Review comment:
       You can actually just `zadd` all the keys/values in the map directly:
   ```
   jedis.zadd(SORTED_SET_KEY, map);
   ```

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -43,4 +43,10 @@ public long zadd(RedisKey key, List<byte[]> 
scoresAndMembersToAdd, ZAddOptions o
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, 
true).zscore(member));
   }
+
+  @Override
+  public long zrem(RedisKey key, List<byte[]> membersToRemove) {
+    return stripedExecute(key,
+        () -> getRedisSortedSet(key, false).zrem(getRegion(), key, 
membersToRemove));

Review comment:
       I didn't think about this for ZADD/ZSCORE, but we should also add tests 
to `AbstractHitsMissesIntegrationTest` verifying that stats should/shouldn't be 
updated for `ZREM` and all new commands.

##########
File path: 
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -232,6 +237,45 @@ public void 
should_calculateSize_closeToROSSize_ofIndividualInstanceWithMultiple
     assertThat(actual).isCloseTo(expected, offset);
   }
 
+  private final String member1 = "member1";
+  private final String member2 = "member2";
+  private final String score1 = "5.55555";
+  private final String score2 = "209030.31";
+
+  @Test
+  public void zremCanRemoveMembersToBeRemoved() {
+    String member3 = "member3";
+    String score3 = "998955255.66361191";
+    RedisSortedSet sortedSet =
+        spy(createRedisSortedSet(score1, member1, score2, member2, score3, 
member3));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    RedisKey key = new RedisKey();
+    ArrayList<byte[]> membersToRemove = new ArrayList<>();
+    membersToRemove.add(Coder.stringToBytes("nonExisting"));
+    membersToRemove.add(Coder.stringToBytes(member1));
+    membersToRemove.add(Coder.stringToBytes(member3));
+
+    long removed = sortedSet.zrem(region, key, membersToRemove);
+
+    assertThat(removed).isEqualTo(2);
+    verify(sortedSet).storeChanges(eq(region), eq(key), 
any(RemsDeltaInfo.class));
+  }
+
+  @Test
+  public void memberRemoveCanRemoveMemberInSortedSet() {
+    RedisSortedSet sortedSet = createRedisSortedSet(score1, member1, score2, 
member2);
+    RedisSortedSet sortedSet2 = createRedisSortedSet(score2, member2);
+    int originalSize = sortedSet.getSizeInBytes();
+
+    byte[] returnValue = sortedSet.memberRemove(Coder.stringToBytes(member1));
+    int removedSize =
+        sortedSet.calculateSizeOfFieldValuePair(Coder.stringToBytes(member1), 
returnValue);
+
+    assertThat(sortedSet).isEqualTo(sortedSet2);
+    assertThat(returnValue).isEqualTo(Coder.stringToBytes(score1));
+    assertThat(sortedSet.getSizeInBytes()).isEqualTo(originalSize - 
removedSize);
+  }

Review comment:
       For checking that the sizeInBytes is accurate, I'd prefer we use the 
`ReflectionObjectSizer.sizeOf` directly in the tests.  I think 
@nonbinaryprogrammer has been working on a story to fix the sizeable 
calculations though.  The other assertions in this test seem unnecessary since 
they were already validated by integration tests.  

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {
+    for (int i = 0; i < memberCounts; i++) {
+      long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void addToSortedSet(Map<String, Double> map) {
+    Set<String> keys = map.keySet();
+    Long count = 0L;
+
+    for (String member : keys) {
+      Double score = map.get(member);
+      Long res = jedis.zadd(SORTED_SET_KEY, score, member);
+      count += res;
+    }
+    assertThat(count).isEqualTo(keys.size());
+  }

Review comment:
       This method seems unnecessary since we can `zadd` all the keys/values in 
the map directly.  

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong 
number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {

Review comment:
       Could this be `zRemCanRemove....`?

##########
File path: 
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -116,7 +121,7 @@ public void 
equals_returnsTrue_givenDifferentEmptySortedSets() {
 
   @Test
   public void zadd_stores_delta_that_is_stable() throws IOException {
-    Region<RedisKey, RedisData> region = 
uncheckedCast(Mockito.mock(Region.class));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));

Review comment:
       Thanks for making this better!

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRemExecutor.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZRemExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    RedisSortedSetCommands redisSortedSetCommands = 
context.getRedisSortedSetCommands();;
+
+    List<byte[]> commandElements = command.getProcessedCommand();
+    RedisKey key = command.getKey();
+    ArrayList<byte[]> membersToRemove =
+        new ArrayList<>(commandElements.subList(2, commandElements.size()));

Review comment:
       We could use `List<byte[]>` for the variable type here.

##########
File path: 
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, 
DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), 
JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, 
memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();

Review comment:
       Since ConcurrentLoopingThreads loops for us, maybe we could loop through 
the full `setSize` and do the zrem individually:
   ```
    new ConcurrentLoopingThreads(setSize,
           (i) -> doZRem(i, totalRemoved),
           (i) -> doZRem(i, totalRemoved)).run();
    ```
    ```
    private void doZRem(int i, AtomicInteger totalRemoved) {
         long count = jedis.zrem(sortedSetKey, baseName + i);
         totalRemoved.addAndGet((int) count);
     }
     ```

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -87,7 +87,9 @@ protected void applyDelta(DeltaInfo deltaInfo) {
       membersAddAll(addsDeltaInfo);
     } else {
       RemsDeltaInfo remsDeltaInfo = (RemsDeltaInfo) deltaInfo;
-      membersRemoveAll(remsDeltaInfo);
+      for (byte[] member : remsDeltaInfo.getRemoves()) {
+        memberRemove(member);
+      }

Review comment:
       Could we also update the way we're applying delta for adds so that it's 
consistent with how we've changed the removes, i.e. eliminating the 
`membersAddAll` method and moving that logic into this method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to