GEODE-599: fix clear with concurrent writes

The cache modification lock was being released before the operation was
distributed to other members. This left a small window in which an operation
from another thread could update the region before the other members had
received notification of the first operation (i.e. down-leveled).

This closes #232


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3ea7dde0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3ea7dde0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3ea7dde0

Branch: refs/heads/feature/GEODE-1781
Commit: 3ea7dde029ced39ef33cd03a7bfdadd6bce84e9d
Parents: 89d4270
Author: Scott Jewell <sjew...@pivotal.io>
Authored: Fri Jul 8 15:52:03 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Tue Aug 16 17:25:24 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       | 917 ++++++++++---------
 .../gemfire/internal/cache/BucketRegion.java    |  38 +-
 .../cache/DistributedClearOperation.java        |   2 +-
 .../internal/cache/DistributedRegion.java       |  46 +-
 .../gemfire/internal/cache/LocalRegion.java     | 493 +++++-----
 .../gemfire/internal/cache/ProxyRegionMap.java  |   6 +
 .../gemfire/internal/cache/RegionMap.java       |   4 +
 .../internal/cache/StateFlushOperation.java     |   5 +
 .../cache/versions/RegionVersionVector.java     |  12 +-
 .../internal/cache/ARMLockTestHookAdapter.java  |  38 +
 .../cache/ClearRvvLockingDUnitTest.java         | 667 ++++++++++++++
 11 files changed, 1529 insertions(+), 699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index faa8580..81e4d9f 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -41,7 +41,6 @@ import 
com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
 import com.gemstone.gemfire.internal.cache.versions.*;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
 import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter;
-import com.gemstone.gemfire.internal.concurrent.MapResult;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
@@ -54,6 +53,7 @@ import 
com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 import 
com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+
 import org.apache.logging.log4j.Logger;
 
 import java.util.*;
@@ -1031,21 +1031,19 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     
     boolean retry = true;
     
-    while (retry) {
-      retry = false;
-
-      boolean opCompleted = false;
-      boolean doPart3 = false;
-
-      // We need to acquire the region entry while holding the lock to avoid 
#45620.
-      // However, we also want to release the lock before distribution to 
prevent
-      // potential deadlocks.  The outer try/finally ensures that the lock 
will be
-      // released without fail.  I'm avoiding indenting just to preserve the 
ability
-      // to track diffs since the code is fairly complex.
-      boolean doUnlock = true;
-      lockForCacheModification(owner, event);
-      try {
+    lockForCacheModification(owner, event);
+    try {
+
+      while (retry) {
+        retry = false;
 
+        boolean opCompleted = false;
+        boolean doPart3 = false;
+
+        // We need to acquire the region entry while holding the lock to avoid 
#45620.
+        // The outer try/finally ensures that the lock will be released 
without fail.  
+        // I'm avoiding indenting just to preserve the ability
+        // to track diffs since the code is fairly complex.
 
         RegionEntry re = getOrCreateRegionEntry(owner, event, 
Token.REMOVED_PHASE1, null, true, true); 
         RegionEntry tombstone = null;
@@ -1498,9 +1496,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
           return opCompleted;
         }
         finally {
-          releaseCacheModificationLock(owner, event);
-          doUnlock = false;
-
           try {
             // If concurrency conflict is there and event contains gateway 
version tag then
             // do NOT distribute.
@@ -1523,12 +1518,10 @@ public abstract class AbstractRegionMap implements 
RegionMap {
           }
         }
 
-      } finally { // failsafe on the read lock...see comment above
-        if (doUnlock) {
-          releaseCacheModificationLock(owner, event);
-        }
-      }
-    } // retry loop
+      } // retry loop
+    } finally { // failsafe on the read lock...see comment above
+      releaseCacheModificationLock(owner, event);
+    }
     return false;
   }
 
