This is an automated email from the ASF dual-hosted git repository. dschneider pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 6b79dab GEODE-8339: fix Redis Rename hang (#5501) 6b79dab is described below commit 6b79dab953089979657bc9763321765f45c0f37e Author: Ray Ingles <ring...@pivotal.io> AuthorDate: Thu Sep 10 15:36:09 2020 -0400 GEODE-8339: fix Redis Rename hang (#5501) The hang was caused by a thread holding a read lock, the rebalance waiting for that thread so it could get the write lock, and then another thread waiting to get the same read lock that is now blocked behind the write lock. This other thread needs to complete before the first thread will release its read lock so we ended up deadlocked. Now the second thread is told that the read lock is already held on its behalf so it does not try to obtain it again. Co-authored-by: Ray Ingles <ring...@vmware.com> Co-authored-by: Sarah <sab...@pivotal.io> --- .../internal/cache/AbstractBucketRegionQueue.java | 4 +-- .../apache/geode/internal/cache/BucketRegion.java | 40 ++++++++++++++++------ .../geode/internal/cache/BucketRegionQueue.java | 4 +-- .../sanctioned-geode-core-serializables.txt | 1 + .../executor/CrashAndNoRepeatDUnitTest.java | 9 ++--- .../redis/internal/data/AbstractRedisData.java | 12 +++++-- .../key/RedisKeyCommandsFunctionInvoker.java | 7 ++-- .../internal/executor/key/RenameFunction.java | 2 +- 8 files changed, 52 insertions(+), 27 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index fd8be1d..0813eab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -294,11 +294,11 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { @Override public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object 
expectedOldValue, boolean requireOldValue, long lastModified, - boolean overwriteDestroyed, boolean invokeCallbacks, boolean throwConcurrentModificaiton) + boolean overwriteDestroyed, boolean invokeCallbacks, boolean throwConcurrentModification) throws TimeoutException, CacheWriterException { try { boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, requireOldValue, - lastModified, overwriteDestroyed, invokeCallbacks, throwConcurrentModificaiton); + lastModified, overwriteDestroyed, invokeCallbacks, throwConcurrentModification); if (success) { if (logger.isDebugEnabled()) { logger.debug("Key : ----> {}", event.getKey()); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 0136de6..8662a6e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -18,6 +18,7 @@ package org.apache.geode.internal.cache; import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -517,9 +518,11 @@ public class BucketRegion extends DistributedRegion implements Bucket { @Override public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, - boolean overwriteDestroyed, boolean invokeCallbacks, boolean throwConcurrentModificaiton) + boolean overwriteDestroyed, boolean invokeCallbacks, + boolean throwConcurrentModification) throws TimeoutException, CacheWriterException { - boolean locked = lockKeysAndPrimary(event); + + boolean isLocked = lockKeysAndPrimary(event); try { if (partitionedRegion.isParallelWanEnabled()) { @@ -551,7 +554,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } return 
true; } finally { - if (locked) { + if (isLocked) { releaseLockForKeysAndPrimary(event); } } @@ -753,7 +756,11 @@ public class BucketRegion extends DistributedRegion implements Bucket { Object[] keys = getKeysToBeLocked(event); waitUntilLocked(keys); // it might wait for long time + if (wasPrimaryLockedPreviously(event)) { + return true; + } boolean lockedForPrimary = false; + try { lockedForPrimary = doLockForPrimary(false); // tryLock is false means doLockForPrimary won't return false. @@ -872,7 +879,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { * And release/remove the lockObject on the key(s) */ void releaseLockForKeysAndPrimary(EntryEventImpl event) { - doUnlockForPrimary(); + if (!wasPrimaryLockedPreviously(event)) { + doUnlockForPrimary(); + } Object[] keys = getKeysToBeLocked(event); removeAndNotifyKeys(keys); @@ -1214,6 +1223,13 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } + public static class PrimaryMoveReadLockAcquired implements Serializable { + }; + + private boolean wasPrimaryLockedPreviously(EntryEventImpl event) { + return event.getCallbackArgument() instanceof PrimaryMoveReadLockAcquired; + } + protected void distributeDestroyOperation(EntryEventImpl event) { long token = -1; DestroyOperation op = null; @@ -2093,9 +2109,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { // if GII has failed, because there is not primary. So it's safe to set these // counters to 0. 
oldMemValue = bytesInMemory.getAndSet(0); - } - - else { + } else { throw new InternalGemFireError( "Trying to clear a bucket region that was not destroyed or in initialization."); } @@ -2255,8 +2269,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { final int memoryDelta = op.computeMemoryDelta(oldSize, newSize); - if (memoryDelta == 0) + if (memoryDelta == 0) { return; + } // do the bigger one first to keep the sum > 0 updateBucketMemoryStats(memoryDelta); } @@ -2308,8 +2323,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { } public void incNumOverflowBytesOnDisk(long delta) { - if (delta == 0) + if (delta == 0) { return; + } numOverflowBytesOnDisk.addAndGet(delta); // The following could be reenabled at a future time. // I deadcoded for now to make sure I didn't have it break @@ -2346,11 +2362,13 @@ public class BucketRegion extends DistributedRegion implements Bucket { public int getSizeForEviction() { EvictionAttributes ea = getAttributes().getEvictionAttributes(); - if (ea == null) + if (ea == null) { return 0; + } EvictionAlgorithm algo = ea.getAlgorithm(); - if (!algo.isLRUHeap()) + if (!algo.isLRUHeap()) { return 0; + } EvictionAction action = ea.getAction(); return action.isLocalDestroy() ? 
getRegionMap().sizeInVM() : (int) getNumEntriesInVM(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 198ba87..f9d5ab7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -251,11 +251,11 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { @Override public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, - boolean overwriteDestroyed, boolean invokeCallbacks, boolean throwConcurrentModificaiton) + boolean overwriteDestroyed, boolean invokeCallbacks, boolean throwConcurrentModification) throws TimeoutException, CacheWriterException { try { boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, requireOldValue, - lastModified, overwriteDestroyed, invokeCallbacks, throwConcurrentModificaiton); + lastModified, overwriteDestroyed, invokeCallbacks, throwConcurrentModification); if (success) { if (getPartitionedRegion().getColocatedWith() == null) { diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index 2f14f8b..652d1b2 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -272,6 +272,7 @@ org/apache/geode/internal/admin/remote/DistributionLocatorId,true,65873901869719 
org/apache/geode/internal/admin/remote/EntryValueNodeImpl,false,fields:org/apache/geode/internal/admin/remote/EntryValueNodeImpl[],name:java/lang/String,primitive:boolean,primitiveVal:java/lang/Object,type:java/lang/String org/apache/geode/internal/cache/BucketAdvisor$SetFromMap,true,2454657854757543876,m:java/util/Map org/apache/geode/internal/cache/BucketNotFoundException,true,2898657229184289911 +org/apache/geode/internal/cache/BucketRegion$PrimaryMoveReadLockAcquired,false org/apache/geode/internal/cache/BucketRegion$SizeOp,false org/apache/geode/internal/cache/CacheClientStatus,true,-56148046466517217,_id:org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID,_memberId:java/lang/String,_numberOfConnections:int,_socketAddresses:java/util/List,_socketPorts:java/util/List org/apache/geode/internal/cache/CommitReplyException,true,-7711083075296622596,exceptions:java/util/Set diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java index 2ef20a8..00dfbbb 100644 --- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java +++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java @@ -34,11 +34,11 @@ import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisDataException; import org.apache.geode.cache.control.RebalanceFactory; import org.apache.geode.cache.control.RebalanceResults; @@ -176,7 +176,6 @@ public class CrashAndNoRepeatDUnitTest { } @Test - @Ignore("GEODE-8339") public void 
givenServerCrashesDuringRename_thenDataIsNotLost() throws Exception { AtomicBoolean running1 = new AtomicBoolean(true); AtomicBoolean running2 = new AtomicBoolean(true); @@ -245,13 +244,15 @@ public class CrashAndNoRepeatDUnitTest { try { jedisRef.get().rename(oldKey, newKey); iterationCount += 1; - } catch (JedisConnectionException ex) { + } catch (JedisConnectionException | JedisDataException ex) { if (ex.getMessage().contains("Unexpected end of stream.")) { if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) { iterationCount += 1; } } else if (ex.getMessage().contains("no such key")) { - iterationCount += 1; + if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) { + iterationCount += 1; + } } else { throw ex; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java index 2d90ad1..a07f652 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java @@ -24,7 +24,9 @@ import java.util.Objects; import org.apache.geode.DataSerializer; import org.apache.geode.InvalidDeltaException; +import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.redis.internal.delta.AddsDeltaInfo; import org.apache.geode.redis.internal.delta.AppendDeltaInfo; import org.apache.geode.redis.internal.delta.DeltaInfo; @@ -33,6 +35,9 @@ import org.apache.geode.redis.internal.delta.RemsDeltaInfo; import org.apache.geode.redis.internal.delta.TimestampDeltaInfo; public abstract class AbstractRedisData implements RedisData { + private static final BucketRegion.PrimaryMoveReadLockAcquired primaryMoveReadLockAcquired = + new BucketRegion.PrimaryMoveReadLockAcquired(); + @Override public String toString() { return 
"expirationTimestamp=" + expirationTimestamp; @@ -81,8 +86,11 @@ public abstract class AbstractRedisData implements RedisData { @Override public boolean rename(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper oldKey, ByteArrayWrapper newKey) { - region.put(newKey, this); - region.remove(oldKey); + region.put(newKey, this, primaryMoveReadLockAcquired); + try { + region.destroy(oldKey, primaryMoveReadLockAcquired); + } catch (EntryNotFoundException ignore) { + } return true; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java index 00f12af..d7a2b13 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java @@ -31,9 +31,8 @@ import org.apache.geode.redis.internal.data.RedisData; import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker; /** - * This class is used by netty redis key command executors - * to invoke a geode function that will run on a - * particular server to do the redis command. + * This class is used by netty redis key command executors to invoke a geode function that will run + * on a particular server to do the redis command. 
*/ public class RedisKeyCommandsFunctionInvoker extends RedisCommandsFunctionInvoker implements RedisKeyCommands { @@ -74,9 +73,7 @@ public class RedisKeyCommandsFunctionInvoker extends RedisCommandsFunctionInvoke @Override - @SuppressWarnings("unchecked") public boolean rename(ByteArrayWrapper oldKey, ByteArrayWrapper newKey) { - if (!region.containsKey(oldKey)) { return false; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java index 6f0c013..e8c85bd 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java @@ -51,7 +51,6 @@ public class RenameFunction implements InternalFunction { FunctionService.registerFunction(new RenameFunction(dataRegion, stripedExecutor, redisStats)); } - public RenameFunction( Region<ByteArrayWrapper, RedisData> dataRegion, StripedExecutor stripedExecutor, @@ -72,6 +71,7 @@ public class RenameFunction implements InternalFunction { }; partitionedRegion.computeWithPrimaryLocked(renameContext.getKeyToLock(), computation); + } else { Object result = acquireLockIfNeeded(renameContext); context.getResultSender().lastResult(result);