dschneider-pivotal commented on a change in pull request #6794:
URL: https://github.com/apache/geode/pull/6794#discussion_r697630911



##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>();
+    List<Double> weights = new ArrayList<>();
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (weights.size() > 0) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = 
ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (sourceKeys.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (weights.size() == 0) {

Review comment:
       weights.isEmpty()

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
##########
@@ -51,6 +52,17 @@ public SynchronizedStripedCoordinator(int concurrencyLevel) {
     }
   }
 
+  @Override
+  public <T> T execute(List<Object> stripeIds, int index, Callable<T> 
callable) {

Review comment:
       I think the public interface for this method should drop the index 
param. Outside callers of this ALWAYS need to call it with index==0. So make 
this method private and have a public version that is just execute(stripeIds, 
callable).

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
##########
@@ -163,6 +166,18 @@
   @MakeImmutable
   public static final byte[] bINCR = stringToBytes("INCR");
 
+  // ZUnionStoreExecutor
+  @MakeImmutable
+  public static final byte[] bWEIGHTS = stringToBytes("WEIGHTS");
+  @MakeImmutable
+  public static final byte[] bAGGREGATE = stringToBytes("AGGREGATE");
+  @MakeImmutable
+  public static final byte[] bSUM = stringToBytes("SUM");

Review comment:
       It looks like you ended up not using bSUM, bMIN, and bMAX I think 
because of the way you did the ZAggregator enum. 

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>();
+    List<Double> weights = new ArrayList<>();
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (weights.size() > 0) {

Review comment:
       !weights.isEmpty()

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -126,4 +129,20 @@ public long zrevrank(RedisKey key, byte[] member) {
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, 
true).zscore(member));
   }
+
+  @Override
+  public long zunionstore(RedisKey key, List<RedisKey> sourceSets, 
List<Double> weights,
+      ZAggregator aggregator) {
+    List<Object> keysToLock = new ArrayList<>(sourceSets);
+    keysToLock.add(key);
+    return stripedExecute(key, keysToLock,
+        () -> {
+          if (!getRegionProvider().getSlotAdvisor().isLocal(key)) {

Review comment:
       consider making this "if" and "throw" a new method on RegionProvider. 
Something like ensureKeyIsLocal(key).

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>();
+    List<Double> weights = new ArrayList<>();
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (weights.size() > 0) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = 
ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (sourceKeys.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (weights.size() == 0) {
+      for (int i = 0; i < numKeys; i++) {

Review comment:
       Change this for loop to "Collections.full(weights, 1D)".
   It is cleaner but it also gives you a single instance of Double for each add 
to the ArrayList instead of a new instance which is what auto boxing gives us 
(at least on java 8). 

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
##########
@@ -51,6 +52,17 @@ public SynchronizedStripedCoordinator(int concurrencyLevel) {
     }
   }
 
+  @Override
+  public <T> T execute(List<Object> stripeIds, int index, Callable<T> 
callable) {
+    if (index + 1 == stripeIds.size()) {
+      return execute(stripeIds.get(index), callable);
+    }
+
+    synchronized (getSync(stripeIds.get(index))) {

Review comment:
       I think this is okay but it does concern me that when the stripeIds list 
is long our call stack will get deep. If we end up printing the call stack this 
recursive call could make the stack hard to understand. 
   Instead of using synchronization we could have the syncs array contains 
ReentrantLock instead of Object. Then this method could just iterate over the 
list locking each one; then invoke the callable; and then iterate in reverse 
order unlocking each one. At the time we created this class we went with 
synchronization on an Object since it is the most basic and using a 
ReentrantLock did not give us any benefit. But now it would get rid of this 
recursion.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -383,6 +387,44 @@ long zrevrank(byte[] member) {
     return null;
   }
 
+  long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> 
sourceSets,
+      List<Double> weights, ZAggregator aggregator) {
+    for (int i = 0; i < sourceSets.size(); i++) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, 
sourceSets.get(i), false);
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+      double weight = weights.get(i);
+
+      Iterator<AbstractOrderedSetEntry> scoreIterator =
+          set.scoreSet.getIndexRange(0, Integer.MAX_VALUE, false);
+      while (scoreIterator.hasNext()) {
+        OrderedSetEntry entry = (OrderedSetEntry) scoreIterator.next();
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          byte[] score;
+          if (weight == 1) {

Review comment:
       since we are changing to storing a double instead of a byte[] it seems 
like this can just be simplified to "score = entry.score * weight". I doubt 
that adding conditionals is faster than just doing the multiplication.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -383,6 +387,44 @@ long zrevrank(byte[] member) {
     return null;
   }
 
+  long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> 
sourceSets,

Review comment:
       Instead of two list consider having one list that contains SourceSet 
instances. I SourceSet would have two fields: "RedisKey key "and "double 
weight". This wouldn't actually produce more garbage since we are already doing 
that by auto-boxing the doubles into Double instances. I think this would 
simplify both this code and the code that creates the lists (no need to fill 
when no weights are provided since you can initialize the primitive weight 
field to 1d).
   This for loop could then just be "for (SourceSet sourceSet: sourceSets)".
   And you could add methods to SourceSet like getRedisSortedSet(RegionProvider)

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>();
+    List<Double> weights = new ArrayList<>();

Review comment:
       how about validating that numKeys is not > commandElements.size()? Once 
you have done that then you can use numKeys safely as the initial size of these 
lists.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import java.util.function.BiFunction;
+
+/**
+ * Enums representing aggregation functions used in {@link 
ZUnionStoreExecutor} and
+ * {@link ZinterStoreExecutor}.
+ */
+public enum ZAggregator {
+
+  SUM(Double::sum),
+  MIN(Math::min),
+  MAX(Math::max);
+
+  private BiFunction<Double, Double, Double> function;

Review comment:
       make function final

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -383,6 +387,44 @@ long zrevrank(byte[] member) {
     return null;
   }
 
+  long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> 
sourceSets,
+      List<Double> weights, ZAggregator aggregator) {
+    for (int i = 0; i < sourceSets.size(); i++) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, 
sourceSets.get(i), false);
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+      double weight = weights.get(i);
+
+      Iterator<AbstractOrderedSetEntry> scoreIterator =

Review comment:
       Since we just want to do something for every entry in "set", it seems 
like it would probably be faster to use "for (OrderedSetEntry entry: 
set.members.values()".
   Also if the destination "members" is empty (it always will be for the first 
source set) you have no need to call members.get while processing that entire 
source set. And if that is true and the weight is the default of 1d then you 
could use members.putAll(set.members).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to