dschneider-pivotal commented on a change in pull request #6794: URL: https://github.com/apache/geode/pull/6794#discussion_r697630911
########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT; +import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisResponse; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.Command; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class ZUnionStoreExecutor extends AbstractExecutor { + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElements = command.getProcessedCommand(); + + Iterator<byte[]> argIterator = commandElements.iterator(); + // Skip command and destination key + argIterator.next(); + argIterator.next(); + + long numKeys; + try { + numKeys = Coder.bytesToLong(argIterator.next()); + } catch (NumberFormatException nex) { + return RedisResponse.error(ERROR_SYNTAX); + } + + List<RedisKey> sourceKeys = new ArrayList<>(); + List<Double> weights = new ArrayList<>(); + ZAggregator aggregator = ZAggregator.SUM; + + while (argIterator.hasNext()) { + byte[] arg = argIterator.next(); + + if (sourceKeys.size() < numKeys) { + sourceKeys.add(new RedisKey(arg)); + continue; + } + + arg = toUpperCaseBytes(arg); + if (Arrays.equals(arg, bWEIGHTS)) { + if (weights.size() > 0) { + return RedisResponse.error(ERROR_SYNTAX); + } + for (int i = 0; i < numKeys; i++) { + if (!argIterator.hasNext()) { + return RedisResponse.error(ERROR_SYNTAX); + } + try { + weights.add(Coder.bytesToDouble(argIterator.next())); + } catch (NumberFormatException nex) { + return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT); + } + } + continue; + } + + if (Arrays.equals(arg, bAGGREGATE)) { + try { + aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next())); + } catch (IllegalArgumentException | NoSuchElementException e) { + return RedisResponse.error(ERROR_SYNTAX); + } + continue; + } + + return RedisResponse.error(ERROR_SYNTAX); + } + + if (sourceKeys.size() != numKeys) { + return RedisResponse.error(ERROR_SYNTAX); + } + + if (weights.size() == 0) { Review comment: weights.isEmpty() ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java ########## @@ -51,6 +52,17 @@ public SynchronizedStripedCoordinator(int concurrencyLevel) { } } + @Override + public <T> T execute(List<Object> stripeIds, int index, Callable<T> callable) { Review comment: I think the public interface for this method should drop the index param. Outside callers of this ALWAYS need to call it with index==0. So make this method private and have a public version that is just execute(stripeIds, callable). ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java ########## @@ -163,6 +166,18 @@ @MakeImmutable public static final byte[] bINCR = stringToBytes("INCR"); + // ZUnionStoreExecutor + @MakeImmutable + public static final byte[] bWEIGHTS = stringToBytes("WEIGHTS"); + @MakeImmutable + public static final byte[] bAGGREGATE = stringToBytes("AGGREGATE"); + @MakeImmutable + public static final byte[] bSUM = stringToBytes("SUM"); Review comment: It looks like you ended up not using bSUM, bMIN, and bMAX I think because of the way you did the ZAggregator enum. ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT; +import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisResponse; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.Command; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class ZUnionStoreExecutor extends AbstractExecutor { + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElements = command.getProcessedCommand(); + + Iterator<byte[]> argIterator = commandElements.iterator(); + // Skip command and destination key + argIterator.next(); + argIterator.next(); + + long numKeys; + try { + numKeys = Coder.bytesToLong(argIterator.next()); + } catch (NumberFormatException nex) { + return RedisResponse.error(ERROR_SYNTAX); + } + + List<RedisKey> sourceKeys = new ArrayList<>(); + List<Double> weights = new ArrayList<>(); + ZAggregator aggregator = ZAggregator.SUM; + + while (argIterator.hasNext()) { + byte[] arg = argIterator.next(); + + if (sourceKeys.size() < numKeys) { + sourceKeys.add(new RedisKey(arg)); + continue; + } + + arg = toUpperCaseBytes(arg); + if (Arrays.equals(arg, bWEIGHTS)) { + if (weights.size() > 0) { Review comment: !weights.isEmpty() ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java ########## @@ -126,4 +129,20 @@ public long zrevrank(RedisKey key, byte[] member) { public byte[] zscore(RedisKey key, byte[] member) { return stripedExecute(key, () -> getRedisSortedSet(key, true).zscore(member)); } + + @Override + public long zunionstore(RedisKey key, List<RedisKey> sourceSets, List<Double> weights, + ZAggregator aggregator) { + List<Object> keysToLock = new ArrayList<>(sourceSets); + keysToLock.add(key); + return stripedExecute(key, keysToLock, + () -> { + if (!getRegionProvider().getSlotAdvisor().isLocal(key)) { Review comment: consider making this "if" and "throw" a new method on RegionProvider. Something like ensureKeyIsLocal(key). ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT; +import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisResponse; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.Command; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class ZUnionStoreExecutor extends AbstractExecutor { + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElements = command.getProcessedCommand(); + + Iterator<byte[]> argIterator = commandElements.iterator(); + // Skip command and destination key + argIterator.next(); + argIterator.next(); + + long numKeys; + try { + numKeys = Coder.bytesToLong(argIterator.next()); + } catch (NumberFormatException nex) { + return RedisResponse.error(ERROR_SYNTAX); + } + + List<RedisKey> sourceKeys = new ArrayList<>(); + List<Double> weights = new ArrayList<>(); + ZAggregator aggregator = ZAggregator.SUM; + + while (argIterator.hasNext()) { + byte[] arg = argIterator.next(); + + if (sourceKeys.size() < numKeys) { + sourceKeys.add(new RedisKey(arg)); + continue; + } + + arg = toUpperCaseBytes(arg); + if (Arrays.equals(arg, bWEIGHTS)) { + if (weights.size() > 0) { + return RedisResponse.error(ERROR_SYNTAX); + } + for (int i = 0; i < numKeys; i++) { + if (!argIterator.hasNext()) { + return RedisResponse.error(ERROR_SYNTAX); + } + try { + weights.add(Coder.bytesToDouble(argIterator.next())); + } catch (NumberFormatException nex) { + return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT); + } + } + continue; + } + + if (Arrays.equals(arg, bAGGREGATE)) { + try { + aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next())); + } catch (IllegalArgumentException | NoSuchElementException e) { + return RedisResponse.error(ERROR_SYNTAX); + } + continue; + } + + return RedisResponse.error(ERROR_SYNTAX); + } + + if (sourceKeys.size() != numKeys) { + return RedisResponse.error(ERROR_SYNTAX); + } + + if (weights.size() == 0) { + for (int i = 0; i < numKeys; i++) { Review comment: Change this for loop to "Collections.full(weights, 1D)". It is cleaner but it also gives you a single instance of Double for each add to the ArrayList instead of a new instance which is what auto boxing gives us (at least on java 8). ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java ########## @@ -51,6 +52,17 @@ public SynchronizedStripedCoordinator(int concurrencyLevel) { } } + @Override + public <T> T execute(List<Object> stripeIds, int index, Callable<T> callable) { + if (index + 1 == stripeIds.size()) { + return execute(stripeIds.get(index), callable); + } + + synchronized (getSync(stripeIds.get(index))) { Review comment: I think this is okay but it does concern me that when the stripeIds list is long our call stack will get deep. If we end up printing the call stack this recursive call could make the stack hard to understand. Instead of using synchronization we could have the syncs array contains ReentrantLock instead of Object. Then this method could just iterate over the list locking each one; then invoke the callable; and then iterate in reverse order unlocking each one. At the time we created this class we went with synchronization on an Object since it is the most basic and using a ReentrantLock did not give us any benefit. But now it would get rid of this recursion. ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java ########## @@ -383,6 +387,44 @@ long zrevrank(byte[] member) { return null; } + long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> sourceSets, + List<Double> weights, ZAggregator aggregator) { + for (int i = 0; i < sourceSets.size(); i++) { + RedisSortedSet set = + regionProvider.getTypedRedisData(REDIS_SORTED_SET, sourceSets.get(i), false); + if (set == NULL_REDIS_SORTED_SET) { + continue; + } + double weight = weights.get(i); + + Iterator<AbstractOrderedSetEntry> scoreIterator = + set.scoreSet.getIndexRange(0, Integer.MAX_VALUE, false); + while (scoreIterator.hasNext()) { + OrderedSetEntry entry = (OrderedSetEntry) scoreIterator.next(); + OrderedSetEntry existingValue = members.get(entry.member); + if (existingValue == null) { + byte[] score; + if (weight == 1) { Review comment: since we are changing to storing a double instead of a byte[] it seems like this can just be simplified to "score = entry.score * weight". I doubt that adding conditionals is faster than just doing the multiplication. ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java ########## @@ -383,6 +387,44 @@ long zrevrank(byte[] member) { return null; } + long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> sourceSets, Review comment: Instead of two list consider having one list that contains SourceSet instances. I SourceSet would have two fields: "RedisKey key "and "double weight". This wouldn't actually produce more garbage since we are already doing that by auto-boxing the doubles into Double instances. I think this would simplify both this code and the code that creates the lists (no need to fill when no weights are provided since you can initialize the primitive weight field to 1d). This for loop could then just be "for (SourceSet sourceSet: sourceSets)". And you could add methods to SourceSet like getRedisSortedSet(RegionProvider) ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT; +import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT; +import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisResponse; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.Command; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class ZUnionStoreExecutor extends AbstractExecutor { + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElements = command.getProcessedCommand(); + + Iterator<byte[]> argIterator = commandElements.iterator(); + // Skip command and destination key + argIterator.next(); + argIterator.next(); + + long numKeys; + try { + numKeys = Coder.bytesToLong(argIterator.next()); + } catch (NumberFormatException nex) { + return RedisResponse.error(ERROR_SYNTAX); + } + + List<RedisKey> sourceKeys = new ArrayList<>(); + List<Double> weights = new ArrayList<>(); Review comment: how about validating that numKeys is not > commandElements.size()? Once you have done that then you can use numKeys safely as the initial size of these lists. ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.executor.sortedset; + +import java.util.function.BiFunction; + +/** + * Enums representing aggregation functions used in {@link ZUnionStoreExecutor} and + * {@link ZinterStoreExecutor}. + */ +public enum ZAggregator { + + SUM(Double::sum), + MIN(Math::min), + MAX(Math::max); + + private BiFunction<Double, Double, Double> function; Review comment: make function final ########## File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java ########## @@ -383,6 +387,44 @@ long zrevrank(byte[] member) { return null; } + long zunionstore(RegionProvider regionProvider, RedisKey key, List<RedisKey> sourceSets, + List<Double> weights, ZAggregator aggregator) { + for (int i = 0; i < sourceSets.size(); i++) { + RedisSortedSet set = + regionProvider.getTypedRedisData(REDIS_SORTED_SET, sourceSets.get(i), false); + if (set == NULL_REDIS_SORTED_SET) { + continue; + } + double weight = weights.get(i); + + Iterator<AbstractOrderedSetEntry> scoreIterator = Review comment: Since we just want to do something for every entry in "set", it seems like it would probably be faster to use "for (OrderedSetEntry entry: set.members.values()". Also if the destination "members" is empty (it always will be for the first source set) you have no need to call members.get while processing that entire source set. And if that is true and the weight is the default of 1d then you could use members.putAll(set.members). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