@@ -1840,10 +1833,10 @@ public abstract class AbstractRegionMap implements 
RegionMap {
   
   public final boolean invalidate(EntryEventImpl event,
       boolean invokeCallbacks, boolean forceNewEntry, boolean forceCallbacks)
-      throws EntryNotFoundException
+          throws EntryNotFoundException
   {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    
+
     final LocalRegion owner = _getOwner();
     if (owner == null) {
       // "fix" for bug 32440
@@ -1857,377 +1850,381 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     DiskRegion dr = owner.getDiskRegion();
     boolean ownerIsInitialized = owner.isInitialized();
     try {
-    // Fix for Bug #44431. We do NOT want to update the region and wait
-    // later for index INIT as region.clear() can cause inconsistency if
-    // happened in parallel as it also does index INIT.
-    IndexManager oqlIndexManager = owner.getIndexManager() ; 
-    if (oqlIndexManager != null) {
-      oqlIndexManager.waitForIndexInit();
-    }
-    lockForCacheModification(owner, event);
-    try {
-      if (forceNewEntry || forceCallbacks) {
-        boolean opCompleted = false;
-        RegionEntry newRe = getEntryFactory().createEntry(owner, 
event.getKey(),
-            Token.REMOVED_PHASE1);
-          synchronized (newRe) {
-            try {
-              RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
-              
-              while (!opCompleted && oldRe != null) {
-                synchronized (oldRe) {
-                  // if the RE is in phase 2 of removal, it will really be 
removed
-                  // from the map.  Otherwise, we can use it here and the 
thread
-                  // that is destroying the RE will see the invalidation and 
not
-                  // proceed to phase 2 of removal.
-                  if (oldRe.isRemovedPhase2()) {
-                    oldRe = putEntryIfAbsent(event.getKey(), newRe);
-                    if (oldRe != null) {
-                      owner.getCachePerfStats().incRetries();
-                    }
-                  } else {
-                    opCompleted = true;
-                    event.setRegionEntry(oldRe);
-                    if (oldRe.isDestroyed()) {
-                      if (isDebugEnabled) {
-                        logger.debug("mapInvalidate: Found DESTROYED token, 
not invalidated; key={}", event.getKey());
-                      }
-                    } else if (oldRe.isInvalid()) {
-                    
-                      // was already invalid, do not invoke listeners or 
increment stat
-                      if (isDebugEnabled) {
-                        logger.debug("mapInvalidate: Entry already invalid: 
'{}'", event.getKey());
-                      }
-                      processVersionTag(oldRe, event);
-                      try {
-                        oldRe.setValue(owner, oldRe.getValueInVM(owner)); // 
OFFHEAP noop setting an already invalid to invalid; No need to call 
prepareValueForCache since it is an invalid token.
-                      } catch (RegionClearedException e) {
-                        // that's okay - when writing an invalid into a disk, 
the
-                        // region has been cleared (including this token)
+      // Fix for Bug #44431. We do NOT want to update the region and wait
+      // later for index INIT as region.clear() can cause inconsistency if
+      // happened in parallel as it also does index INIT.
+      IndexManager oqlIndexManager = owner.getIndexManager() ; 
+      if (oqlIndexManager != null) {
+        oqlIndexManager.waitForIndexInit();
+      }
+      lockForCacheModification(owner, event);
+      try {
+        try {
+          if (forceNewEntry || forceCallbacks) {
+            boolean opCompleted = false;
+            RegionEntry newRe = getEntryFactory().createEntry(owner, 
event.getKey(),
+                Token.REMOVED_PHASE1);
+            synchronized (newRe) {
+              try {
+                RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
+
+                while (!opCompleted && oldRe != null) {
+                  synchronized (oldRe) {
+                    // if the RE is in phase 2 of removal, it will really be 
removed
+                    // from the map.  Otherwise, we can use it here and the 
thread
+                    // that is destroying the RE will see the invalidation and 
not
+                    // proceed to phase 2 of removal.
+                    if (oldRe.isRemovedPhase2()) {
+                      oldRe = putEntryIfAbsent(event.getKey(), newRe);
+                      if (oldRe != null) {
+                        owner.getCachePerfStats().incRetries();
                       }
                     } else {
-                      owner.serverInvalidate(event);
-                      if (owner.concurrencyChecksEnabled && 
event.noVersionReceivedFromServer()) {
-                        // server did not perform the invalidation, so don't 
leave an invalid
-                        // entry here
-                        return false;
-                      }
-                      final int oldSize = 
owner.calculateRegionEntryValueSize(oldRe);
-                      //added for cq which needs old value. rdubey
-                      FilterProfile fp = owner.getFilterProfile();
-                      if (!oldRe.isRemoved() && 
-                          (fp != null && fp.getCqCount() > 0)) {
-                        
-                        Object oldValue = oldRe.getValueInVM(owner); // 
OFFHEAP EntryEventImpl oldValue
-                        
-                        // this will not fault in the value.
-                        if (oldValue == Token.NOT_AVAILABLE){
-                          
event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner));
-                        } else {
-                          event.setOldValue(oldValue);
+                      opCompleted = true;
+                      event.setRegionEntry(oldRe);
+                      if (oldRe.isDestroyed()) {
+                        if (isDebugEnabled) {
+                          logger.debug("mapInvalidate: Found DESTROYED token, 
not invalidated; key={}", event.getKey());
                         }
-                      }
-                      boolean isCreate = false;
-                      try {
-                        if (oldRe.isRemoved()) {
-                          processVersionTag(oldRe, event);
-                          event.putNewEntry(owner, oldRe);
-                          EntryLogger.logInvalidate(event);
-                          owner.recordEvent(event);
-                          if (!oldRe.isTombstone()) {
-                            owner.updateSizeOnPut(event.getKey(), oldSize, 
event.getNewValueBucketSize());
+                      } else if (oldRe.isInvalid()) {
+
+                        // was already invalid, do not invoke listeners or 
increment stat
+                        if (isDebugEnabled) {
+                          logger.debug("mapInvalidate: Entry already invalid: 
'{}'", event.getKey());
+                        }
+                        processVersionTag(oldRe, event);
+                        try {
+                          oldRe.setValue(owner, oldRe.getValueInVM(owner)); // 
OFFHEAP noop setting an already invalid to invalid; No need to call 
prepareValueForCache since it is an invalid token.
+                        } catch (RegionClearedException e) {
+                          // that's okay - when writing an invalid into a 
disk, the
+                          // region has been cleared (including this token)
+                        }
+                      } else {
+                        owner.serverInvalidate(event);
+                        if (owner.concurrencyChecksEnabled && 
event.noVersionReceivedFromServer()) {
+                          // server did not perform the invalidation, so don't 
leave an invalid
+                          // entry here
+                          return false;
+                        }
+                        final int oldSize = 
owner.calculateRegionEntryValueSize(oldRe);
+                        //added for cq which needs old value. rdubey
+                        FilterProfile fp = owner.getFilterProfile();
+                        if (!oldRe.isRemoved() && 
+                            (fp != null && fp.getCqCount() > 0)) {
+
+                          Object oldValue = oldRe.getValueInVM(owner); // 
OFFHEAP EntryEventImpl oldValue
+
+                          // this will not fault in the value.
+                          if (oldValue == Token.NOT_AVAILABLE){
+                            
event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner));
                           } else {
-                            owner.updateSizeOnCreate(event.getKey(), 
event.getNewValueBucketSize());
-                            isCreate = true;
+                            event.setOldValue(oldValue);
                           }
-                        } else {
-                          processVersionTag(oldRe, event);
-                          event.putExistingEntry(owner, oldRe);
+                        }
+                        boolean isCreate = false;
+                        try {
+                          if (oldRe.isRemoved()) {
+                            processVersionTag(oldRe, event);
+                            event.putNewEntry(owner, oldRe);
+                            EntryLogger.logInvalidate(event);
+                            owner.recordEvent(event);
+                            if (!oldRe.isTombstone()) {
+                              owner.updateSizeOnPut(event.getKey(), oldSize, 
event.getNewValueBucketSize());
+                            } else {
+                              owner.updateSizeOnCreate(event.getKey(), 
event.getNewValueBucketSize());
+                              isCreate = true;
+                            }
+                          } else {
+                            processVersionTag(oldRe, event);
+                            event.putExistingEntry(owner, oldRe);
+                            EntryLogger.logInvalidate(event);
+                            owner.recordEvent(event);
+                            owner.updateSizeOnPut(event.getKey(), oldSize, 
event.getNewValueBucketSize());
+                          }
+                        }
+                        catch (RegionClearedException e) {
+                          // generate versionTag for the event
                           EntryLogger.logInvalidate(event);
                           owner.recordEvent(event);
-                          owner.updateSizeOnPut(event.getKey(), oldSize, 
event.getNewValueBucketSize());
+                          clearOccured = true;
                         }
+                        owner.basicInvalidatePart2(oldRe, event,
+                            clearOccured /* conflict with clear */, 
invokeCallbacks);
+                        if (!clearOccured) {
+                          if (isCreate) {
+                            lruEntryCreate(oldRe);
+                          } else {
+                            lruEntryUpdate(oldRe);
+                          }
+                        }                   
+                        didInvalidate = true;
+                        invalidatedRe = oldRe;
                       }
-                      catch (RegionClearedException e) {
-                        // generate versionTag for the event
-                        EntryLogger.logInvalidate(event);
-                        owner.recordEvent(event);
-                        clearOccured = true;
-                      }
-                      owner.basicInvalidatePart2(oldRe, event,
-                          clearOccured /* conflict with clear */, 
invokeCallbacks);
-                      if (!clearOccured) {
-                        if (isCreate) {
-                          lruEntryCreate(oldRe);
-                        } else {
-                          lruEntryUpdate(oldRe);
-                        }
-                      }                   
-                      didInvalidate = true;
-                      invalidatedRe = oldRe;
+                    }
+                  } // synchronized oldRe
+                } // while oldRe exists
+
+                if (!opCompleted) {
+                  if (forceNewEntry && event.isFromServer()) {
+                    // don't invoke listeners - we didn't force new entries for
+                    // CCU invalidations before 7.0, and listeners don't care
+                    if (!FORCE_INVALIDATE_EVENT) {
+                      event.inhibitCacheListenerNotification(true);
                     }
                   }
-                } // synchronized oldRe
-              } // while oldRe exists
-              
-              if (!opCompleted) {
-                if (forceNewEntry && event.isFromServer()) {
-                  // don't invoke listeners - we didn't force new entries for
-                  // CCU invalidations before 7.0, and listeners don't care
-                  if (!FORCE_INVALIDATE_EVENT) {
-                    event.inhibitCacheListenerNotification(true);
+                  event.setRegionEntry(newRe);
+                  owner.serverInvalidate(event);
+                  if (!forceNewEntry && event.noVersionReceivedFromServer()) {
+                    // server did not perform the invalidation, so don't leave 
an invalid
+                    // entry here
+                    return false;
                   }
-                }
-                event.setRegionEntry(newRe);
-                owner.serverInvalidate(event);
-                if (!forceNewEntry && event.noVersionReceivedFromServer()) {
-                  // server did not perform the invalidation, so don't leave 
an invalid
-                  // entry here
-                  return false;
-                }
-                try {
-                  ownerIsInitialized = owner.isInitialized();
-                  if (!ownerIsInitialized && 
owner.getDataPolicy().withReplication()) {
-                    final int oldSize = 
owner.calculateRegionEntryValueSize(newRe);
-                    invalidateEntry(event, newRe, oldSize);
+                  try {
+                    ownerIsInitialized = owner.isInitialized();
+                    if (!ownerIsInitialized && 
owner.getDataPolicy().withReplication()) {
+                      final int oldSize = 
owner.calculateRegionEntryValueSize(newRe);
+                      invalidateEntry(event, newRe, oldSize);
+                    }
+                    else {
+                      invalidateNewEntry(event, owner, newRe);
+                    }
                   }
-                  else {
-                    invalidateNewEntry(event, owner, newRe);
+                  catch (RegionClearedException e) {
+                    // TODO: deltaGII: do we even need RegionClearedException?
+                    // generate versionTag for the event
+                    owner.recordEvent(event);
+                    clearOccured = true;
                   }
+                  owner.basicInvalidatePart2(newRe, event, clearOccured 
/*conflict with clear*/, invokeCallbacks);
+                  if (!clearOccured) {
+                    lruEntryCreate(newRe);
+                    incEntryCount(1);
+                  }            
+                  opCompleted = true;
+                  didInvalidate = true;
+                  invalidatedRe = newRe;
+                  // Don't leave an entry in the cache, if we
+                  // just wanted to force the distribution and events
+                  // for this invalidate
+                  if (!forceNewEntry) {
+                    removeEntry(event.getKey(), newRe, false);
+                  } 
+                } // !opCompleted
+              } catch (ConcurrentCacheModificationException ccme) {
+                VersionTag tag = event.getVersionTag();
+                if (tag != null && tag.isTimeStampUpdated()) {
+                  // Notify gateways of new time-stamp.
+                  owner.notifyTimestampsToGateways(event);
                 }
-                catch (RegionClearedException e) {
-                  // TODO: deltaGII: do we even need RegionClearedException?
-                  // generate versionTag for the event
-                  owner.recordEvent(event);
-                  clearOccured = true;
-                }
-                owner.basicInvalidatePart2(newRe, event, clearOccured 
/*conflict with clear*/, invokeCallbacks);
-                if (!clearOccured) {
-                  lruEntryCreate(newRe);
-                  incEntryCount(1);
-                }            
-                opCompleted = true;
-                didInvalidate = true;
-                invalidatedRe = newRe;
-                // Don't leave an entry in the cache, if we
-                // just wanted to force the distribution and events
-                // for this invalidate
-                if (!forceNewEntry) {
+                throw ccme;
+              } finally {
+                if (!opCompleted) {
                   removeEntry(event.getKey(), newRe, false);
-                } 
-              } // !opCompleted
-            } catch (ConcurrentCacheModificationException ccme) {
-              VersionTag tag = event.getVersionTag();
-              if (tag != null && tag.isTimeStampUpdated()) {
-                // Notify gateways of new time-stamp.
-                owner.notifyTimestampsToGateways(event);
-              }
-              throw ccme;
-            } finally {
-              if (!opCompleted) {
-                removeEntry(event.getKey(), newRe, false);
-              }
-            }
-          } // synchronized newRe
-      } // forceNewEntry
-      else { // !forceNewEntry
-        boolean retry = true;
-        // RegionEntry retryEntry = null;
-        // int retries = -1;
-        
-        while (retry) {
-          retry = false;
-          boolean entryExisted = false;
-          RegionEntry re = getEntry(event.getKey());
-          RegionEntry tombstone = null;
-          boolean haveTombstone = false;
-          if (re != null && re.isTombstone()) {
-            tombstone = re;
-            haveTombstone = true;
-            re = null;
-          }
-          if (re == null) {
-            ownerIsInitialized = owner.isInitialized();
-            if (!ownerIsInitialized) {
-              // when GII message arrived or processed later than invalidate
-              // message, the entry should be created as placeholder
-              RegionEntry newRe = haveTombstone? tombstone : 
getEntryFactory().createEntry(owner, event.getKey(),
-                  Token.INVALID);
-              synchronized (newRe) {
-                if (haveTombstone && !tombstone.isTombstone()) {
-                  // state of the tombstone has changed so we need to retry
-                  retry = true;
-                  //retryEntry = tombstone; // leave this in place for 
debugging
-                  continue;
-                }
-                re = putEntryIfAbsent(event.getKey(), newRe);
-                if (re == tombstone) {
-                  re = null; // pretend we don't have an entry
                 }
               }
-            } else if (owner.getServerProxy() != null) {
-              Object sync = haveTombstone? tombstone : new Object();
-              synchronized(sync) {
-                if (haveTombstone && !tombstone.isTombstone()) { 
-                  // bug 45295: state of the tombstone has changed so we need 
to retry
-                  retry = true;
-                  //retryEntry = tombstone; // leave this in place for 
debugging
-                  continue;
-                }
-       
-                // bug #43287 - send event to server even if it's not in the 
client (LRU may have evicted it)
-                owner.serverInvalidate(event);
-                if (owner.concurrencyChecksEnabled) {
-                  if (event.getVersionTag() == null) {
-                    // server did not perform the invalidation, so don't leave 
an invalid
-                    // entry here
-                    return false;
-                  } else if (tombstone != null) {
-                    processVersionTag(tombstone, event);
-                    try {
-                      if (!tombstone.isTombstone()) {
-                        if (isDebugEnabled) {
-                          logger.debug("tombstone is no longer a tombstone. 
{}:event={}", tombstone, event);
+            } // synchronized newRe
+          } // forceNewEntry
+          else { // !forceNewEntry
+            boolean retry = true;
+            // RegionEntry retryEntry = null;
+            // int retries = -1;
+
+            while (retry) {
+              retry = false;
+              boolean entryExisted = false;
+              RegionEntry re = getEntry(event.getKey());
+              RegionEntry tombstone = null;
+              boolean haveTombstone = false;
+              if (re != null && re.isTombstone()) {
+                tombstone = re;
+                haveTombstone = true;
+                re = null;
+              }
+              if (re == null) {
+                ownerIsInitialized = owner.isInitialized();
+                if (!ownerIsInitialized) {
+                  // when GII message arrived or processed later than 
invalidate
+                  // message, the entry should be created as placeholder
+                  RegionEntry newRe = haveTombstone? tombstone : 
getEntryFactory().createEntry(owner, event.getKey(),
+                      Token.INVALID);
+                  synchronized (newRe) {
+                    if (haveTombstone && !tombstone.isTombstone()) {
+                      // state of the tombstone has changed so we need to retry
+                      retry = true;
+                      //retryEntry = tombstone; // leave this in place for 
debugging
+                      continue;
+                    }
+                    re = putEntryIfAbsent(event.getKey(), newRe);
+                    if (re == tombstone) {
+                      re = null; // pretend we don't have an entry
+                    }
+                  }
+                } else if (owner.getServerProxy() != null) {
+                  Object sync = haveTombstone? tombstone : new Object();
+                  synchronized(sync) {
+                    if (haveTombstone && !tombstone.isTombstone()) { 
+                      // bug 45295: state of the tombstone has changed so we 
need to retry
+                      retry = true;
+                      //retryEntry = tombstone; // leave this in place for 
debugging
+                      continue;
+                    }
+
+                    // bug #43287 - send event to server even if it's not in 
the client (LRU may have evicted it)
+                    owner.serverInvalidate(event);
+                    if (owner.concurrencyChecksEnabled) {
+                      if (event.getVersionTag() == null) {
+                        // server did not perform the invalidation, so don't 
leave an invalid
+                        // entry here
+                        return false;
+                      } else if (tombstone != null) {
+                        processVersionTag(tombstone, event);
+                        try {
+                          if (!tombstone.isTombstone()) {
+                            if (isDebugEnabled) {
+                              logger.debug("tombstone is no longer a 
tombstone. {}:event={}", tombstone, event);
+                            }
+                          }
+                          tombstone.setValue(owner, Token.TOMBSTONE);
+                        } catch (RegionClearedException e) {
+                          // that's okay - when writing a tombstone into a 
disk, the
+                          // region has been cleared (including this tombstone)
+                        } catch (ConcurrentCacheModificationException ccme) {
+                          VersionTag tag = event.getVersionTag();
+                          if (tag != null && tag.isTimeStampUpdated()) {
+                            // Notify gateways of new time-stamp.
+                            owner.notifyTimestampsToGateways(event);
+                          }
+                          throw ccme;
                         }
+                        // update the tombstone's version to prevent an older 
CCU/putAll from overwriting it
+                        owner.rescheduleTombstone(tombstone, 
event.getVersionTag());
                       }
-                      tombstone.setValue(owner, Token.TOMBSTONE);
-                    } catch (RegionClearedException e) {
-                      // that's okay - when writing a tombstone into a disk, 
the
-                      // region has been cleared (including this tombstone)
-                    } catch (ConcurrentCacheModificationException ccme) {
-                      VersionTag tag = event.getVersionTag();
-                      if (tag != null && tag.isTimeStampUpdated()) {
-                        // Notify gateways of new time-stamp.
-                        owner.notifyTimestampsToGateways(event);
-                      }
-                      throw ccme;
                     }
-                    // update the tombstone's version to prevent an older 
CCU/putAll from overwriting it
-                    owner.rescheduleTombstone(tombstone, 
event.getVersionTag());
                   }
+                  entryExisted = true;
                 }
               }
-              entryExisted = true;
-            }
-          }
-          if (re != null) {
-            // Gester: Race condition in GII
-            // when adding the placeholder for invalidate entry during GII,
-            // if the GII got processed earlier for this entry, then do 
-            // normal invalidate operation
-            synchronized (re) {
-              if (!event.isOriginRemote() && 
event.getOperation().isExpiration()) {
-                // If this expiration started locally then only do it if the 
RE is not being used by a tx.
-                if (re.isInUseByTransaction()) {
-                  return false;
-                }
-              }
-              if (re.isTombstone() || (!re.isRemoved() && !re.isDestroyed())) {
-                entryExisted = true;
-                if (re.isInvalid()) {
-                  // was already invalid, do not invoke listeners or increment
-                  // stat
-                  if (isDebugEnabled) {
-                    logger.debug("Invalidate: Entry already invalid: '{}'", 
event.getKey());
-                  }
-                  if (event.getVersionTag() != null && 
owner.getVersionVector() != null) {
-                    
owner.getVersionVector().recordVersion((InternalDistributedMember) 
event.getDistributedMember(), event.getVersionTag());
-                  }
-                }
-                else { // previous value not invalid
-                  event.setRegionEntry(re);
-                  owner.serverInvalidate(event);
-                  if (owner.concurrencyChecksEnabled && 
event.noVersionReceivedFromServer()) {
-                    // server did not perform the invalidation, so don't leave 
an invalid
-                    // entry here
-                    if (isDebugEnabled) {
-                      logger.debug("returning early because server did not 
generate a version stamp for this event:{}", event);
-                    }
-                    return false;
-                  }
-             // in case of overflow to disk we need the old value for cqs.
-                  if(owner.getFilterProfile().getCqCount() > 0){
-                    //use to be getValue and can cause dead lock rdubey.
-                    if (re.isValueNull()) {
-                      event.setOldValue(re.getValueOnDiskOrBuffer(owner));
-                    } else {
-                      Object v = re.getValueInVM(owner);
-                      event.setOldValue(v); // OFFHEAP escapes to 
EntryEventImpl oldValue
+              if (re != null) {
+                // Gester: Race condition in GII
+                // when adding the placeholder for invalidate entry during GII,
+                // if the GII got processed earlier for this entry, then do 
+                // normal invalidate operation
+                synchronized (re) {
+                  if (!event.isOriginRemote() && 
event.getOperation().isExpiration()) {
+                    // If this expiration started locally then only do it if 
the RE is not being used by a tx.
+                    if (re.isInUseByTransaction()) {
+                      return false;
                     }
                   }
-                  final boolean oldWasTombstone = re.isTombstone();
-                  final int oldSize = 
_getOwner().calculateRegionEntryValueSize(re);
-                  try {
-                    invalidateEntry(event, re, oldSize);
-                  }
-                  catch (RegionClearedException rce) {
-                    // generate versionTag for the event
-                    EntryLogger.logInvalidate(event);
-                    _getOwner().recordEvent(event);
-                    clearOccured = true;
-                  } catch (ConcurrentCacheModificationException ccme) {
-                    VersionTag tag = event.getVersionTag();
-                    if (tag != null && tag.isTimeStampUpdated()) {
-                      // Notify gateways of new time-stamp.
-                      owner.notifyTimestampsToGateways(event);
+                  if (re.isTombstone() || (!re.isRemoved() && 
!re.isDestroyed())) {
+                    entryExisted = true;
+                    if (re.isInvalid()) {
+                      // was already invalid, do not invoke listeners or 
increment
+                      // stat
+                      if (isDebugEnabled) {
+                        logger.debug("Invalidate: Entry already invalid: 
'{}'", event.getKey());
+                      }
+                      if (event.getVersionTag() != null && 
owner.getVersionVector() != null) {
+                        
owner.getVersionVector().recordVersion((InternalDistributedMember) 
event.getDistributedMember(), event.getVersionTag());
+                      }
                     }
-                    throw ccme;
+                    else { // previous value not invalid
+                      event.setRegionEntry(re);
+                      owner.serverInvalidate(event);
+                      if (owner.concurrencyChecksEnabled && 
event.noVersionReceivedFromServer()) {
+                        // server did not perform the invalidation, so don't 
leave an invalid
+                        // entry here
+                        if (isDebugEnabled) {
+                          logger.debug("returning early because server did not 
generate a version stamp for this event:{}", event);
+                        }
+                        return false;
+                      }
+                      // in case of overflow to disk we need the old value for 
cqs.
+                      if(owner.getFilterProfile().getCqCount() > 0){
+                        //use to be getValue and can cause dead lock rdubey.
+                        if (re.isValueNull()) {
+                          event.setOldValue(re.getValueOnDiskOrBuffer(owner));
+                        } else {
+                          Object v = re.getValueInVM(owner);
+                          event.setOldValue(v); // OFFHEAP escapes to 
EntryEventImpl oldValue
+                        }
+                      }
+                      final boolean oldWasTombstone = re.isTombstone();
+                      final int oldSize = 
_getOwner().calculateRegionEntryValueSize(re);
+                      try {
+                        invalidateEntry(event, re, oldSize);
+                      }
+                      catch (RegionClearedException rce) {
+                        // generate versionTag for the event
+                        EntryLogger.logInvalidate(event);
+                        _getOwner().recordEvent(event);
+                        clearOccured = true;
+                      } catch (ConcurrentCacheModificationException ccme) {
+                        VersionTag tag = event.getVersionTag();
+                        if (tag != null && tag.isTimeStampUpdated()) {
+                          // Notify gateways of new time-stamp.
+                          owner.notifyTimestampsToGateways(event);
+                        }
+                        throw ccme;
+                      }
+                      owner.basicInvalidatePart2(re, event,
+                          clearOccured /* conflict with clear */, 
invokeCallbacks);
+                      if (!clearOccured) {
+                        if (oldWasTombstone) {
+                          lruEntryCreate(re);
+                        } else {
+                          lruEntryUpdate(re);
+                        }
+                      }             
+                      didInvalidate = true;
+                      invalidatedRe = re;
+                    } // previous value not invalid
                   }
-                  owner.basicInvalidatePart2(re, event,
-                      clearOccured /* conflict with clear */, invokeCallbacks);
-                  if (!clearOccured) {
-                    if (oldWasTombstone) {
-                      lruEntryCreate(re);
-                    } else {
-                      lruEntryUpdate(re);
-                    }
-                  }             
-                  didInvalidate = true;
-                  invalidatedRe = re;
-                } // previous value not invalid
+                } // synchronized re
+              } // re != null
+              else {
+                // At this point, either it's not in GII mode, or the 
placeholder
+                // is in region, do nothing
               }
-            } // synchronized re
-          } // re != null
-          else {
-            // At this point, either it's not in GII mode, or the placeholder
-            // is in region, do nothing
-          }
-          if (!entryExisted) {
-            owner.checkEntryNotFound(event.getKey());
-          }
-        } // while(retry)
-      } // !forceNewEntry
-    } catch( DiskAccessException dae) {
-      invalidatedRe = null;
-      didInvalidate = false;
-      this._getOwner().handleDiskAccessException(dae);
-      throw dae;
-    } finally {
-      releaseCacheModificationLock(owner, event);
-      if (oqlIndexManager != null) {
-        oqlIndexManager.countDownIndexUpdaters();
-      }
-      if (invalidatedRe != null) {
-        owner.basicInvalidatePart3(invalidatedRe, event, invokeCallbacks);
-      }
-      if (didInvalidate && !clearOccured) {
-        try {
-          lruUpdateCallback();
+              if (!entryExisted) {
+                owner.checkEntryNotFound(event.getKey());
+              }
+            } // while(retry)
+          } // !forceNewEntry
         } catch( DiskAccessException dae) {
+          invalidatedRe = null;
+          didInvalidate = false;
           this._getOwner().handleDiskAccessException(dae);
           throw dae;
+        } finally {
+          if (oqlIndexManager != null) {
+            oqlIndexManager.countDownIndexUpdaters();
+          }
+          if (invalidatedRe != null) {
+            owner.basicInvalidatePart3(invalidatedRe, event, invokeCallbacks);
+          }
+          if (didInvalidate && !clearOccured) {
+            try {
+              lruUpdateCallback();
+            } catch( DiskAccessException dae) {
+              this._getOwner().handleDiskAccessException(dae);
+              throw dae;
+            }
+          }
+          else if (!didInvalidate){
+            resetThreadLocals();
+          }
+        }
+        return didInvalidate;
+      } finally {
+        if (ownerIsInitialized) {
+          forceInvalidateEvent(event, owner);
         }
       }
-      else if (!didInvalidate){
-        resetThreadLocals();
-      }
-    }
-    return didInvalidate;
     } finally {
-      if (ownerIsInitialized) {
-        forceInvalidateEvent(event, owner);
-      }
+      releaseCacheModificationLock(owner, event);
     }
+
   }
 
   protected void invalidateNewEntry(EntryEventImpl event,
@@ -2682,27 +2679,30 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     boolean uninitialized = !owner.isInitialized();
     boolean retrieveOldValueForDelta = event.getDeltaBytes() != null
         && event.getRawNewValue() == null;
-    lockForCacheModification(owner, event);
     IndexManager oqlIndexManager = null;
+    lockForCacheModification(owner, event);
     try {
-      // Fix for Bug #44431. We do NOT want to update the region and wait
-      // later for index INIT as region.clear() can cause inconsistency if
-      // happened in parallel as it also does index INIT.
-      oqlIndexManager = owner.getIndexManager() ; 
-      if (oqlIndexManager != null) {
-        oqlIndexManager.waitForIndexInit();
-      }
+      try {
+        // Fix for Bug #44431. We do NOT want to update the region and wait
+        // later for index INIT as region.clear() can cause inconsistency if
+        // happened in parallel as it also does index INIT.
+        oqlIndexManager = owner.getIndexManager() ; 
+        if (oqlIndexManager != null) {
+          oqlIndexManager.waitForIndexInit();
+        }
 
-      // fix for bug #42169, replace must go to server if entry not on client
-      boolean replaceOnClient = event.getOperation() == Operation.REPLACE
-                && owner.getServerProxy() != null; 
+        // fix for bug #42169, replace must go to server if entry not on client
+        boolean replaceOnClient = event.getOperation() == Operation.REPLACE
+            && owner.getServerProxy() != null; 
         // Rather than having two different blocks for synchronizing oldRe
         // and newRe, have only one block and synchronize re
         RegionEntry re = null;
         boolean eventRecorded = false;
         boolean onlyExisting = ifOld && !replaceOnClient;
-               re = getOrCreateRegionEntry(owner, event, 
-                   Token.REMOVED_PHASE1, null, onlyExisting, false);
+
+        re = getOrCreateRegionEntry(owner, event, 
+
+            Token.REMOVED_PHASE1, null, onlyExisting, false);
         if (re == null) {
           return null;
         }
@@ -2712,9 +2712,8 @@ public abstract class AbstractRegionMap implements 
RegionMap {
             // from the map. otherwise we can append an event to it
             // and change its state
             if (re.isRemovedPhase2()) {
-                re = getOrCreateRegionEntry(owner, event,
-                    Token.REMOVED_PHASE1, null, onlyExisting, false);
-                _getOwner().getCachePerfStats().incRetries();
+              re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, 
null, onlyExisting, false);
+              _getOwner().getCachePerfStats().incRetries();
               if (re == null) {
                 // this will happen when onlyExisting is true
                 return null;
@@ -2757,48 +2756,48 @@ public abstract class AbstractRegionMap implements 
RegionMap {
 
                 // notify index of an update
                 notifyIndex(re, true);
+                try {
                   try {
-                    try {
-                      if ((cacheWrite && event.getOperation().isUpdate()) // 
if there is a cacheWriter, type of event has already been set
-                          || !re.isRemoved()
-                          || replaceOnClient) {
-                        // update
-                        updateEntry(event, requireOldValue, oldValueForDelta, 
re);
-                      } else {
-                        // create
-                        createEntry(event, owner, re);
-                      }
-                      owner.recordEvent(event);
-                      eventRecorded = true;
-                    } catch (RegionClearedException rce) {
-                      clearOccured = true;
-                      owner.recordEvent(event);
-                    } catch (ConcurrentCacheModificationException ccme) {
-                      VersionTag tag = event.getVersionTag();
-                      if (tag != null && tag.isTimeStampUpdated()) {
-                        // Notify gateways of new time-stamp.
-                        owner.notifyTimestampsToGateways(event);
-                      }
-                      throw ccme;
+                    if ((cacheWrite && event.getOperation().isUpdate()) // if 
there is a cacheWriter, type of event has already been set
+                        || !re.isRemoved()
+                        || replaceOnClient) {
+                      // update
+                      updateEntry(event, requireOldValue, oldValueForDelta, 
re);
+                    } else {
+                      // create
+                      createEntry(event, owner, re);
                     }
-                    if (uninitialized) {
-                      event.inhibitCacheListenerNotification(true);
+                    owner.recordEvent(event);
+                    eventRecorded = true;
+                  } catch (RegionClearedException rce) {
+                    clearOccured = true;
+                    owner.recordEvent(event);
+                  } catch (ConcurrentCacheModificationException ccme) {
+                    VersionTag tag = event.getVersionTag();
+                    if (tag != null && tag.isTimeStampUpdated()) {
+                      // Notify gateways of new time-stamp.
+                      owner.notifyTimestampsToGateways(event);
                     }
-                    updateLru(clearOccured, re, event);
-
-                    lastModifiedTime = owner.basicPutPart2(event, re,
-                        !uninitialized, lastModifiedTime, clearOccured);
-                  } finally {
-                    notifyIndex(re, false);
+                    throw ccme;
                   }
-                  result = re;
-                  break;
-                } finally {
-                  OffHeapHelper.release(oldValueForDelta);
-                  if (re != null && !onlyExisting && !isOpComplete(re, event)) 
{
-                    owner.cleanUpOnIncompleteOp(event, re);
+                  if (uninitialized) {
+                    event.inhibitCacheListenerNotification(true);
                   }
-                  else if (re != null && 
owner.isUsedForPartitionedRegionBucket()) {
+                  updateLru(clearOccured, re, event);
+
+                  lastModifiedTime = owner.basicPutPart2(event, re,
+                      !uninitialized, lastModifiedTime, clearOccured);
+                } finally {
+                  notifyIndex(re, false);
+                }
+                result = re;
+                break;
+              } finally {
+                OffHeapHelper.release(oldValueForDelta);
+                if (re != null && !onlyExisting && !isOpComplete(re, event)) {
+                  owner.cleanUpOnIncompleteOp(event, re);
+                }
+                else if (re != null && 
owner.isUsedForPartitionedRegionBucket()) {
                   BucketRegion br = (BucketRegion)owner;
                   CachePerfStats stats = 
br.getPartitionedRegion().getCachePerfStats();
                 }
@@ -2806,14 +2805,13 @@ public abstract class AbstractRegionMap implements 
RegionMap {
             }
           } // sync re
         }// end while
-    } catch (DiskAccessException dae) {
-      //Asif:Feel that it is safe to destroy the region here as there appears
-      // to be no chance of deadlock during region destruction      
-      result = null;
-      this._getOwner().handleDiskAccessException(dae);
-      throw dae;
-    } finally {
-        releaseCacheModificationLock(owner, event);
+      } catch (DiskAccessException dae) {
+        //Asif:Feel that it is safe to destroy the region here as there appears
+        // to be no chance of deadlock during region destruction      
+        result = null;
+        this._getOwner().handleDiskAccessException(dae);
+        throw dae;
+      } finally {
         if (oqlIndexManager != null) {
           oqlIndexManager.countDownIndexUpdaters();
         }
@@ -2841,8 +2839,10 @@ public abstract class AbstractRegionMap implements 
RegionMap {
         } else {
           resetThreadLocals();
         }
-    } // finally
-    
+      } 
+    } finally {
+      releaseCacheModificationLock(owner, event);
+    }
     return result;
   }
 
@@ -3632,45 +3632,69 @@ public abstract class AbstractRegionMap implements 
RegionMap {
   
 
   /** get version-generation permission from the region's version vector */
-  private void lockForCacheModification(LocalRegion owner, EntryEventImpl event) {
+  void lockForCacheModification(LocalRegion owner, EntryEventImpl event) {
     boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.dataPolicy.withReplication();
-    if (!event.isOriginRemote() && !lockedByBulkOp) {
+    
+    if(armLockTestHook!=null) armLockTestHook.beforeLock(owner, event);
+    
+    if (!event.isOriginRemote() && !lockedByBulkOp && !owner.hasServerProxy()) {
       RegionVersionVector vector = owner.getVersionVector();
       if (vector != null) {
-        vector.lockForCacheModification(owner);
+        vector.lockForCacheModification();
       }
     }
+    
+    if(armLockTestHook!=null) armLockTestHook.afterLock(owner, event);
+
   }
   
   /** release version-generation permission from the region's version vector */
-  private void releaseCacheModificationLock(LocalRegion owner, EntryEventImpl event) {
+  void releaseCacheModificationLock(LocalRegion owner, EntryEventImpl event) {
     boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.dataPolicy.withReplication();
-    if (!event.isOriginRemote() && !lockedByBulkOp) {
+    
+    if(armLockTestHook!=null) armLockTestHook.beforeRelease(owner, event);
+
+    if (!event.isOriginRemote() && !lockedByBulkOp && !owner.hasServerProxy()) {
       RegionVersionVector vector = owner.getVersionVector();
       if (vector != null) {
-        vector.releaseCacheModificationLock(owner);
+        vector.releaseCacheModificationLock();
       }
     }
+    
+    if(armLockTestHook!=null) armLockTestHook.afterRelease(owner, event);
+
   }
   
   /** get version-generation permission from the region's version vector */
   private void lockForTXCacheModification(LocalRegion owner, VersionTag tag) {
+    
+    if(armLockTestHook!=null) armLockTestHook.beforeLock(owner, null);
+    
     if ( !(tag != null && tag.isFromOtherMember()) ) {
       RegionVersionVector vector = owner.getVersionVector();
-      if (vector != null) {
-        vector.lockForCacheModification(owner);
+      if (vector != null && !owner.hasServerProxy()) {
+        vector.lockForCacheModification();
       }
     }
+    
+    if(armLockTestHook!=null) armLockTestHook.afterLock(owner, null);
+
   }
   
   /** release version-generation permission from the region's version vector */
   private void releaseTXCacheModificationLock(LocalRegion owner, VersionTag tag) {
+    
+    if(armLockTestHook!=null) armLockTestHook.beforeRelease(owner, null);
+
     if ( !(tag != null && tag.isFromOtherMember()) ) {
       RegionVersionVector vector = owner.getVersionVector();
-      if (vector != null) {
-        vector.releaseCacheModificationLock(owner);
+      if (vector != null && !owner.hasServerProxy()) {
+       vector.releaseCacheModificationLock();
       }
     }
+    
+    if(armLockTestHook!=null) armLockTestHook.afterRelease(owner, null);
+
   }
 
   /**
@@ -3798,5 +3822,30 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     }
     return true;
   }
+  
+  public interface ARMLockTestHook {
+    public void beforeBulkLock(LocalRegion region);
+    public void afterBulkLock(LocalRegion region);
+    public void beforeBulkRelease(LocalRegion region);
+    public void afterBulkRelease(LocalRegion region);
+
+    public void beforeLock(LocalRegion region, CacheEvent event);
+    public void afterLock(LocalRegion region, CacheEvent event);
+    public void beforeRelease(LocalRegion region, CacheEvent event);
+    public void afterRelease(LocalRegion region, CacheEvent event);
+
+    public void beforeStateFlushWait();
+  }
+  
+  private ARMLockTestHook armLockTestHook;
+  
+  public ARMLockTestHook getARMLockTestHook() {
+    return armLockTestHook;
+  }
+  
+  public void setARMLockTestHook(ARMLockTestHook theHook) {
+    armLockTestHook = theHook;
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index f1627d3..4e4c417 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -1268,26 +1268,32 @@ implements Bucket
     Assert.assertTrue(!isTX());
     Assert.assertTrue(event.getOperation().isDistributed());
 
-    beginLocalWrite(event);
+    LocalRegion lr = event.getLocalRegion();
+    AbstractRegionMap arm = ((AbstractRegionMap) lr.getRegionMap());
     try {
-      
-      if (!hasSeenEvent(event)) {
-        this.entries.updateEntryVersion(event);
-      } else {
-        if (logger.isTraceEnabled(LogMarker.DM)) {
-          logger.trace(LogMarker.DM, "BR.basicUpdateEntryVersion: this cache 
has already seen this event {}", event);
+      arm.lockForCacheModification(lr, event);
+      beginLocalWrite(event);
+      try {      
+        if (!hasSeenEvent(event)) {
+          this.entries.updateEntryVersion(event);
+        } else {
+          if (logger.isTraceEnabled(LogMarker.DM)) {
+            logger.trace(LogMarker.DM, "BR.basicUpdateEntryVersion: this cache 
has already seen this event {}", event);
+          }
         }
+        if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+          // This cache has processed the event, forward operation
+          // and event messages to backup buckets
+          if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+            distributeUpdateEntryVersionOperation(event);
+          }
+        }
+        return;
+      } finally {
+        endLocalWrite(event);
       }
-      if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
-        // This cache has processed the event, forward operation
-        // and event messages to backup buckets
-       if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
-          distributeUpdateEntryVersionOperation(event);
-       }
-      }
-      return;
     } finally {
-      endLocalWrite(event);
+      arm.releaseCacheModificationLock(event.getLocalRegion(), event);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
index 5526ef0..b6e1c35 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
@@ -219,7 +219,7 @@ public class DistributedClearOperation extends 
DistributedCacheOperation
       case OP_LOCK_FOR_CLEAR:
         if (region.getDataPolicy().withStorage()) {
           DistributedClearOperation.regionLocked(this.getSender(), 
region.getFullPath(), region);
-          region.lockLocallyForClear(dm, this.getSender());
+          region.lockLocallyForClear(dm, this.getSender(), event);
         }
         this.appliedOperation = true;
         break;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index b42a617..a7b82ac 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -41,6 +41,7 @@ import 
com.gemstone.gemfire.distributed.internal.locks.DLockService;
 import 
com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.i18n.StringId;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook;
 import 
com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
 import 
com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
@@ -1966,20 +1967,25 @@ public class DistributedRegion extends LocalRegion 
implements
     }
   }
 
-  
   @Override
   void basicUpdateEntryVersion(EntryEventImpl event)
       throws EntryNotFoundException {
-
+    LocalRegion lr = event.getLocalRegion();
+    AbstractRegionMap arm = ((AbstractRegionMap) lr.getRegionMap());
     try {
-      if (!hasSeenEvent(event)) {
-        super.basicUpdateEntryVersion(event);
+      arm.lockForCacheModification(lr, event);
+      try {
+        if (!hasSeenEvent(event)) {
+          super.basicUpdateEntryVersion(event);
+        }
+        return;
+      } finally {
+        if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          distributeUpdateEntryVersion(event);
+        }
       }
-      return;
     } finally {
-      if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
-        distributeUpdateEntryVersion(event);
-      }
+      arm.releaseCacheModificationLock(lr, event);
     }
   }
 
@@ -2110,33 +2116,47 @@ public class DistributedRegion extends LocalRegion 
implements
    * @param participants 
    **/
   private void obtainWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) {
-    lockLocallyForClear(getDistributionManager(), getMyId());
+    lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
     DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
   }
   
   /** pause local operations so that a clear() can be performed and flush comm 
channels to the given member
   */
-  public void lockLocallyForClear(DM dm, InternalDistributedMember locker) {
+  public void lockLocallyForClear(DM dm, InternalDistributedMember locker, CacheEvent event) {
     RegionVersionVector rvv = getVersionVector();
+    
+    ARMLockTestHook alth = getRegionMap().getARMLockTestHook();
+    if(alth!=null) alth.beforeLock(this, event);
+    
     if (rvv != null) {
       // block new operations from being applied to the region map
       rvv.lockForClear(getFullPath(), dm, locker);
       //Check for region destroyed after we have locked, to make sure
       //we don't continue a clear if the region has been destroyed.
       checkReadiness();
-      // wait for current operations to 
-      if (!locker.equals(dm.getDistributionManagerId())) {
+      // Only need to flush if NOACK at this point
+      if (this.getAttributes().getScope().isDistributedNoAck()) {
         Set<InternalDistributedMember> mbrs = getDistributionAdvisor().adviseCacheOp();
         StateFlushOperation.flushTo(mbrs, this);
-      }
+      }      
     }
+    
+    if(alth!=null) alth.afterLock(this, null);
+
   }
 
   /** releases the locks obtained in obtainWriteLocksForClear 
    * @param participants */
   private void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) {
+    
+    ARMLockTestHook alth = getRegionMap().getARMLockTestHook();
+    if(alth!=null) alth.beforeRelease(this, regionEvent);
+    
     getVersionVector().unlockForClear(getMyId());
     DistributedClearOperation.releaseLocks(regionEvent, participants);
+    
+    if(alth!=null) alth.afterRelease(this, regionEvent);
+
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 73dc5ab..304a5ad 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -157,6 +157,7 @@ import 
com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
 import com.gemstone.gemfire.internal.NanoTimer;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook;
 import 
com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
@@ -10198,141 +10199,144 @@ public class LocalRegion extends AbstractRegion
     }
     lockRVVForBulkOp();
     try {
+      try {
 
-      final DistributedPutAllOperation dpao = putAllOp;
-      int size = (proxyResult == null)? map.size() : proxyResult.size();
-      
-      if (isDebugEnabled) {
-        logger.debug( "size of put result is {} maps is {} proxyResult is {}", 
size, map, proxyResult);
-      }
+        final DistributedPutAllOperation dpao = putAllOp;
+        int size = (proxyResult == null)? map.size() : proxyResult.size();
 
-      final PutAllPartialResult partialKeys = new PutAllPartialResult(size);
-      final Iterator iterator;
-      final boolean isVersionedResults;
-      if (proxyResult != null) {
-        iterator = proxyResult.iterator();
-        isVersionedResults = true;
-      } else {
-        iterator = map.entrySet().iterator();
-        isVersionedResults = false;
-      }
-      Runnable r = new Runnable() {
-        public void run() {
-          int offset = 0;
-          VersionTagHolder tagHolder = new VersionTagHolder();
-          while (iterator.hasNext()) {
-            stopper.checkCancelInProgress(null);
-            Map.Entry mapEntry = (Map.Entry)iterator.next();
-            Object key = mapEntry.getKey();
-            VersionTag versionTag = null;
-            tagHolder.setVersionTag(null);
-            final Object value;
-            boolean overwritten = false;
-            if (isVersionedResults) {
-              versionTag = 
((VersionedObjectList.Entry)mapEntry).getVersionTag();
-              value = map.get(key);
-              if (isDebugEnabled) {
-                logger.debug("putAll key {} -> {} version={}", key, value, 
versionTag);
-              }
-              if (versionTag == null && serverIsVersioned && 
concurrencyChecksEnabled && dataPolicy.withStorage()) {
-                // server was unable to determine the version for this 
operation.
-                // I'm not sure this can still happen as described below on a 
pr.
-                // But it can happen on the server if NORMAL or PRELOADED. See 
bug 51644.
-                // This can happen in a PR with redundancy if there is a bucket
-                // failure or migration during the operation.  We destroy the
-                // entry since we don't know what its state should be (but the 
server should)
+        if (isDebugEnabled) {
+          logger.debug( "size of put result is {} maps is {} proxyResult is 
{}", size, map, proxyResult);
+        }
+
+        final PutAllPartialResult partialKeys = new PutAllPartialResult(size);
+        final Iterator iterator;
+        final boolean isVersionedResults;
+        if (proxyResult != null) {
+          iterator = proxyResult.iterator();
+          isVersionedResults = true;
+        } else {
+          iterator = map.entrySet().iterator();
+          isVersionedResults = false;
+        }
+        Runnable r = new Runnable() {
+          public void run() {
+            int offset = 0;
+            VersionTagHolder tagHolder = new VersionTagHolder();
+            while (iterator.hasNext()) {
+              stopper.checkCancelInProgress(null);
+              Map.Entry mapEntry = (Map.Entry)iterator.next();
+              Object key = mapEntry.getKey();
+              VersionTag versionTag = null;
+              tagHolder.setVersionTag(null);
+              final Object value;
+              boolean overwritten = false;
+              if (isVersionedResults) {
+                versionTag = 
((VersionedObjectList.Entry)mapEntry).getVersionTag();
+                value = map.get(key);
                 if (isDebugEnabled) {
-                  logger.debug("server returned no version information for 
{}", key);
+                  logger.debug("putAll key {} -> {} version={}", key, value, 
versionTag);
                 }
-                localDestroyNoCallbacks(key);
-                // to be consistent we need to fetch the current entry
-                get(key, event.getCallbackArgument(), false, null);
-                overwritten = true;
-              }
-            } else {
-              value = mapEntry.getValue();
-              if (isDebugEnabled) {
-                logger.debug("putAll {} -> {}", key, value);
-              }
-            }
-            try {
-              if (serverIsVersioned) {
+                if (versionTag == null && serverIsVersioned && 
concurrencyChecksEnabled && dataPolicy.withStorage()) {
+                  // server was unable to determine the version for this 
operation.
+                  // I'm not sure this can still happen as described below on 
a pr.
+                  // But it can happen on the server if NORMAL or PRELOADED. 
See bug 51644.
+                  // This can happen in a PR with redundancy if there is a 
bucket
+                  // failure or migration during the operation.  We destroy the
+                  // entry since we don't know what its state should be (but 
the server should)
+                  if (isDebugEnabled) {
+                    logger.debug("server returned no version information for 
{}", key);
+                  }
+                  localDestroyNoCallbacks(key);
+                  // to be consistent we need to fetch the current entry
+                  get(key, event.getCallbackArgument(), false, null);
+                  overwritten = true;
+                }
+              } else {
+                value = mapEntry.getValue();
                 if (isDebugEnabled) {
-                  logger.debug("associating version tag with {} version={}", 
key, versionTag);
+                  logger.debug("putAll {} -> {}", key, value);
                 }
-                //If we have received a version tag from a server, add it to 
the event
-                tagHolder.setVersionTag(versionTag);
-                tagHolder.setFromServer(true);
-              } else if (retryVersions != null && 
retryVersions.containsKey(key)) {
-                //If this is a retried event, and we have a version tag for 
the retry,
-                //add it to the event.
-                tagHolder.setVersionTag(retryVersions.get(key));
-              }
-              
-              if (!overwritten) {
-                basicEntryPutAll(key, value, dpao, offset, tagHolder);
               }
-              // now we must check again since the cache may have closed during
-              // distribution (causing this process to not receive and queue 
the
-              // event for clients
-              stopper.checkCancelInProgress(null);
-              succeeded.addKeyAndVersion(key, tagHolder.getVersionTag());
-            } 
-            catch (Exception ex) {
-              // TODO ask Gester if this debug logging can be removed
-              if (isDebugEnabled) {
-                logger.debug("PutAll operation encountered exception for key 
{}", key, ex);
+              try {
+                if (serverIsVersioned) {
+                  if (isDebugEnabled) {
+                    logger.debug("associating version tag with {} version={}", 
key, versionTag);
+                  }
+                  //If we have received a version tag from a server, add it to 
the event
+                  tagHolder.setVersionTag(versionTag);
+                  tagHolder.setFromServer(true);
+                } else if (retryVersions != null && 
retryVersions.containsKey(key)) {
+                  //If this is a retried event, and we have a version tag for 
the retry,
+                  //add it to the event.
+                  tagHolder.setVersionTag(retryVersions.get(key));
+                }
+
+                if (!overwritten) {
+                  basicEntryPutAll(key, value, dpao, offset, tagHolder);
+                }
+                // now we must check again since the cache may have closed 
during
+                // distribution (causing this process to not receive and queue 
the
+                // event for clients
+                stopper.checkCancelInProgress(null);
+                succeeded.addKeyAndVersion(key, tagHolder.getVersionTag());
+              } 
+              catch (Exception ex) {
+                // TODO ask Gester if this debug logging can be removed
+                if (isDebugEnabled) {
+                  logger.debug("PutAll operation encountered exception for key 
{}", key, ex);
+                }
+                partialKeys.saveFailedKey(key, ex);
               }
-              partialKeys.saveFailedKey(key, ex);
+              offset++;
             }
-            offset++;
           }
-        }
-      };
-      this.syncBulkOp(r, eventId);
-      if (partialKeys.hasFailure()) {
-        // Bug 51725: Now succeeded contains an order key list, may be missing 
the version tags. 
-        // Save reference of succeeded into partialKeys. The succeeded may be 
modified by
-        // postPutAll() to fill in the version tags. 
-        partialKeys.setSucceededKeysAndVersions(succeeded);
-        
logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1,
-                new Object[] {getFullPath(), partialKeys}));
-        if (isDebugEnabled) {
-          logger.debug(partialKeys.detailString());
-        }
-        if (e == null) {
-          // if received exception from server first, ignore local exception
-          if (dpao.isBridgeOperation()) {
-            if (partialKeys.getFailure() instanceof CancelException) {
-              e = (CancelException)partialKeys.getFailure(); 
-            } else if (partialKeys.getFailure() instanceof LowMemoryException) 
{
-              throw partialKeys.getFailure();  // fix for #43589
-            } else {
-              e = new PutAllPartialResultException(partialKeys);
-              if (isDebugEnabled) {
-                logger.debug("basicPutAll:"+partialKeys.detailString());
+        };
+        this.syncBulkOp(r, eventId);
+        if (partialKeys.hasFailure()) {
+          // Bug 51725: Now succeeded contains an order key list, may be 
missing the version tags. 
+          // Save reference of succeeded into partialKeys. The succeeded may 
be modified by
+          // postPutAll() to fill in the version tags. 
+          partialKeys.setSucceededKeysAndVersions(succeeded);
+          
logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1,
+              new Object[] {getFullPath(), partialKeys}));
+          if (isDebugEnabled) {
+            logger.debug(partialKeys.detailString());
+          }
+          if (e == null) {
+            // if received exception from server first, ignore local exception
+            if (dpao.isBridgeOperation()) {
+              if (partialKeys.getFailure() instanceof CancelException) {
+                e = (CancelException)partialKeys.getFailure(); 
+              } else if (partialKeys.getFailure() instanceof 
LowMemoryException) {
+                throw partialKeys.getFailure();  // fix for #43589
+              } else {
+                e = new PutAllPartialResultException(partialKeys);
+                if (isDebugEnabled) {
+                  logger.debug("basicPutAll:"+partialKeys.detailString());
+                }
               }
+            } else {
+              throw partialKeys.getFailure();
             }
-          } else {
-            throw partialKeys.getFailure();
           }
         }
       }
-    }
-    catch (LowMemoryException lme) {
-      throw lme;
-    }
-    catch (RuntimeException ex) {
-      e = ex;
-    }
-    catch (Exception ex) {
-      e = new RuntimeException(ex);
+      catch (LowMemoryException lme) {
+        throw lme;
+      }
+      catch (RuntimeException ex) {
+        e = ex;
+      }
+      catch (Exception ex) {
+        e = new RuntimeException(ex);
+      } finally {
+        putAllOp.getBaseEvent().release();
+        putAllOp.freeOffHeapResources();
+      }
+      getDataView().postPutAll(putAllOp, succeeded, this);
     } finally {
       unlockRVVForBulkOp();
-      putAllOp.getBaseEvent().release();
-      putAllOp.freeOffHeapResources();
     }
-    getDataView().postPutAll(putAllOp, succeeded, this);
     if (e != null) {
       throw e;
     }
@@ -10400,150 +10404,153 @@ public class LocalRegion extends AbstractRegion
     }
     lockRVVForBulkOp();
     try {
+      try {
 
-      final DistributedRemoveAllOperation op = removeAllOp;
-      int size = (proxyResult == null)? keys.size() : proxyResult.size();
-      
-      if (isInternalRegion()) {
-        if (isTraceEnabled) {
-          logger.trace(
-              "size of removeAll result is {} keys are {} proxyResult is {}", 
size, keys, proxyResult);
+        final DistributedRemoveAllOperation op = removeAllOp;
+        int size = (proxyResult == null)? keys.size() : proxyResult.size();
+
+        if (isInternalRegion()) {
+          if (isTraceEnabled) {
+            logger.trace(
+                "size of removeAll result is {} keys are {} proxyResult is 
{}", size, keys, proxyResult);
+          } else {
+            if (isTraceEnabled) {
+              logger.trace(
+                  "size of removeAll result is {} keys are {} proxyResult is 
{}", size, keys, proxyResult);
+            }
+          }
         } else {
           if (isTraceEnabled) {
             logger.trace(
                 "size of removeAll result is {} keys are {} proxyResult is 
{}", size, keys, proxyResult);
           }
         }
-      } else {
-        if (isTraceEnabled) {
-          logger.trace(
-              "size of removeAll result is {} keys are {} proxyResult is {}", 
size, keys, proxyResult);
-        }
-      }
 
-      final PutAllPartialResult partialKeys = new PutAllPartialResult(size);
-      final Iterator iterator;
-      final boolean isVersionedResults;
-      if (proxyResult != null) {
-        iterator = proxyResult.iterator();
-        isVersionedResults = true;
-      } else {
-        iterator = keys.iterator();
-        isVersionedResults = false;
-      }
-      Runnable r = new Runnable() {
-        public void run() {
-          int offset = 0;
-          VersionTagHolder tagHolder = new VersionTagHolder();
-          while (iterator.hasNext()) {
-            stopper.checkCancelInProgress(null);
-            Object key;
-            VersionTag versionTag = null;
-            tagHolder.setVersionTag(null);
-            if (isVersionedResults) {
-              Map.Entry mapEntry = (Map.Entry)iterator.next();
-              key = mapEntry.getKey();
-              versionTag = 
((VersionedObjectList.Entry)mapEntry).getVersionTag();
-              if (isDebugEnabled) {
-                logger.debug("removeAll key {} version={}",key, versionTag);
-              }
-              if (versionTag == null) {
+        final PutAllPartialResult partialKeys = new PutAllPartialResult(size);
+        final Iterator iterator;
+        final boolean isVersionedResults;
+        if (proxyResult != null) {
+          iterator = proxyResult.iterator();
+          isVersionedResults = true;
+        } else {
+          iterator = keys.iterator();
+          isVersionedResults = false;
+        }
+        Runnable r = new Runnable() {
+          public void run() {
+            int offset = 0;
+            VersionTagHolder tagHolder = new VersionTagHolder();
+            while (iterator.hasNext()) {
+              stopper.checkCancelInProgress(null);
+              Object key;
+              VersionTag versionTag = null;
+              tagHolder.setVersionTag(null);
+              if (isVersionedResults) {
+                Map.Entry mapEntry = (Map.Entry)iterator.next();
+                key = mapEntry.getKey();
+                versionTag = 
((VersionedObjectList.Entry)mapEntry).getVersionTag();
                 if (isDebugEnabled) {
-                  logger.debug("removeAll found invalid version tag, which 
means the entry is not found at server for key={}.", key);
+                  logger.debug("removeAll key {} version={}",key, versionTag);
                 }
-                succeeded.addKeyAndVersion(key, null);
-                continue;
-              }
-              // No need for special handling here in removeAll.
-              // We can just remove this key from the client with versionTag 
set to null.
-            } else {
-              key = iterator.next();
-              if (isInternalRegion()) {
-                if (isTraceEnabled) {
-                  logger.trace("removeAll {}", key);
+                if (versionTag == null) {
+                  if (isDebugEnabled) {
+                    logger.debug("removeAll found invalid version tag, which 
means the entry is not found at server for key={}.", key);
+                  }
+                  succeeded.addKeyAndVersion(key, null);
+                  continue;
                 }
+                // No need for special handling here in removeAll.
+                // We can just remove this key from the client with versionTag 
set to null.
               } else {
-                if (isTraceEnabled) {
-                  logger.trace("removeAll {}", key);
+                key = iterator.next();
+                if (isInternalRegion()) {
+                  if (isTraceEnabled) {
+                    logger.trace("removeAll {}", key);
+                  }
+                } else {
+                  if (isTraceEnabled) {
+                    logger.trace("removeAll {}", key);
+                  }
                 }
+
               }
+              try {
+                if (serverIsVersioned) {
+                  if (isDebugEnabled) {
+                    logger.debug("associating version tag with {} version={}", 
key, versionTag);
+                  }
+                  //If we have received a version tag from a server, add it to 
the event
+                  tagHolder.setVersionTag(versionTag);
+                  tagHolder.setFromServer(true);
+                } else if (retryVersions != null) {
+                  VersionTag vt = retryVersions.get(offset);
+                  if (vt != null) {
+                    //If this is a retried event, and we have a version tag 
for the retry,
+                    //add it to the event.
+                    tagHolder.setVersionTag(vt);
+                  }
+                }
 
+                basicEntryRemoveAll(key, op, offset, tagHolder);
+                // now we must check again since the cache may have closed 
during
+                // distribution causing this process to not receive and queue 
the
+                // event for clients
+                stopper.checkCancelInProgress(null);
+                succeeded.addKeyAndVersion(key, tagHolder.getVersionTag());
+              } 
+              catch (Exception ex) {
+                partialKeys.saveFailedKey(key, ex);
+              }
+              offset++;
             }
-            try {
-              if (serverIsVersioned) {
+          }
+        };
+        syncBulkOp(r, eventId);
+        if (partialKeys.hasFailure()) {
+          // Bug 51725: Now succeeded contains an order key list, may be 
missing the version tags. 
+          // Save reference of succeeded into partialKeys. The succeeded may 
be modified by
+          // postRemoveAll() to fill in the version tags.
+          partialKeys.setSucceededKeysAndVersions(succeeded);
+          
logger.info(LocalizedMessage.create(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_0_1,
+              new Object[] {getFullPath(), partialKeys}));
+          if (isDebugEnabled) {
+            logger.debug(partialKeys.detailString());
+          }
+          if (e == null) {
+            // if received exception from server first, ignore local exception
+            if (op.isBridgeOperation()) {
+              if (partialKeys.getFailure() instanceof CancelException) {
+                e = (CancelException)partialKeys.getFailure(); 
+              } else if (partialKeys.getFailure() instanceof 
LowMemoryException) {
+                throw partialKeys.getFailure();  // fix for #43589
+              } else {
+                e = new PutAllPartialResultException(partialKeys);
                 if (isDebugEnabled) {
-                  logger.debug("associating version tag with {} version={}", 
key, versionTag);
-                }
-                //If we have received a version tag from a server, add it to 
the event
-                tagHolder.setVersionTag(versionTag);
-                tagHolder.setFromServer(true);
-              } else if (retryVersions != null) {
-                VersionTag vt = retryVersions.get(offset);
-                if (vt != null) {
-                  //If this is a retried event, and we have a version tag for 
the retry,
-                  //add it to the event.
-                  tagHolder.setVersionTag(vt);
+                  logger.debug("basicRemoveAll:"+partialKeys.detailString());
                 }
               }
-              
-              basicEntryRemoveAll(key, op, offset, tagHolder);
-              // now we must check again since the cache may have closed during
-              // distribution causing this process to not receive and queue the
-              // event for clients
-              stopper.checkCancelInProgress(null);
-              succeeded.addKeyAndVersion(key, tagHolder.getVersionTag());
-            } 
-            catch (Exception ex) {
-              partialKeys.saveFailedKey(key, ex);
-            }
-            offset++;
-          }
-        }
-      };
-      syncBulkOp(r, eventId);
-      if (partialKeys.hasFailure()) {
-        // Bug 51725: Now succeeded contains an order key list, may be missing 
the version tags. 
-        // Save reference of succeeded into partialKeys. The succeeded may be 
modified by
-        // postRemoveAll() to fill in the version tags.
-        partialKeys.setSucceededKeysAndVersions(succeeded);
-        
logger.info(LocalizedMessage.create(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_0_1,
-                new Object[] {getFullPath(), partialKeys}));
-        if (isDebugEnabled) {
-          logger.debug(partialKeys.detailString());
-        }
-        if (e == null) {
-          // if received exception from server first, ignore local exception
-          if (op.isBridgeOperation()) {
-            if (partialKeys.getFailure() instanceof CancelException) {
-              e = (CancelException)partialKeys.getFailure(); 
-            } else if (partialKeys.getFailure() instanceof LowMemoryException) 
{
-              throw partialKeys.getFailure();  // fix for #43589
             } else {
-              e = new PutAllPartialResultException(partialKeys);
-              if (isDebugEnabled) {
-                logger.debug("basicRemoveAll:"+partialKeys.detailString());
-              }
+              throw partialKeys.getFailure();
             }
-          } else {
-            throw partialKeys.getFailure();
           }
         }
       }
-    }
-    catch (LowMemoryException lme) {
-      throw lme;
-    }
-    catch (RuntimeException ex) {
-      e = ex;
-    }
-    catch (Exception ex) {
-      e = new RuntimeException(ex);
+      catch (LowMemoryException lme) {
+        throw lme;
+      }
+      catch (RuntimeException ex) {
+        e = ex;
+      }
+      catch (Exception ex) {
+        e = new RuntimeException(ex);
+      } finally {
+        removeAllOp.getBaseEvent().release();
+        removeAllOp.freeOffHeapResources();
+      }
+      getDataView().postRemoveAll(removeAllOp, succeeded, this);
     } finally {
       unlockRVVForBulkOp();
-      removeAllOp.getBaseEvent().release();
-      removeAllOp.freeOffHeapResources();
     }
-    getDataView().postRemoveAll(removeAllOp, succeeded, this);
     if (e != null) {
       throw e;
     }
@@ -10558,15 +10565,33 @@ public class LocalRegion extends AbstractRegion
    *  to get a valid version tag.
    */
   private void lockRVVForBulkOp() {
+    ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); // test hook from the region map; may be null
+    if(alth!=null) { 
+      alth.beforeBulkLock(this); // called whether or not the lock below is actually taken
+    }
+    
     if (this.versionVector != null && this.dataPolicy.withReplication()) {
       this.versionVector.lockForCacheModification(this);
     }
+    
+    if(alth!=null) {
+      alth.afterBulkLock(this); // called after the conditional lock step, even if it was skipped
+    }
   }
   
   private void unlockRVVForBulkOp() {
+    ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); // test hook from the region map; may be null
+    if(alth!=null) {
+      alth.beforeBulkRelease(this); // called whether or not the release below actually runs
+    }
+    
     if (this.versionVector != null && this.dataPolicy.withReplication()) {
       this.versionVector.releaseCacheModificationLock(this);
     }
+    
+    if(alth!=null) {
+      alth.afterBulkRelease(this); // called after the conditional release step, even if it was skipped
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 3ad2cc1..99d96ab 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.distributed.internal.DM;
 import 
com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook;
 import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
 import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
 import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -745,4 +746,9 @@ final class ProxyRegionMap implements RegionMap {
   @Override
   public void close() {
   }
+
+  @Override
+  public ARMLockTestHook getARMLockTestHook() {
+    return null; // this RegionMap implementation always reports no ARM lock test hook
+  }
 }


Reply via email to