This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6b79dab  GEODE-8339: fix Redis Rename hang (#5501)
6b79dab is described below

commit 6b79dab953089979657bc9763321765f45c0f37e
Author: Ray Ingles <ring...@pivotal.io>
AuthorDate: Thu Sep 10 15:36:09 2020 -0400

    GEODE-8339: fix Redis Rename hang (#5501)
    
    The hang was caused by a thread holding a read lock, the rebalance waiting 
for that thread so it could get the write lock, and then another thread waiting 
to get the same read lock that is now blocked behind the write lock. This other 
thread needs to complete before the first thread will release its read lock so 
we ended up deadlocked.
    Now the second thread is told that the read lock is already held on its 
behalf so it does not try to obtain it again.
    
    Co-authored-by: Ray Ingles <ring...@vmware.com>
    Co-authored-by: Sarah <sab...@pivotal.io>
---
 .../internal/cache/AbstractBucketRegionQueue.java  |  4 +--
 .../apache/geode/internal/cache/BucketRegion.java  | 40 ++++++++++++++++------
 .../geode/internal/cache/BucketRegionQueue.java    |  4 +--
 .../sanctioned-geode-core-serializables.txt        |  1 +
 .../executor/CrashAndNoRepeatDUnitTest.java        |  9 ++---
 .../redis/internal/data/AbstractRedisData.java     | 12 +++++--
 .../key/RedisKeyCommandsFunctionInvoker.java       |  7 ++--
 .../internal/executor/key/RenameFunction.java      |  2 +-
 8 files changed, 52 insertions(+), 27 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index fd8be1d..0813eab 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -294,11 +294,11 @@ public abstract class AbstractBucketRegionQueue extends 
BucketRegion {
   @Override
   public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
       Object expectedOldValue, boolean requireOldValue, long lastModified,
-      boolean overwriteDestroyed, boolean invokeCallbacks, boolean 
throwConcurrentModificaiton)
+      boolean overwriteDestroyed, boolean invokeCallbacks, boolean 
throwConcurrentModification)
       throws TimeoutException, CacheWriterException {
     try {
       boolean success = super.virtualPut(event, ifNew, ifOld, 
expectedOldValue, requireOldValue,
-          lastModified, overwriteDestroyed, invokeCallbacks, 
throwConcurrentModificaiton);
+          lastModified, overwriteDestroyed, invokeCallbacks, 
throwConcurrentModification);
       if (success) {
         if (logger.isDebugEnabled()) {
           logger.debug("Key : ----> {}", event.getKey());
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 0136de6..8662a6e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal.cache;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -517,9 +518,11 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   @Override
   public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
       Object expectedOldValue, boolean requireOldValue, long lastModified,
-      boolean overwriteDestroyed, boolean invokeCallbacks, boolean 
throwConcurrentModificaiton)
+      boolean overwriteDestroyed, boolean invokeCallbacks,
+      boolean throwConcurrentModification)
       throws TimeoutException, CacheWriterException {
-    boolean locked = lockKeysAndPrimary(event);
+
+    boolean isLocked = lockKeysAndPrimary(event);
 
     try {
       if (partitionedRegion.isParallelWanEnabled()) {
@@ -551,7 +554,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
       }
       return true;
     } finally {
-      if (locked) {
+      if (isLocked) {
         releaseLockForKeysAndPrimary(event);
       }
     }
@@ -753,7 +756,11 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     Object[] keys = getKeysToBeLocked(event);
     waitUntilLocked(keys); // it might wait for long time
 
+    if (wasPrimaryLockedPreviously(event)) {
+      return true;
+    }
     boolean lockedForPrimary = false;
+
     try {
       lockedForPrimary = doLockForPrimary(false);
       // tryLock is false means doLockForPrimary won't return false.
@@ -872,7 +879,9 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
    * And release/remove the lockObject on the key(s)
    */
   void releaseLockForKeysAndPrimary(EntryEventImpl event) {
-    doUnlockForPrimary();
+    if (!wasPrimaryLockedPreviously(event)) {
+      doUnlockForPrimary();
+    }
 
     Object[] keys = getKeysToBeLocked(event);
     removeAndNotifyKeys(keys);
@@ -1214,6 +1223,13 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     }
   }
 
+  public static class PrimaryMoveReadLockAcquired implements Serializable {
+  };
+
+  private boolean wasPrimaryLockedPreviously(EntryEventImpl event) {
+    return event.getCallbackArgument() instanceof PrimaryMoveReadLockAcquired;
+  }
+
   protected void distributeDestroyOperation(EntryEventImpl event) {
     long token = -1;
     DestroyOperation op = null;
@@ -2093,9 +2109,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
       // if GII has failed, because there is not primary. So it's safe to set 
these
       // counters to 0.
       oldMemValue = bytesInMemory.getAndSet(0);
-    }
-
-    else {
+    } else {
       throw new InternalGemFireError(
           "Trying to clear a bucket region that was not destroyed or in 
initialization.");
     }
@@ -2255,8 +2269,9 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
 
     final int memoryDelta = op.computeMemoryDelta(oldSize, newSize);
 
-    if (memoryDelta == 0)
+    if (memoryDelta == 0) {
       return;
+    }
     // do the bigger one first to keep the sum > 0
     updateBucketMemoryStats(memoryDelta);
   }
@@ -2308,8 +2323,9 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   }
 
   public void incNumOverflowBytesOnDisk(long delta) {
-    if (delta == 0)
+    if (delta == 0) {
       return;
+    }
     numOverflowBytesOnDisk.addAndGet(delta);
     // The following could be reenabled at a future time.
     // I deadcoded for now to make sure I didn't have it break
@@ -2346,11 +2362,13 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
 
   public int getSizeForEviction() {
     EvictionAttributes ea = getAttributes().getEvictionAttributes();
-    if (ea == null)
+    if (ea == null) {
       return 0;
+    }
     EvictionAlgorithm algo = ea.getAlgorithm();
-    if (!algo.isLRUHeap())
+    if (!algo.isLRUHeap()) {
       return 0;
+    }
     EvictionAction action = ea.getAction();
     return action.isLocalDestroy() ? getRegionMap().sizeInVM() : (int) 
getNumEntriesInVM();
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 198ba87..f9d5ab7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -251,11 +251,11 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
   @Override
   public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
       Object expectedOldValue, boolean requireOldValue, long lastModified,
-      boolean overwriteDestroyed, boolean invokeCallbacks, boolean 
throwConcurrentModificaiton)
+      boolean overwriteDestroyed, boolean invokeCallbacks, boolean 
throwConcurrentModification)
       throws TimeoutException, CacheWriterException {
     try {
       boolean success = super.virtualPut(event, ifNew, ifOld, 
expectedOldValue, requireOldValue,
-          lastModified, overwriteDestroyed, invokeCallbacks, 
throwConcurrentModificaiton);
+          lastModified, overwriteDestroyed, invokeCallbacks, 
throwConcurrentModification);
 
       if (success) {
         if (getPartitionedRegion().getColocatedWith() == null) {
diff --git 
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 2f14f8b..652d1b2 100644
--- 
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -272,6 +272,7 @@ 
org/apache/geode/internal/admin/remote/DistributionLocatorId,true,65873901869719
 
org/apache/geode/internal/admin/remote/EntryValueNodeImpl,false,fields:org/apache/geode/internal/admin/remote/EntryValueNodeImpl[],name:java/lang/String,primitive:boolean,primitiveVal:java/lang/Object,type:java/lang/String
 
org/apache/geode/internal/cache/BucketAdvisor$SetFromMap,true,2454657854757543876,m:java/util/Map
 
org/apache/geode/internal/cache/BucketNotFoundException,true,2898657229184289911
+org/apache/geode/internal/cache/BucketRegion$PrimaryMoveReadLockAcquired,false
 org/apache/geode/internal/cache/BucketRegion$SizeOp,false
 
org/apache/geode/internal/cache/CacheClientStatus,true,-56148046466517217,_id:org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID,_memberId:java/lang/String,_numberOfConnections:int,_socketAddresses:java/util/List,_socketPorts:java/util/List
 
org/apache/geode/internal/cache/CommitReplyException,true,-7711083075296622596,exceptions:java/util/Set
diff --git 
a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
 
b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
index 2ef20a8..00dfbbb 100644
--- 
a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
+++ 
b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
@@ -34,11 +34,11 @@ import org.apache.logging.log4j.Logger;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisDataException;
 
 import org.apache.geode.cache.control.RebalanceFactory;
 import org.apache.geode.cache.control.RebalanceResults;
@@ -176,7 +176,6 @@ public class CrashAndNoRepeatDUnitTest {
   }
 
   @Test
-  @Ignore("GEODE-8339")
   public void givenServerCrashesDuringRename_thenDataIsNotLost() throws 
Exception {
     AtomicBoolean running1 = new AtomicBoolean(true);
     AtomicBoolean running2 = new AtomicBoolean(true);
@@ -245,13 +244,15 @@ public class CrashAndNoRepeatDUnitTest {
       try {
         jedisRef.get().rename(oldKey, newKey);
         iterationCount += 1;
-      } catch (JedisConnectionException ex) {
+      } catch (JedisConnectionException | JedisDataException ex) {
         if (ex.getMessage().contains("Unexpected end of stream.")) {
           if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) {
             iterationCount += 1;
           }
         } else if (ex.getMessage().contains("no such key")) {
-          iterationCount += 1;
+          if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) {
+            iterationCount += 1;
+          }
         } else {
           throw ex;
         }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index 2d90ad1..a07f652 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -24,7 +24,9 @@ import java.util.Objects;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.InvalidDeltaException;
+import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
 import org.apache.geode.redis.internal.delta.AppendDeltaInfo;
 import org.apache.geode.redis.internal.delta.DeltaInfo;
@@ -33,6 +35,9 @@ import org.apache.geode.redis.internal.delta.RemsDeltaInfo;
 import org.apache.geode.redis.internal.delta.TimestampDeltaInfo;
 
 public abstract class AbstractRedisData implements RedisData {
+  private static final BucketRegion.PrimaryMoveReadLockAcquired 
primaryMoveReadLockAcquired =
+      new BucketRegion.PrimaryMoveReadLockAcquired();
+
   @Override
   public String toString() {
     return "expirationTimestamp=" + expirationTimestamp;
@@ -81,8 +86,11 @@ public abstract class AbstractRedisData implements RedisData 
{
   @Override
   public boolean rename(Region<ByteArrayWrapper, RedisData> region, 
ByteArrayWrapper oldKey,
       ByteArrayWrapper newKey) {
-    region.put(newKey, this);
-    region.remove(oldKey);
+    region.put(newKey, this, primaryMoveReadLockAcquired);
+    try {
+      region.destroy(oldKey, primaryMoveReadLockAcquired);
+    } catch (EntryNotFoundException ignore) {
+    }
     return true;
   }
 
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
index 00f12af..d7a2b13 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
@@ -31,9 +31,8 @@ import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 
 /**
- * This class is used by netty redis key command executors
- * to invoke a geode function that will run on a
- * particular server to do the redis command.
+ * This class is used by netty redis key command executors to invoke a geode 
function that will run
+ * on a particular server to do the redis command.
  */
 public class RedisKeyCommandsFunctionInvoker extends 
RedisCommandsFunctionInvoker
     implements RedisKeyCommands {
@@ -74,9 +73,7 @@ public class RedisKeyCommandsFunctionInvoker extends 
RedisCommandsFunctionInvoke
 
 
   @Override
-  @SuppressWarnings("unchecked")
   public boolean rename(ByteArrayWrapper oldKey, ByteArrayWrapper newKey) {
-
     if (!region.containsKey(oldKey)) {
       return false;
     }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
index 6f0c013..e8c85bd 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
@@ -51,7 +51,6 @@ public class RenameFunction implements InternalFunction {
     FunctionService.registerFunction(new RenameFunction(dataRegion, 
stripedExecutor, redisStats));
   }
 
-
   public RenameFunction(
       Region<ByteArrayWrapper, RedisData> dataRegion,
       StripedExecutor stripedExecutor,
@@ -72,6 +71,7 @@ public class RenameFunction implements InternalFunction {
       };
 
       partitionedRegion.computeWithPrimaryLocked(renameContext.getKeyToLock(), 
computation);
+
     } else {
       Object result = acquireLockIfNeeded(renameContext);
       context.getResultSender().lastResult(result);

Reply via email to