DonalEvans commented on a change in pull request #6524: URL: https://github.com/apache/geode/pull/6524#discussion_r640912873
########## File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Protocol; + +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.RedisIntegrationTest; +import org.apache.geode.test.awaitility.GeodeAwaitility; + +public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest { + private JedisCluster jedis; + private final String baseName = "member_"; + + private static final int REDIS_CLIENT_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + private static final String SORTED_SET_KEY = "ss_key"; + private static final int INITIAL_MEMBER_COUNT = 5; + + @Before + public void setUp() { + jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT); + } + + @After + public void tearDown() { + flushAll(); + jedis.close(); + } + + @Test + public void zRemThrowsIfTooFewArguments() { + assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2); + } + + @Test + public void zRemThrowsIfGivenOnlyKey() { + assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments"); + } + + @Test + public void zRemThrowsErrorIfKeyIsNotASortedSet() { + String key = "key"; + String member = "member1"; + jedis.sadd(key, member); + + assertThatThrownBy(() -> jedis.zrem(key, member)) + .hasMessageContaining(ERROR_WRONG_TYPE); + } + + @Test + public void zRemDoesNotRemoveNonExistingMember() { + Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT); + addToSortedSet(map); + + String nonExistingMember = "nonExisting"; + long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember); + + assertThat(result).isEqualTo(0); + } + + @Test + public void zRemCanRemoveAMemberInASortedSet() { + Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT); + Set<String> keys = map.keySet(); + addToSortedSet(map); + + String memberToRemove = baseName + 1; + Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove); + assertThat(removed).isEqualTo(1); + + for (String member : keys) { + Double score = jedis.zscore(SORTED_SET_KEY, member); + if (member.equals(memberToRemove)) { + assertThat(score).isNull(); + } else { + assertThat(score).isNotNull(); + } + } + assertThat(jedis.exists(SORTED_SET_KEY)).isTrue(); + } + + @Test + public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() { + Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT); + Set<String> keys = map.keySet(); + addToSortedSet(map); + + String[] membersToRemove = new String[keys.size()]; + Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove)); + assertThat(removed).isEqualTo(keys.size()); + + for (String member : keys) { + Double score = jedis.zscore(SORTED_SET_KEY, member); + assertThat(score).isNull(); + } + assertThat(jedis.exists(SORTED_SET_KEY)).isFalse(); + } + + @Test + public void zRemCanRemovesMembersConcurrentlyInASortedSet() { + int membersCount = 1000; + Map<String, Double> map = makeMemberScoreMap(membersCount); + addToSortedSet(map); + + AtomicInteger totalRemoved = new AtomicInteger(); + new ConcurrentLoopingThreads(2, + (i) -> doZRem(map, totalRemoved), + (i) -> doZRem1(membersCount, totalRemoved)).run(); + + assertThat(totalRemoved.get()).isEqualTo(membersCount); + assertThat(jedis.exists(SORTED_SET_KEY)).isFalse(); + } + + private void doZRem(Map<String, Double> map, AtomicInteger total) { + Set<String> keys = map.keySet(); + for (String key : keys) { + long count = jedis.zrem(SORTED_SET_KEY, key); + total.addAndGet((int) count); + } + } + + private void doZRem1(int memberCounts, AtomicInteger total) { + for (int i = 0; i < memberCounts; i++) { + long count = jedis.zrem(SORTED_SET_KEY, baseName + i); + total.addAndGet((int) count); + } + } + + private void addToSortedSet(Map<String, Double> map) { + Set<String> keys = map.keySet(); + Long count = 0L; + + for (String member : keys) { + Double score = map.get(member); + Long res = jedis.zadd(SORTED_SET_KEY, score, member); + count += res; + } + assertThat(count).isEqualTo(keys.size()); + } + + private Map<String, Double> makeMemberScoreMap(int membersCount) { + int baseScore = 0; Review comment: This variable is redundant and can probably be removed. ########## File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException; + +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.internal.RegionProvider; +import org.apache.geode.redis.internal.data.RedisData; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.rules.RedisClusterStartupRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class ZRemDUnitTest implements Serializable { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); + + @Rule + public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4); + + private transient JedisCluster jedis; + private MemberVM locator; + private MemberVM server1; + private MemberVM server2; + private MemberVM server3; + private String sortedSetKey = "key"; + private final String baseName = "member1-"; + private final int setSize = 1000; + + private static final String LOCAL_HOST = "127.0.0.1"; + private static final int JEDIS_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + + @Before + public void setup() { + Properties locatorProperties = new Properties(); + locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT); + + locator = clusterStartUp.startLocatorVM(0, locatorProperties); + server1 = clusterStartUp.startRedisVM(1, locator.getPort()); + server2 = clusterStartUp.startRedisVM(2, locator.getPort()); + server3 = clusterStartUp.startRedisVM(3, locator.getPort()); + + int redisServerPort = clusterStartUp.getRedisPort(1); + + jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT); + } + + @After + public void tearDown() { + jedis.close(); + + locator.stop(); + server1.stop(); + server2.stop(); + server3.stop(); Review comment: It should not be necessary to call `stop()` on members, as this is handled by the `RedisClusterStartupRule`. ########## File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Protocol; + +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.RedisIntegrationTest; +import org.apache.geode.test.awaitility.GeodeAwaitility; + +public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest { + private JedisCluster jedis; + private final String baseName = "member_"; + + private static final int REDIS_CLIENT_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + private static final String SORTED_SET_KEY = "ss_key"; + private static final int INITIAL_MEMBER_COUNT = 5; + + @Before + public void setUp() { + jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT); + } + + @After + public void tearDown() { + flushAll(); + jedis.close(); + } + + @Test + public void zRemThrowsIfTooFewArguments() { + assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2); + } + + @Test + public void zRemThrowsIfGivenOnlyKey() { + assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments"); + } + + @Test + public void zRemThrowsErrorIfKeyIsNotASortedSet() { + String key = "key"; + String member = "member1"; + jedis.sadd(key, member); + + assertThatThrownBy(() -> jedis.zrem(key, member)) + .hasMessageContaining(ERROR_WRONG_TYPE); + } + + @Test + public void zRemDoesNotRemoveNonExistingMember() { + Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT); + addToSortedSet(map); + + String nonExistingMember = "nonExisting"; + long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember); + + assertThat(result).isEqualTo(0); + } + + @Test + public void zRemCanRemoveAMemberInASortedSet() { + Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT); + Set<String> keys = map.keySet(); + addToSortedSet(map); + + String memberToRemove = baseName + 1; + Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove); + assertThat(removed).isEqualTo(1); + + for (String member : keys) { + Double score = jedis.zscore(SORTED_SET_KEY, member); + if (member.equals(memberToRemove)) { + assertThat(score).isNull(); + } else { + assertThat(score).isNotNull(); + } + } + assertThat(jedis.exists(SORTED_SET_KEY)).isTrue(); + } + + @Test + public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() { + Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT); + Set<String> keys = map.keySet(); + addToSortedSet(map); + + String[] membersToRemove = new String[keys.size()]; + Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove)); + assertThat(removed).isEqualTo(keys.size()); + + for (String member : keys) { + Double score = jedis.zscore(SORTED_SET_KEY, member); + assertThat(score).isNull(); + } + assertThat(jedis.exists(SORTED_SET_KEY)).isFalse(); + } + + @Test + public void zRemCanRemovesMembersConcurrentlyInASortedSet() { + int membersCount = 1000; + Map<String, Double> map = makeMemberScoreMap(membersCount); + addToSortedSet(map); + + AtomicInteger totalRemoved = new AtomicInteger(); + new ConcurrentLoopingThreads(2, + (i) -> doZRem(map, totalRemoved), + (i) -> doZRem1(membersCount, totalRemoved)).run(); + + assertThat(totalRemoved.get()).isEqualTo(membersCount); + assertThat(jedis.exists(SORTED_SET_KEY)).isFalse(); + } + + private void doZRem(Map<String, Double> map, AtomicInteger total) { + Set<String> keys = map.keySet(); + for (String key : keys) { + long count = jedis.zrem(SORTED_SET_KEY, key); + total.addAndGet((int) count); + } + } + + private void doZRem1(int memberCounts, AtomicInteger total) { Review comment: Could these methods be named a little more descriptively? Perhaps something like "doZRemOnAllKeysInMap" and "doMultipleZRem"? Also, the argument `memberCounts` could be better named as something like "numToRemove". ########## File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException; + +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.internal.RegionProvider; +import org.apache.geode.redis.internal.data.RedisData; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.rules.RedisClusterStartupRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class ZRemDUnitTest implements Serializable { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); + + @Rule + public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4); + + private transient JedisCluster jedis; + private MemberVM locator; + private MemberVM server1; + private MemberVM server2; + private MemberVM server3; + private String sortedSetKey = "key"; + private final String baseName = "member1-"; + private final int setSize = 1000; + + private static final String LOCAL_HOST = "127.0.0.1"; + private static final int JEDIS_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + + @Before + public void setup() { + Properties locatorProperties = new Properties(); + locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT); + + locator = clusterStartUp.startLocatorVM(0, locatorProperties); + server1 = clusterStartUp.startRedisVM(1, locator.getPort()); + server2 = clusterStartUp.startRedisVM(2, locator.getPort()); + server3 = clusterStartUp.startRedisVM(3, locator.getPort()); + + int redisServerPort = clusterStartUp.getRedisPort(1); + + jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT); + } + + @After + public void tearDown() { + jedis.close(); + + locator.stop(); + server1.stop(); + server2.stop(); + server3.stop(); + } + + @Test + public void zRemCanRemoveMembersFromSortedSet() { + Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize); + jedis.zadd(sortedSetKey, memberScoreMap); + verifyDataExist(memberScoreMap); + + long removed = jedis.zrem(sortedSetKey, memberScoreMap.keySet().toArray(new String[] {})); + assertThat(removed).isEqualTo(setSize); + + verifyDataNotExist(memberScoreMap); + assertThat(jedis.exists(sortedSetKey)).isFalse(); + } + + @Test + public void zRemCanRemovesMembersConcurrentlyFromSortedSet() { + Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize); + jedis.zadd(sortedSetKey, memberScoreMap); + verifyDataExist(memberScoreMap); + + AtomicInteger totalRemoved = new AtomicInteger(); + new ConcurrentLoopingThreads(2, + (i) -> doZRem(memberScoreMap, totalRemoved), + (i) -> doZRem1(totalRemoved)).run(); + + assertThat(totalRemoved.get()).isEqualTo(setSize); + assertThat(jedis.exists(sortedSetKey)).isFalse(); + } + + private void doZRem(Map<String, Double> map, AtomicInteger total) { + Set<String> keys = map.keySet(); + for (String key : keys) { + long count = jedis.zrem(sortedSetKey, key); + total.addAndGet((int) count); + } + } + + private void doZRem1(AtomicInteger total) { Review comment: Could these methods be renamed to something a little more descriptive? Maybe "doZRemOnAllKeysInMap" and "doZRemOnAllMembers"? ########## File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Protocol; + +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.RedisIntegrationTest; +import org.apache.geode.test.awaitility.GeodeAwaitility; + +public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest { + private JedisCluster jedis; + private final String baseName = "member_"; + + private static final int REDIS_CLIENT_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); Review comment: This constant is defined in `RedisIntegrationTest` with the same value, so doesn't need to be redefined here. It seems like this is present in a lot of classes that implement `RedisIntegrationTest`, so maybe a ticket could be filed to clean up all those uses. ########## File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException; + +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.internal.RegionProvider; +import org.apache.geode.redis.internal.data.RedisData; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.rules.RedisClusterStartupRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class ZRemDUnitTest implements Serializable { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); + + @Rule + public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4); + + private transient JedisCluster jedis; + private MemberVM locator; + private MemberVM server1; + private MemberVM server2; + private MemberVM server3; + private String sortedSetKey = "key"; + private final String baseName = "member1-"; + private final int setSize = 1000; + + private static final String LOCAL_HOST = "127.0.0.1"; + private static final int JEDIS_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + + @Before + public void setup() { + Properties locatorProperties = new Properties(); + locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT); + + locator = clusterStartUp.startLocatorVM(0, locatorProperties); + server1 = clusterStartUp.startRedisVM(1, locator.getPort()); + server2 = clusterStartUp.startRedisVM(2, locator.getPort()); + server3 = clusterStartUp.startRedisVM(3, locator.getPort()); + + int redisServerPort = clusterStartUp.getRedisPort(1); + + jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT); + } + + @After + public void tearDown() { + jedis.close(); + + locator.stop(); + server1.stop(); + server2.stop(); + server3.stop(); + } + + @Test + public void zRemCanRemoveMembersFromSortedSet() { + Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize); + jedis.zadd(sortedSetKey, memberScoreMap); + verifyDataExist(memberScoreMap); + + long removed = jedis.zrem(sortedSetKey, memberScoreMap.keySet().toArray(new String[] {})); + assertThat(removed).isEqualTo(setSize); + + verifyDataNotExist(memberScoreMap); + assertThat(jedis.exists(sortedSetKey)).isFalse(); + } + + @Test + public void zRemCanRemovesMembersConcurrentlyFromSortedSet() { + Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize); + jedis.zadd(sortedSetKey, memberScoreMap); + verifyDataExist(memberScoreMap); + + AtomicInteger totalRemoved = new AtomicInteger(); + new ConcurrentLoopingThreads(2, + (i) -> doZRem(memberScoreMap, totalRemoved), + (i) -> doZRem1(totalRemoved)).run(); + + assertThat(totalRemoved.get()).isEqualTo(setSize); + assertThat(jedis.exists(sortedSetKey)).isFalse(); + } + + private void doZRem(Map<String, Double> map, AtomicInteger total) { + Set<String> keys = map.keySet(); + for (String key : keys) { + long count = jedis.zrem(sortedSetKey, key); + total.addAndGet((int) count); + } + } + + private void doZRem1(AtomicInteger total) { + for (int i = 0; i < setSize; i++) { + long count = jedis.zrem(sortedSetKey, baseName + i); + total.addAndGet((int) count); + } + } + + @Test + public void zRemRemovesMembersFromSortedSetAfterPrimaryShutsDown() { + Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize); + jedis.zadd(sortedSetKey, memberScoreMap); + verifyDataExist(memberScoreMap); + + stopNodeWithPrimaryBucketOfTheKey(false); + + doZRemWithRetries(memberScoreMap); + + verifyDataNotExist(memberScoreMap); + assertThat(jedis.exists(sortedSetKey)).isFalse(); + } + + private void doZRemWithRetries(Map<String, Double> map) { + int maxRetryAttempts = 10; + int retryAttempts = 0; + while (!zRemWithRetries(map, retryAttempts, maxRetryAttempts)) { + retryAttempts++; + } + } + + private boolean zRemWithRetries(Map<String, Double> map, int retries, int maxRetries) { + long removed; + try { + removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] {})); + } catch (JedisClusterMaxAttemptsException e) { + if (retries < maxRetries) { + return false; + } + throw e; + } + assertThat(removed).isEqualTo(map.size()); + return true; + } + + private void doZRem(Map<String, Double> map) { + long removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] {})); + assertThat(removed).isEqualTo(map.size()); + } + + @Test + @Ignore("Fails due to GEODE-9310") + public void zRemCanRemoveMembersFromSortedSetDuringPrimaryIsCrashed() throws Exception { + int mapSize = 300; + Map<String, Double> memberScoreMap = makeMemberScoreMap(mapSize); + + jedis.zadd(sortedSetKey, memberScoreMap); + verifyDataExist(memberScoreMap); + + int number = 10; + String memberNotRemoved = baseName + number; + memberScoreMap.remove(memberNotRemoved); + + Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap)); + Future<Void> future2 = executor.submit(() -> stopNodeWithPrimaryBucketOfTheKey(true)); + + future1.get(); + future2.get(); + + GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap)); + assertThat(jedis.exists(sortedSetKey)).isTrue(); + } + + private void verifyDataExist(Map<String, Double> memberScoreMap) { + for (String member : memberScoreMap.keySet()) { + Double score = jedis.zscore(sortedSetKey, member); + assertThat(score).isEqualTo(memberScoreMap.get(member)); + } + } + + private boolean verifyDataNotExist(Map<String, Double> memberScoreMap) { + try { + for (String member : memberScoreMap.keySet()) { + Double score = jedis.zscore(sortedSetKey, member); + assertThat(score).isNull(); + } + } catch (JedisClusterMaxAttemptsException e) { + return false; + } + return true; + } + + private void stopNodeWithPrimaryBucketOfTheKey(boolean isCrash) { + int numOfServers = 4; Review comment: Rather than hard-coding this value, it might be better to add the servers to a `List<MemberVM>` in the `setUp()` method and iterate through that here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org