dschneider-pivotal commented on a change in pull request #6862:
URL: https://github.com/apache/geode/pull/6862#discussion_r712602778
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
##########
@@ -172,6 +176,42 @@ public RedisStats getRedisStats() {
}
}
+ /**
+ * Execute the given Callable in the context of a GemFire transaction. On
failure there is no
+ * attempt to retry.
+ */
+ public <T> T executeInTransaction(RedisKey key, Callable<T> callable) {
+ Callable<T> txWrappedCallable = getTxWrappedCallable(callable);
+ return execute(key, txWrappedCallable);
+ }
+
+ /**
+ * Execute the given Callable in the context of a GemFire transaction. On
failure there is no
+ * attempt to retry.
+ */
+ public <T> T executeInTransaction(RedisKey key, List<RedisKey> keysToLock,
Callable<T> callable) {
+ Callable<T> txWrappedCallable = getTxWrappedCallable(callable);
+ return execute(key, keysToLock, txWrappedCallable);
+ }
+
+ private <T> Callable<T> getTxWrappedCallable(Callable<T> callable) {
+ Callable<T> txWrappedCallable = () -> {
+ T result;
+ try {
+ txManager.begin();
+ result = callable.call();
+ txManager.commit();
+ } finally {
Review comment:
I think you want this finally block to complete the transaction. Move the
begin() call outside the try, and then in the finally block either commit (if
the callable succeeded) or roll back (if it threw).
It would look something like this:
boolean success = false;
txManager.begin();
try {
result = callable.call();
success = true;
} finally {
if (success) {
txManager.commit();
} else {
txManager.rollback();
}
}
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor.java
##########
@@ -144,6 +143,26 @@ public int setbit(RedisKey key, long offset, int value) {
() -> getRedisString(key, false).setbit(getRegion(), key, value,
byteIndex, bitIndex));
}
+ @Override
+ public Void mset(List<RedisKey> keys, List<byte[]> values) {
+ List<RedisKey> keysToLock = new ArrayList<>(keys.size());
+ for (RedisKey key : keys) {
+ getRegionProvider().ensureKeyIsLocal(key);
+ keysToLock.add(key);
+ }
+
+ // Pass a key in so that the bucket will be locked. Since all keys are
already guaranteed to be
+ // in the same bucket we can use any key for this.
+ return stripedExecuteInTransaction(keysToLock.get(0), keysToLock, () ->
mset0(keys, values));
+ }
+
+ private Void mset0(List<RedisKey> keys, List<byte[]> values) {
+ for (int i = 0; i < keys.size(); i++) {
+ setRedisString(getRegionProvider(), keys.get(i), values.get(i));
Review comment:
Do you also need to clear any pending expiration when mset is called on an
existing key that has an expiration scheduled? I think the easiest way to do
this is to call the "set" method with the SetOptions param set to null.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/LockingStripedCoordinator.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Implements {@link StripedCoordinator} by using {@link ReentrantLock}s
synchronization. The thread
+ * that calls execute will also be the thread that does the work. But it will
do it under
+ * synchronization. The hashCode of the stripeId is used to associate the id
with a stripe.
+ */
+public class LockingStripedCoordinator implements StripedCoordinator {
+ private static final int DEFAULT_CONCURRENCY_LEVEL = 4093; // use a prime
+ private final ReentrantLock[] locks;
+
+ public LockingStripedCoordinator() {
+ this(DEFAULT_CONCURRENCY_LEVEL);
+ }
+
+ public LockingStripedCoordinator(int concurrencyLevel) {
+ locks = new ReentrantLock[concurrencyLevel];
+ for (int i = 0; i < concurrencyLevel; i++) {
+ locks[i] = new ReentrantLock();
+ }
+ }
+
+ @Override
+ public <T> T execute(RedisKey stripeId, Callable<T> callable) {
+ Lock lock = getLock(stripeId);
+ lock.lock();
+ try {
+ return callable.call();
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public <T> T execute(List<RedisKey> stripeIds, Callable<T> callable) {
+ Lock[] locks = getLocks(stripeIds);
+ for (int i = 0; i < locks.length; i++) {
+ locks[i].lock();
+ }
+ try {
+ return callable.call();
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ for (int i = locks.length - 1; i >= 0; i--) {
+ locks[i].unlock();
+ }
+ }
+ }
+
+ private Lock getLock(RedisKey stripeId) {
+ return locks[getStripeIndex(stripeId)];
+ }
+
+ private Lock[] getLocks(List<RedisKey> stripeIds) {
+ // Sort multiple keys in a consistent manner so that locking always
happens in the same order
+ // across multiple keys.
+ stripeIds.sort(this::compareStripes);
+ Lock[] locks = new Lock[stripeIds.size()];
Review comment:
We really don't need this array of locks. Instead we could sort the stripeIds
once and then just iterate over them for both lock and unlock, calling
getLock(RedisKey) for each RedisKey in these iterations. I'm not sure which is
best, but unless you think the getLock call is expensive, it's probably better
to call it twice (instead of once) for each key to save the extra array
allocation. If you do get rid of this extra array of locks, you could then have
a more concise iteration (for example:
stripeIds.forEach(item -> getLock(item).lock())).
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/LockingStripedCoordinator.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Implements {@link StripedCoordinator} by using {@link ReentrantLock}s
synchronization. The thread
+ * that calls execute will also be the thread that does the work. But it will
do it under
+ * synchronization. The hashCode of the stripeId is used to associate the id
with a stripe.
+ */
+public class LockingStripedCoordinator implements StripedCoordinator {
+ private static final int DEFAULT_CONCURRENCY_LEVEL = 4093; // use a prime
+ private final ReentrantLock[] locks;
+
+ public LockingStripedCoordinator() {
+ this(DEFAULT_CONCURRENCY_LEVEL);
+ }
+
+ public LockingStripedCoordinator(int concurrencyLevel) {
+ locks = new ReentrantLock[concurrencyLevel];
+ for (int i = 0; i < concurrencyLevel; i++) {
+ locks[i] = new ReentrantLock();
+ }
+ }
+
+ @Override
+ public <T> T execute(RedisKey stripeId, Callable<T> callable) {
+ Lock lock = getLock(stripeId);
+ lock.lock();
+ try {
+ return callable.call();
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public <T> T execute(List<RedisKey> stripeIds, Callable<T> callable) {
+ Lock[] locks = getLocks(stripeIds);
+ for (int i = 0; i < locks.length; i++) {
+ locks[i].lock();
+ }
+ try {
+ return callable.call();
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ for (int i = locks.length - 1; i >= 0; i--) {
Review comment:
It's okay to unlock in reverse order, but it's not needed. We need to acquire
the locks in order, but once we hold all of them we can unlock in any order
without getting into trouble.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]