dschneider-pivotal commented on a change in pull request #6862:
URL: https://github.com/apache/geode/pull/6862#discussion_r712602778



##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
##########
@@ -172,6 +176,42 @@ public RedisStats getRedisStats() {
     }
   }
 
+  /**
+   * Execute the given Callable in the context of a GemFire transaction. On 
failure there is no
+   * attempt to retry.
+   */
+  public <T> T executeInTransaction(RedisKey key, Callable<T> callable) {
+    Callable<T> txWrappedCallable = getTxWrappedCallable(callable);
+    return execute(key, txWrappedCallable);
+  }
+
+  /**
+   * Execute the given Callable in the context of a GemFire transaction. On 
failure there is no
+   * attempt to retry.
+   */
+  public <T> T executeInTransaction(RedisKey key, List<RedisKey> keysToLock, 
Callable<T> callable) {
+    Callable<T> txWrappedCallable = getTxWrappedCallable(callable);
+    return execute(key, keysToLock, txWrappedCallable);
+  }
+
+  private <T> Callable<T> getTxWrappedCallable(Callable<T> callable) {
+    Callable<T> txWrappedCallable = () -> {
+      T result;
+      try {
+        txManager.begin();
+        result = callable.call();
+        txManager.commit();
+      } finally {

Review comment:
       I think you want this finally block to complete the tx. So move the 
begin outside the try and then in the finally block you either commit or 
rollback.
   It would look something like this:
   boolean success = false;
   txManager.begin();
   try {
     result = callable.call();
     success = true;
   } finally {
     if (success) {
       txManager.commit();
     } else {
       txManager.rollback();
     }
   }

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor.java
##########
@@ -144,6 +143,26 @@ public int setbit(RedisKey key, long offset, int value) {
         () -> getRedisString(key, false).setbit(getRegion(), key, value, 
byteIndex, bitIndex));
   }
 
+  @Override
+  public Void mset(List<RedisKey> keys, List<byte[]> values) {
+    List<RedisKey> keysToLock = new ArrayList<>(keys.size());
+    for (RedisKey key : keys) {
+      getRegionProvider().ensureKeyIsLocal(key);
+      keysToLock.add(key);
+    }
+
+    // Pass a key in so that the bucket will be locked. Since all keys are 
already guaranteed to be
+    // in the same bucket we can use any key for this.
+    return stripedExecuteInTransaction(keysToLock.get(0), keysToLock, () -> 
mset0(keys, values));
+  }
+
+  private Void mset0(List<RedisKey> keys, List<byte[]> values) {
+    for (int i = 0; i < keys.size(); i++) {
+      setRedisString(getRegionProvider(), keys.get(i), values.get(i));

Review comment:
       do you need to also clear any pending expiration in the case of mset 
being called on an existing key that has expiration scheduled? I think the 
easiest way to do this is call the "set" method with SetOptions param set to 
"null".

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/LockingStripedCoordinator.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Implements {@link StripedCoordinator} by using {@link ReentrantLock}s 
synchronization. The thread
+ * that calls execute will also be the thread that does the work. But it will 
do it under
+ * synchronization. The hashCode of the stripeId is used to associate the id 
with a stripe.
+ */
+public class LockingStripedCoordinator implements StripedCoordinator {
+  private static final int DEFAULT_CONCURRENCY_LEVEL = 4093; // use a prime
+  private final ReentrantLock[] locks;
+
+  public LockingStripedCoordinator() {
+    this(DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  public LockingStripedCoordinator(int concurrencyLevel) {
+    locks = new ReentrantLock[concurrencyLevel];
+    for (int i = 0; i < concurrencyLevel; i++) {
+      locks[i] = new ReentrantLock();
+    }
+  }
+
+  @Override
+  public <T> T execute(RedisKey stripeId, Callable<T> callable) {
+    Lock lock = getLock(stripeId);
+    lock.lock();
+    try {
+      return callable.call();
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public <T> T execute(List<RedisKey> stripeIds, Callable<T> callable) {
+    Lock[] locks = getLocks(stripeIds);
+    for (int i = 0; i < locks.length; i++) {
+      locks[i].lock();
+    }
+    try {
+      return callable.call();
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      for (int i = locks.length - 1; i >= 0; i--) {
+        locks[i].unlock();
+      }
+    }
+  }
+
+  private Lock getLock(RedisKey stripeId) {
+    return locks[getStripeIndex(stripeId)];
+  }
+
+  private Lock[] getLocks(List<RedisKey> stripeIds) {
+    // Sort multiple keys in a consistent manner so that locking always 
happens in the same order
+    // across multiple keys.
+    stripeIds.sort(this::compareStripes);
+    Lock[] locks = new Lock[stripeIds.size()];

Review comment:
       we really don't need this array of locks. Instead we could have sorted 
the stripeIds once and then just iterated it for both lock and unlock. For each 
RedisKey we would call getLock(RedisKey) in these iterations. I'm not sure 
which is best but unless you think the getLock call is expensive its probably 
better to call it twice (instead of once) for each key to save the extra 
ArrayList allocation. If you do get rid of this extra array of locks you could 
then have a more concise iteration (for example: stripeIds.foreach(item -> 
getLock(item).lock())

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/LockingStripedCoordinator.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Implements {@link StripedCoordinator} by using {@link ReentrantLock}s 
synchronization. The thread
+ * that calls execute will also be the thread that does the work. But it will 
do it under
+ * synchronization. The hashCode of the stripeId is used to associate the id 
with a stripe.
+ */
+public class LockingStripedCoordinator implements StripedCoordinator {
+  private static final int DEFAULT_CONCURRENCY_LEVEL = 4093; // use a prime
+  private final ReentrantLock[] locks;
+
+  public LockingStripedCoordinator() {
+    this(DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  public LockingStripedCoordinator(int concurrencyLevel) {
+    locks = new ReentrantLock[concurrencyLevel];
+    for (int i = 0; i < concurrencyLevel; i++) {
+      locks[i] = new ReentrantLock();
+    }
+  }
+
+  @Override
+  public <T> T execute(RedisKey stripeId, Callable<T> callable) {
+    Lock lock = getLock(stripeId);
+    lock.lock();
+    try {
+      return callable.call();
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public <T> T execute(List<RedisKey> stripeIds, Callable<T> callable) {
+    Lock[] locks = getLocks(stripeIds);
+    for (int i = 0; i < locks.length; i++) {
+      locks[i].lock();
+    }
+    try {
+      return callable.call();
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      for (int i = locks.length - 1; i >= 0; i--) {

Review comment:
       its okay to unlock in reverse order but not needed. For lock we need to 
do it inorder but once we hold all the locks we can unlock in any order without 
getting in trouble.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to