DonalEvans commented on a change in pull request #6794:
URL: https://github.com/apache/geode/pull/6794#discussion_r695992810



##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
##########
@@ -54,4 +54,8 @@
   long zrevrank(RedisKey key, byte[] member);
 
   byte[] zscore(RedisKey key, byte[] member);
+
+  long zunionstore(RedisKey key, List<RedisKey> sourceSets, List<Double> 
weights,

Review comment:
       Just for clarity, could the `key` argument be renamed `destinationKey`?

##########
File path: 
geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
##########
@@ -84,18 +86,49 @@ public void await() {
    * Start operations and only return once all are complete.
    */
   public void run() {
-    start(false);
+    start(false, null);
     await();
   }
 
   /**
    * Start operations and run each iteration in lockstep
    */
   public void runInLockstep() {
-    start(true);
+    start(true, null);
     await();
   }
 
+  /**
+   * Start operations and provide an action to be performed at the end of 
every iteration. This
+   * implies running in lockstep. This would typically be used to provide some 
form of validation.
+   */
+  public void runWithAction(Runnable action) {
+    Runnable innerRunnable = () -> {
+      try {
+        action.run();
+      } catch (Throwable e) {
+        actionThrowable.set(e);
+        throw e;
+      }
+    };
+
+    start(true, innerRunnable);
+
+    try {
+      await();
+    } catch (Throwable e) {
+      Throwable actionException = actionThrowable.get();
+      if (actionException != null) {
+        if (actionException instanceof Error) {
+          throw (Error) actionException;
+        }
+        throw new RuntimeException(actionThrowable.get());

Review comment:
       Do we expect that the value stored in `actionThrowable` will have 
changed between this call and the one at the start of the catch block? If not, 
this could just be `actionException`.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
##########
@@ -163,6 +164,19 @@ public RedisStats getRedisStats() {
     }
   }
 
+  public <T> T execute(Object key, List<Object> keysToLock, Callable<T> 
callable) {

Review comment:
       The `key` argument of this method (and of the `execute()` method above 
it) can be a `RedisKey` instead of an `Object` since we always expect that to 
be the type passed to the method. This change also removes the need to cast to 
a `RedisKey` in the catch block. If this change is made, it will also require 
that the `stripedExecute()` methods in `RedisDataCommandsFunctionExecutor` be 
similarly modified.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
##########
@@ -303,16 +317,14 @@ public RedisKeyCommands getKeyCommands() {
    *
    * @return the keys ordered in the sequence in which they should be locked.
    */
-  public List<RedisKey> orderForLocking(RedisKey key1, RedisKey key2) {
-    List<RedisKey> orderedKeys = new ArrayList<>();
-    if (stripedCoordinator.compareStripes(key1, key2) > 0) {
-      orderedKeys.add(key1);
-      orderedKeys.add(key2);
-    } else {
-      orderedKeys.add(key2);
-      orderedKeys.add(key1);
-    }
+  public List<RedisKey> orderForLocking(List<RedisKey> keys) {
+    Collections.sort(keys, stripedCoordinator::compareStripes);

Review comment:
       This could be replaced with 
`keys.sort(stripedCoordinator::compareStripes);`

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZUnionStoreIntegrationTest implements 
RedisIntegrationTest {
+
+  private static final String NEW_SET = "{user1}new";
+  private static final String SORTED_SET_KEY1 = "{user1}sset1";
+  private static final String SORTED_SET_KEY2 = "{user1}sset2";
+  private static final String SORTED_SET_KEY3 = "{user1}sset3";
+
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String NEW_KEY = "{user1}new";
+    final String STRING_KEY = "{user1}stringKey";
+    jedis.set(STRING_KEY, "value");
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_KEY, Protocol.Command.ZUNIONSTORE, 
NEW_KEY, "2", STRING_KEY,
+            SORTED_SET_KEY1))
+                .hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void shouldError_givenSetsCrossSlots() {
+    final String NEW_KEY = "{user1}new";
+    final String WRONG_KEY = "{user2}another";
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_KEY, Protocol.Command.ZUNIONSTORE, 
NEW_KEY, "2", WRONG_KEY,
+            SORTED_SET_KEY1))
+                .hasMessage("CROSSSLOT " + RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void shouldError_givenTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZUNIONSTORE, 3);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooLarge() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "2",
+            SORTED_SET_KEY1))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooSmall() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, SORTED_SET_KEY2))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooManyWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS", "2", "3"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooFewWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "2",
+            SORTED_SET_KEY1, SORTED_SET_KEY2, "WEIGHTS", "1"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenWeightNotANumber() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS", "not-a-number"))
+                .hasMessage("ERR " + RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT);
+  }
+
+  @Test
+  public void shouldError_givenWeightsWithoutAnyValues() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleWeightKeywords() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHT", "1.0", "WEIGHT", "2.0"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenUnknownAggregate() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "AGGREGATE", "UNKNOWN", "WEIGHTS", "1"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenAggregateKeywordWithoutValue() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "AGGREGATE"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleAggregates() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS", "1", "AGGREGATE", "SUM", "MIN"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldUnionize_givenASingleSet() {
+    Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    assertThat(jedis.zunionstore(NEW_SET, SORTED_SET_KEY1)).isEqualTo(10);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenOneSetDoesNotExist() {
+    Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenWeight() {
+    Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x * 1.5);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    jedis.zunionstore(SORTED_SET_KEY1, new ZParams().weights(1.5), 
SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void 
shouldUnionizeWithWeightAndDefaultAggregate_givenMultipleSetsWithWeights() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) (9 - x));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (x * 2.0) 
+ ((9 - x) * 1.5));
+
+    jedis.zunionstore(NEW_SET, new ZParams().weights(2.0, 1.5), 
SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenMinAggregate() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> 0D);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores2, (i, x) -> x);
+
+    jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MIN),
+        SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenMaxAggregate() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? 0 : x));
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? x : 0));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double) 
i);
+
+    jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MAX),
+        SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void 
shouldUnionizeUsingLastAggregate_givenMultipleAggregateKeywords() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) 0);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) 1);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores2, (i, x) -> x);
+
+    jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, NEW_SET, "2",
+        SORTED_SET_KEY1, SORTED_SET_KEY2, "AGGREGATE", "MIN", "AGGREGATE", 
"MAX");
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenMaxAggregateAndMultipleWeights() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? 0 : x));
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? x : 0));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double) 
(i * 2));
+
+    jedis.zunionstore(NEW_SET, new 
ZParams().aggregate(ZParams.Aggregate.MAX).weights(2, 2),
+        SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenSumAggregateAndMultipleSets() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) (x * 2));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Map<String, Double> scores3 = makeScoreMap(10, x -> (double) (x * 3));
+    jedis.zadd(SORTED_SET_KEY3, scores3);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x * 6);
+
+    jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.SUM),
+        SORTED_SET_KEY1, SORTED_SET_KEY2, SORTED_SET_KEY3);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenSetsDoNotOverlap() {
+    Map<String, Double> scores1 = makeScoreMap(0, 2, 10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(1, 2, 10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults =
+        convertToTuples(makeScoreMap(0, 1, 20, x -> (double) x), (i, x) -> x);
+
+    jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 20);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenSetsPartiallyOverlap() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(5, 1, 10, x -> (double) (x < 10 
? x : x * 2));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(makeScoreMap(0, 1, 15, x -> 
(double) x),
+        (i, x) -> (double) (i < 5 ? i : i * 2));
+
+    jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 20);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void ensureWeightsAreAppliedBeforeAggregation() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x * 5);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double) 
(i * 10));
+
+    jedis.zunionstore(SORTED_SET_KEY1,
+        new ZParams().weights(1, 10).aggregate(ZParams.Aggregate.MAX), 
SORTED_SET_KEY1,
+        SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 20);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_whenTargetExistsAndSetsAreDuplicated() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x * 2);
+
+    // Default aggregation is SUM
+    jedis.zunionstore(SORTED_SET_KEY1, SORTED_SET_KEY1, SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldPreserveSet_givenDestinationAndSourceAreTheSame() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x);
+
+    jedis.zunionstore(SORTED_SET_KEY1, SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void ensureSetConsistency_andNoExceptions_whenRunningConcurrently() {
+    int scoreCount = 1000;
+    jedis.zadd("{A}ones", makeScoreMap(scoreCount, x -> 1D));

Review comment:
       Could the `{A}` be extracted to a constant here, to emphasize the fact 
that it's being used to ensure all keys end up in the same slot?

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -383,6 +387,44 @@ long zrevrank(byte[] member) {
     return null;
   }
 
+  long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> 
sourceSets,
+      List<Double> weights, ZAggregator aggregator) {
+    for (int i = 0; i < sourceSets.size(); i++) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, 
sourceSets.get(i), false);
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+      double weight = weights.get(i);
+
+      Iterator<AbstractOrderedSetEntry> scoreIterator =
+          set.scoreSet.getIndexRange(0, Integer.MAX_VALUE, false);
+      while (scoreIterator.hasNext()) {
+        OrderedSetEntry entry = (OrderedSetEntry) scoreIterator.next();
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          byte[] score;
+          if (weight == 1) {

Review comment:
       It's a small corner case, but you could probably save some work in the 
cases where weight = 0, -inf or +inf, since we should always know the result of 
multiplication by those values and can just add N entries with fixed values to 
the destination set.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>();
+    List<Double> weights = new ArrayList<>();
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (weights.size() > 0) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));

Review comment:
       A check is also needed here for if the value is `NaN`, since 
`bytesToDouble()` will parse a String value of "NaN" as `Double.NaN`, which is 
not a valid double for the purposes of this command. I think this might be a 
flaw in the `bytesToDouble()` method, since I don't think there's ever a case 
where we treat a String value of "NaN" as a valid double, so it could probably 
just throw a `NumberFormatException` instead of returning a double.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>();
+    List<Double> weights = new ArrayList<>();

Review comment:
       Minor consideration, but is there a way to set sensible and safe initial 
sizes for these Lists? Just using `numKeys` is risky because a user may input a 
value that causes an OOME, but maybe `commandElements.size()` would be a decent 
estimate? It would result in an initial size that's too big, but not by more 
than a factor of ~2, depending on whether WEIGHTS is specified (unless a user 
does something weird like specifying dozens of AGGREGATE options) and so should 
prevent the need for any array resizing.

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZUnionStoreIntegrationTest implements 
RedisIntegrationTest {
+
+  private static final String NEW_SET = "{user1}new";
+  private static final String SORTED_SET_KEY1 = "{user1}sset1";
+  private static final String SORTED_SET_KEY2 = "{user1}sset2";
+  private static final String SORTED_SET_KEY3 = "{user1}sset3";
+
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String NEW_KEY = "{user1}new";

Review comment:
       `NEW_KEY` could be replaced with `NEW_SET` in this and other tests.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -383,6 +387,44 @@ long zrevrank(byte[] member) {
     return null;
   }
 
+  long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> 
sourceSets,
+      List<Double> weights, ZAggregator aggregator) {
+    for (int i = 0; i < sourceSets.size(); i++) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, 
sourceSets.get(i), false);
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+      double weight = weights.get(i);
+
+      Iterator<AbstractOrderedSetEntry> scoreIterator =
+          set.scoreSet.getIndexRange(0, Integer.MAX_VALUE, false);
+      while (scoreIterator.hasNext()) {
+        OrderedSetEntry entry = (OrderedSetEntry) scoreIterator.next();

Review comment:
       This can be an `AbstractOrderedSetEntry`, removing the need for the cast.

##########
File path: 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZUnionStoreIntegrationTest implements 
RedisIntegrationTest {

Review comment:
       It would be good to add some tests to confirm the behaviour of the 
command given:
   - Infinite scores (positive and negative) with finite weights
   - Finite scores with infinite weights (positive and negative)
   - Infinite scores with infinite weights
   - Zero-value scores with infinite weights
   - Infinite scores with zero-value weights
   
   since behaviour for these cases is possibly unexpected/incorrect given the 
current implementation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to