GEODE-599: fix clear with concurrent writes. The cache modification lock was being released before the operation was distributed to other members. This left a small window in which an operation from another thread could update the region before the other members had received notification of the first operation (i.e. down leveled).
This closes #232 Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3ea7dde0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3ea7dde0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3ea7dde0 Branch: refs/heads/feature/GEODE-1781 Commit: 3ea7dde029ced39ef33cd03a7bfdadd6bce84e9d Parents: 89d4270 Author: Scott Jewell <sjew...@pivotal.io> Authored: Fri Jul 8 15:52:03 2016 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Tue Aug 16 17:25:24 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/AbstractRegionMap.java | 917 ++++++++++--------- .../gemfire/internal/cache/BucketRegion.java | 38 +- .../cache/DistributedClearOperation.java | 2 +- .../internal/cache/DistributedRegion.java | 46 +- .../gemfire/internal/cache/LocalRegion.java | 493 +++++----- .../gemfire/internal/cache/ProxyRegionMap.java | 6 + .../gemfire/internal/cache/RegionMap.java | 4 + .../internal/cache/StateFlushOperation.java | 5 + .../cache/versions/RegionVersionVector.java | 12 +- .../internal/cache/ARMLockTestHookAdapter.java | 38 + .../cache/ClearRvvLockingDUnitTest.java | 667 ++++++++++++++ 11 files changed, 1529 insertions(+), 699 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java index faa8580..81e4d9f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java @@ 
-41,7 +41,6 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper; import com.gemstone.gemfire.internal.cache.versions.*; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter; -import com.gemstone.gemfire.internal.concurrent.MapResult; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; @@ -54,6 +53,7 @@ import com.gemstone.gemfire.internal.offheap.annotations.Retained; import com.gemstone.gemfire.internal.offheap.annotations.Unretained; import com.gemstone.gemfire.internal.sequencelog.EntryLogger; import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap; + import org.apache.logging.log4j.Logger; import java.util.*; @@ -1031,21 +1031,19 @@ public abstract class AbstractRegionMap implements RegionMap { boolean retry = true; - while (retry) { - retry = false; - - boolean opCompleted = false; - boolean doPart3 = false; - - // We need to acquire the region entry while holding the lock to avoid #45620. - // However, we also want to release the lock before distribution to prevent - // potential deadlocks. The outer try/finally ensures that the lock will be - // released without fail. I'm avoiding indenting just to preserve the ability - // to track diffs since the code is fairly complex. - boolean doUnlock = true; - lockForCacheModification(owner, event); - try { + lockForCacheModification(owner, event); + try { + + while (retry) { + retry = false; + boolean opCompleted = false; + boolean doPart3 = false; + + // We need to acquire the region entry while holding the lock to avoid #45620. + // The outer try/finally ensures that the lock will be released without fail. + // I'm avoiding indenting just to preserve the ability + // to track diffs since the code is fairly complex. 
RegionEntry re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, true, true); RegionEntry tombstone = null; @@ -1498,9 +1496,6 @@ public abstract class AbstractRegionMap implements RegionMap { return opCompleted; } finally { - releaseCacheModificationLock(owner, event); - doUnlock = false; - try { // If concurrency conflict is there and event contains gateway version tag then // do NOT distribute. @@ -1523,12 +1518,10 @@ public abstract class AbstractRegionMap implements RegionMap { } } - } finally { // failsafe on the read lock...see comment above - if (doUnlock) { - releaseCacheModificationLock(owner, event); - } - } - } // retry loop + } // retry loop + } finally { // failsafe on the read lock...see comment above + releaseCacheModificationLock(owner, event); + } return false; } @@ -1840,10 +1833,10 @@ public abstract class AbstractRegionMap implements RegionMap { public final boolean invalidate(EntryEventImpl event, boolean invokeCallbacks, boolean forceNewEntry, boolean forceCallbacks) - throws EntryNotFoundException + throws EntryNotFoundException { final boolean isDebugEnabled = logger.isDebugEnabled(); - + final LocalRegion owner = _getOwner(); if (owner == null) { // "fix" for bug 32440 @@ -1857,377 +1850,381 @@ public abstract class AbstractRegionMap implements RegionMap { DiskRegion dr = owner.getDiskRegion(); boolean ownerIsInitialized = owner.isInitialized(); try { - // Fix for Bug #44431. We do NOT want to update the region and wait - // later for index INIT as region.clear() can cause inconsistency if - // happened in parallel as it also does index INIT. 
- IndexManager oqlIndexManager = owner.getIndexManager() ; - if (oqlIndexManager != null) { - oqlIndexManager.waitForIndexInit(); - } - lockForCacheModification(owner, event); - try { - if (forceNewEntry || forceCallbacks) { - boolean opCompleted = false; - RegionEntry newRe = getEntryFactory().createEntry(owner, event.getKey(), - Token.REMOVED_PHASE1); - synchronized (newRe) { - try { - RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe); - - while (!opCompleted && oldRe != null) { - synchronized (oldRe) { - // if the RE is in phase 2 of removal, it will really be removed - // from the map. Otherwise, we can use it here and the thread - // that is destroying the RE will see the invalidation and not - // proceed to phase 2 of removal. - if (oldRe.isRemovedPhase2()) { - oldRe = putEntryIfAbsent(event.getKey(), newRe); - if (oldRe != null) { - owner.getCachePerfStats().incRetries(); - } - } else { - opCompleted = true; - event.setRegionEntry(oldRe); - if (oldRe.isDestroyed()) { - if (isDebugEnabled) { - logger.debug("mapInvalidate: Found DESTROYED token, not invalidated; key={}", event.getKey()); - } - } else if (oldRe.isInvalid()) { - - // was already invalid, do not invoke listeners or increment stat - if (isDebugEnabled) { - logger.debug("mapInvalidate: Entry already invalid: '{}'", event.getKey()); - } - processVersionTag(oldRe, event); - try { - oldRe.setValue(owner, oldRe.getValueInVM(owner)); // OFFHEAP noop setting an already invalid to invalid; No need to call prepareValueForCache since it is an invalid token. - } catch (RegionClearedException e) { - // that's okay - when writing an invalid into a disk, the - // region has been cleared (including this token) + // Fix for Bug #44431. We do NOT want to update the region and wait + // later for index INIT as region.clear() can cause inconsistency if + // happened in parallel as it also does index INIT. 
+ IndexManager oqlIndexManager = owner.getIndexManager() ; + if (oqlIndexManager != null) { + oqlIndexManager.waitForIndexInit(); + } + lockForCacheModification(owner, event); + try { + try { + if (forceNewEntry || forceCallbacks) { + boolean opCompleted = false; + RegionEntry newRe = getEntryFactory().createEntry(owner, event.getKey(), + Token.REMOVED_PHASE1); + synchronized (newRe) { + try { + RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe); + + while (!opCompleted && oldRe != null) { + synchronized (oldRe) { + // if the RE is in phase 2 of removal, it will really be removed + // from the map. Otherwise, we can use it here and the thread + // that is destroying the RE will see the invalidation and not + // proceed to phase 2 of removal. + if (oldRe.isRemovedPhase2()) { + oldRe = putEntryIfAbsent(event.getKey(), newRe); + if (oldRe != null) { + owner.getCachePerfStats().incRetries(); } } else { - owner.serverInvalidate(event); - if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) { - // server did not perform the invalidation, so don't leave an invalid - // entry here - return false; - } - final int oldSize = owner.calculateRegionEntryValueSize(oldRe); - //added for cq which needs old value. rdubey - FilterProfile fp = owner.getFilterProfile(); - if (!oldRe.isRemoved() && - (fp != null && fp.getCqCount() > 0)) { - - Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue - - // this will not fault in the value. 
- if (oldValue == Token.NOT_AVAILABLE){ - event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner)); - } else { - event.setOldValue(oldValue); + opCompleted = true; + event.setRegionEntry(oldRe); + if (oldRe.isDestroyed()) { + if (isDebugEnabled) { + logger.debug("mapInvalidate: Found DESTROYED token, not invalidated; key={}", event.getKey()); } - } - boolean isCreate = false; - try { - if (oldRe.isRemoved()) { - processVersionTag(oldRe, event); - event.putNewEntry(owner, oldRe); - EntryLogger.logInvalidate(event); - owner.recordEvent(event); - if (!oldRe.isTombstone()) { - owner.updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize()); + } else if (oldRe.isInvalid()) { + + // was already invalid, do not invoke listeners or increment stat + if (isDebugEnabled) { + logger.debug("mapInvalidate: Entry already invalid: '{}'", event.getKey()); + } + processVersionTag(oldRe, event); + try { + oldRe.setValue(owner, oldRe.getValueInVM(owner)); // OFFHEAP noop setting an already invalid to invalid; No need to call prepareValueForCache since it is an invalid token. + } catch (RegionClearedException e) { + // that's okay - when writing an invalid into a disk, the + // region has been cleared (including this token) + } + } else { + owner.serverInvalidate(event); + if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) { + // server did not perform the invalidation, so don't leave an invalid + // entry here + return false; + } + final int oldSize = owner.calculateRegionEntryValueSize(oldRe); + //added for cq which needs old value. rdubey + FilterProfile fp = owner.getFilterProfile(); + if (!oldRe.isRemoved() && + (fp != null && fp.getCqCount() > 0)) { + + Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue + + // this will not fault in the value. 
+ if (oldValue == Token.NOT_AVAILABLE){ + event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner)); } else { - owner.updateSizeOnCreate(event.getKey(), event.getNewValueBucketSize()); - isCreate = true; + event.setOldValue(oldValue); } - } else { - processVersionTag(oldRe, event); - event.putExistingEntry(owner, oldRe); + } + boolean isCreate = false; + try { + if (oldRe.isRemoved()) { + processVersionTag(oldRe, event); + event.putNewEntry(owner, oldRe); + EntryLogger.logInvalidate(event); + owner.recordEvent(event); + if (!oldRe.isTombstone()) { + owner.updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize()); + } else { + owner.updateSizeOnCreate(event.getKey(), event.getNewValueBucketSize()); + isCreate = true; + } + } else { + processVersionTag(oldRe, event); + event.putExistingEntry(owner, oldRe); + EntryLogger.logInvalidate(event); + owner.recordEvent(event); + owner.updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize()); + } + } + catch (RegionClearedException e) { + // generate versionTag for the event EntryLogger.logInvalidate(event); owner.recordEvent(event); - owner.updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize()); + clearOccured = true; } + owner.basicInvalidatePart2(oldRe, event, + clearOccured /* conflict with clear */, invokeCallbacks); + if (!clearOccured) { + if (isCreate) { + lruEntryCreate(oldRe); + } else { + lruEntryUpdate(oldRe); + } + } + didInvalidate = true; + invalidatedRe = oldRe; } - catch (RegionClearedException e) { - // generate versionTag for the event - EntryLogger.logInvalidate(event); - owner.recordEvent(event); - clearOccured = true; - } - owner.basicInvalidatePart2(oldRe, event, - clearOccured /* conflict with clear */, invokeCallbacks); - if (!clearOccured) { - if (isCreate) { - lruEntryCreate(oldRe); - } else { - lruEntryUpdate(oldRe); - } - } - didInvalidate = true; - invalidatedRe = oldRe; + } + } // synchronized oldRe + } // while oldRe exists + + if (!opCompleted) { + if 
(forceNewEntry && event.isFromServer()) { + // don't invoke listeners - we didn't force new entries for + // CCU invalidations before 7.0, and listeners don't care + if (!FORCE_INVALIDATE_EVENT) { + event.inhibitCacheListenerNotification(true); } } - } // synchronized oldRe - } // while oldRe exists - - if (!opCompleted) { - if (forceNewEntry && event.isFromServer()) { - // don't invoke listeners - we didn't force new entries for - // CCU invalidations before 7.0, and listeners don't care - if (!FORCE_INVALIDATE_EVENT) { - event.inhibitCacheListenerNotification(true); + event.setRegionEntry(newRe); + owner.serverInvalidate(event); + if (!forceNewEntry && event.noVersionReceivedFromServer()) { + // server did not perform the invalidation, so don't leave an invalid + // entry here + return false; } - } - event.setRegionEntry(newRe); - owner.serverInvalidate(event); - if (!forceNewEntry && event.noVersionReceivedFromServer()) { - // server did not perform the invalidation, so don't leave an invalid - // entry here - return false; - } - try { - ownerIsInitialized = owner.isInitialized(); - if (!ownerIsInitialized && owner.getDataPolicy().withReplication()) { - final int oldSize = owner.calculateRegionEntryValueSize(newRe); - invalidateEntry(event, newRe, oldSize); + try { + ownerIsInitialized = owner.isInitialized(); + if (!ownerIsInitialized && owner.getDataPolicy().withReplication()) { + final int oldSize = owner.calculateRegionEntryValueSize(newRe); + invalidateEntry(event, newRe, oldSize); + } + else { + invalidateNewEntry(event, owner, newRe); + } } - else { - invalidateNewEntry(event, owner, newRe); + catch (RegionClearedException e) { + // TODO: deltaGII: do we even need RegionClearedException? 
+ // generate versionTag for the event + owner.recordEvent(event); + clearOccured = true; } + owner.basicInvalidatePart2(newRe, event, clearOccured /*conflict with clear*/, invokeCallbacks); + if (!clearOccured) { + lruEntryCreate(newRe); + incEntryCount(1); + } + opCompleted = true; + didInvalidate = true; + invalidatedRe = newRe; + // Don't leave an entry in the cache, if we + // just wanted to force the distribution and events + // for this invalidate + if (!forceNewEntry) { + removeEntry(event.getKey(), newRe, false); + } + } // !opCompleted + } catch (ConcurrentCacheModificationException ccme) { + VersionTag tag = event.getVersionTag(); + if (tag != null && tag.isTimeStampUpdated()) { + // Notify gateways of new time-stamp. + owner.notifyTimestampsToGateways(event); } - catch (RegionClearedException e) { - // TODO: deltaGII: do we even need RegionClearedException? - // generate versionTag for the event - owner.recordEvent(event); - clearOccured = true; - } - owner.basicInvalidatePart2(newRe, event, clearOccured /*conflict with clear*/, invokeCallbacks); - if (!clearOccured) { - lruEntryCreate(newRe); - incEntryCount(1); - } - opCompleted = true; - didInvalidate = true; - invalidatedRe = newRe; - // Don't leave an entry in the cache, if we - // just wanted to force the distribution and events - // for this invalidate - if (!forceNewEntry) { + throw ccme; + } finally { + if (!opCompleted) { removeEntry(event.getKey(), newRe, false); - } - } // !opCompleted - } catch (ConcurrentCacheModificationException ccme) { - VersionTag tag = event.getVersionTag(); - if (tag != null && tag.isTimeStampUpdated()) { - // Notify gateways of new time-stamp. 
- owner.notifyTimestampsToGateways(event); - } - throw ccme; - } finally { - if (!opCompleted) { - removeEntry(event.getKey(), newRe, false); - } - } - } // synchronized newRe - } // forceNewEntry - else { // !forceNewEntry - boolean retry = true; - // RegionEntry retryEntry = null; - // int retries = -1; - - while (retry) { - retry = false; - boolean entryExisted = false; - RegionEntry re = getEntry(event.getKey()); - RegionEntry tombstone = null; - boolean haveTombstone = false; - if (re != null && re.isTombstone()) { - tombstone = re; - haveTombstone = true; - re = null; - } - if (re == null) { - ownerIsInitialized = owner.isInitialized(); - if (!ownerIsInitialized) { - // when GII message arrived or processed later than invalidate - // message, the entry should be created as placeholder - RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner, event.getKey(), - Token.INVALID); - synchronized (newRe) { - if (haveTombstone && !tombstone.isTombstone()) { - // state of the tombstone has changed so we need to retry - retry = true; - //retryEntry = tombstone; // leave this in place for debugging - continue; - } - re = putEntryIfAbsent(event.getKey(), newRe); - if (re == tombstone) { - re = null; // pretend we don't have an entry } } - } else if (owner.getServerProxy() != null) { - Object sync = haveTombstone? 
tombstone : new Object(); - synchronized(sync) { - if (haveTombstone && !tombstone.isTombstone()) { - // bug 45295: state of the tombstone has changed so we need to retry - retry = true; - //retryEntry = tombstone; // leave this in place for debugging - continue; - } - - // bug #43287 - send event to server even if it's not in the client (LRU may have evicted it) - owner.serverInvalidate(event); - if (owner.concurrencyChecksEnabled) { - if (event.getVersionTag() == null) { - // server did not perform the invalidation, so don't leave an invalid - // entry here - return false; - } else if (tombstone != null) { - processVersionTag(tombstone, event); - try { - if (!tombstone.isTombstone()) { - if (isDebugEnabled) { - logger.debug("tombstone is no longer a tombstone. {}:event={}", tombstone, event); + } // synchronized newRe + } // forceNewEntry + else { // !forceNewEntry + boolean retry = true; + // RegionEntry retryEntry = null; + // int retries = -1; + + while (retry) { + retry = false; + boolean entryExisted = false; + RegionEntry re = getEntry(event.getKey()); + RegionEntry tombstone = null; + boolean haveTombstone = false; + if (re != null && re.isTombstone()) { + tombstone = re; + haveTombstone = true; + re = null; + } + if (re == null) { + ownerIsInitialized = owner.isInitialized(); + if (!ownerIsInitialized) { + // when GII message arrived or processed later than invalidate + // message, the entry should be created as placeholder + RegionEntry newRe = haveTombstone? 
tombstone : getEntryFactory().createEntry(owner, event.getKey(), + Token.INVALID); + synchronized (newRe) { + if (haveTombstone && !tombstone.isTombstone()) { + // state of the tombstone has changed so we need to retry + retry = true; + //retryEntry = tombstone; // leave this in place for debugging + continue; + } + re = putEntryIfAbsent(event.getKey(), newRe); + if (re == tombstone) { + re = null; // pretend we don't have an entry + } + } + } else if (owner.getServerProxy() != null) { + Object sync = haveTombstone? tombstone : new Object(); + synchronized(sync) { + if (haveTombstone && !tombstone.isTombstone()) { + // bug 45295: state of the tombstone has changed so we need to retry + retry = true; + //retryEntry = tombstone; // leave this in place for debugging + continue; + } + + // bug #43287 - send event to server even if it's not in the client (LRU may have evicted it) + owner.serverInvalidate(event); + if (owner.concurrencyChecksEnabled) { + if (event.getVersionTag() == null) { + // server did not perform the invalidation, so don't leave an invalid + // entry here + return false; + } else if (tombstone != null) { + processVersionTag(tombstone, event); + try { + if (!tombstone.isTombstone()) { + if (isDebugEnabled) { + logger.debug("tombstone is no longer a tombstone. {}:event={}", tombstone, event); + } + } + tombstone.setValue(owner, Token.TOMBSTONE); + } catch (RegionClearedException e) { + // that's okay - when writing a tombstone into a disk, the + // region has been cleared (including this tombstone) + } catch (ConcurrentCacheModificationException ccme) { + VersionTag tag = event.getVersionTag(); + if (tag != null && tag.isTimeStampUpdated()) { + // Notify gateways of new time-stamp. 
+ owner.notifyTimestampsToGateways(event); + } + throw ccme; } + // update the tombstone's version to prevent an older CCU/putAll from overwriting it + owner.rescheduleTombstone(tombstone, event.getVersionTag()); } - tombstone.setValue(owner, Token.TOMBSTONE); - } catch (RegionClearedException e) { - // that's okay - when writing a tombstone into a disk, the - // region has been cleared (including this tombstone) - } catch (ConcurrentCacheModificationException ccme) { - VersionTag tag = event.getVersionTag(); - if (tag != null && tag.isTimeStampUpdated()) { - // Notify gateways of new time-stamp. - owner.notifyTimestampsToGateways(event); - } - throw ccme; } - // update the tombstone's version to prevent an older CCU/putAll from overwriting it - owner.rescheduleTombstone(tombstone, event.getVersionTag()); } + entryExisted = true; } } - entryExisted = true; - } - } - if (re != null) { - // Gester: Race condition in GII - // when adding the placeholder for invalidate entry during GII, - // if the GII got processed earlier for this entry, then do - // normal invalidate operation - synchronized (re) { - if (!event.isOriginRemote() && event.getOperation().isExpiration()) { - // If this expiration started locally then only do it if the RE is not being used by a tx. 
- if (re.isInUseByTransaction()) { - return false; - } - } - if (re.isTombstone() || (!re.isRemoved() && !re.isDestroyed())) { - entryExisted = true; - if (re.isInvalid()) { - // was already invalid, do not invoke listeners or increment - // stat - if (isDebugEnabled) { - logger.debug("Invalidate: Entry already invalid: '{}'", event.getKey()); - } - if (event.getVersionTag() != null && owner.getVersionVector() != null) { - owner.getVersionVector().recordVersion((InternalDistributedMember) event.getDistributedMember(), event.getVersionTag()); - } - } - else { // previous value not invalid - event.setRegionEntry(re); - owner.serverInvalidate(event); - if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) { - // server did not perform the invalidation, so don't leave an invalid - // entry here - if (isDebugEnabled) { - logger.debug("returning early because server did not generate a version stamp for this event:{}", event); - } - return false; - } - // in case of overflow to disk we need the old value for cqs. - if(owner.getFilterProfile().getCqCount() > 0){ - //use to be getValue and can cause dead lock rdubey. - if (re.isValueNull()) { - event.setOldValue(re.getValueOnDiskOrBuffer(owner)); - } else { - Object v = re.getValueInVM(owner); - event.setOldValue(v); // OFFHEAP escapes to EntryEventImpl oldValue + if (re != null) { + // Gester: Race condition in GII + // when adding the placeholder for invalidate entry during GII, + // if the GII got processed earlier for this entry, then do + // normal invalidate operation + synchronized (re) { + if (!event.isOriginRemote() && event.getOperation().isExpiration()) { + // If this expiration started locally then only do it if the RE is not being used by a tx. 
+ if (re.isInUseByTransaction()) { + return false; } } - final boolean oldWasTombstone = re.isTombstone(); - final int oldSize = _getOwner().calculateRegionEntryValueSize(re); - try { - invalidateEntry(event, re, oldSize); - } - catch (RegionClearedException rce) { - // generate versionTag for the event - EntryLogger.logInvalidate(event); - _getOwner().recordEvent(event); - clearOccured = true; - } catch (ConcurrentCacheModificationException ccme) { - VersionTag tag = event.getVersionTag(); - if (tag != null && tag.isTimeStampUpdated()) { - // Notify gateways of new time-stamp. - owner.notifyTimestampsToGateways(event); + if (re.isTombstone() || (!re.isRemoved() && !re.isDestroyed())) { + entryExisted = true; + if (re.isInvalid()) { + // was already invalid, do not invoke listeners or increment + // stat + if (isDebugEnabled) { + logger.debug("Invalidate: Entry already invalid: '{}'", event.getKey()); + } + if (event.getVersionTag() != null && owner.getVersionVector() != null) { + owner.getVersionVector().recordVersion((InternalDistributedMember) event.getDistributedMember(), event.getVersionTag()); + } } - throw ccme; + else { // previous value not invalid + event.setRegionEntry(re); + owner.serverInvalidate(event); + if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) { + // server did not perform the invalidation, so don't leave an invalid + // entry here + if (isDebugEnabled) { + logger.debug("returning early because server did not generate a version stamp for this event:{}", event); + } + return false; + } + // in case of overflow to disk we need the old value for cqs. + if(owner.getFilterProfile().getCqCount() > 0){ + //use to be getValue and can cause dead lock rdubey. 
+ if (re.isValueNull()) { + event.setOldValue(re.getValueOnDiskOrBuffer(owner)); + } else { + Object v = re.getValueInVM(owner); + event.setOldValue(v); // OFFHEAP escapes to EntryEventImpl oldValue + } + } + final boolean oldWasTombstone = re.isTombstone(); + final int oldSize = _getOwner().calculateRegionEntryValueSize(re); + try { + invalidateEntry(event, re, oldSize); + } + catch (RegionClearedException rce) { + // generate versionTag for the event + EntryLogger.logInvalidate(event); + _getOwner().recordEvent(event); + clearOccured = true; + } catch (ConcurrentCacheModificationException ccme) { + VersionTag tag = event.getVersionTag(); + if (tag != null && tag.isTimeStampUpdated()) { + // Notify gateways of new time-stamp. + owner.notifyTimestampsToGateways(event); + } + throw ccme; + } + owner.basicInvalidatePart2(re, event, + clearOccured /* conflict with clear */, invokeCallbacks); + if (!clearOccured) { + if (oldWasTombstone) { + lruEntryCreate(re); + } else { + lruEntryUpdate(re); + } + } + didInvalidate = true; + invalidatedRe = re; + } // previous value not invalid } - owner.basicInvalidatePart2(re, event, - clearOccured /* conflict with clear */, invokeCallbacks); - if (!clearOccured) { - if (oldWasTombstone) { - lruEntryCreate(re); - } else { - lruEntryUpdate(re); - } - } - didInvalidate = true; - invalidatedRe = re; - } // previous value not invalid + } // synchronized re + } // re != null + else { + // At this point, either it's not in GII mode, or the placeholder + // is in region, do nothing } - } // synchronized re - } // re != null - else { - // At this point, either it's not in GII mode, or the placeholder - // is in region, do nothing - } - if (!entryExisted) { - owner.checkEntryNotFound(event.getKey()); - } - } // while(retry) - } // !forceNewEntry - } catch( DiskAccessException dae) { - invalidatedRe = null; - didInvalidate = false; - this._getOwner().handleDiskAccessException(dae); - throw dae; - } finally { - 
releaseCacheModificationLock(owner, event); - if (oqlIndexManager != null) { - oqlIndexManager.countDownIndexUpdaters(); - } - if (invalidatedRe != null) { - owner.basicInvalidatePart3(invalidatedRe, event, invokeCallbacks); - } - if (didInvalidate && !clearOccured) { - try { - lruUpdateCallback(); + if (!entryExisted) { + owner.checkEntryNotFound(event.getKey()); + } + } // while(retry) + } // !forceNewEntry } catch( DiskAccessException dae) { + invalidatedRe = null; + didInvalidate = false; this._getOwner().handleDiskAccessException(dae); throw dae; + } finally { + if (oqlIndexManager != null) { + oqlIndexManager.countDownIndexUpdaters(); + } + if (invalidatedRe != null) { + owner.basicInvalidatePart3(invalidatedRe, event, invokeCallbacks); + } + if (didInvalidate && !clearOccured) { + try { + lruUpdateCallback(); + } catch( DiskAccessException dae) { + this._getOwner().handleDiskAccessException(dae); + throw dae; + } + } + else if (!didInvalidate){ + resetThreadLocals(); + } + } + return didInvalidate; + } finally { + if (ownerIsInitialized) { + forceInvalidateEvent(event, owner); } } - else if (!didInvalidate){ - resetThreadLocals(); - } - } - return didInvalidate; } finally { - if (ownerIsInitialized) { - forceInvalidateEvent(event, owner); - } + releaseCacheModificationLock(owner, event); } + } protected void invalidateNewEntry(EntryEventImpl event, @@ -2682,27 +2679,30 @@ public abstract class AbstractRegionMap implements RegionMap { boolean uninitialized = !owner.isInitialized(); boolean retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null; - lockForCacheModification(owner, event); IndexManager oqlIndexManager = null; + lockForCacheModification(owner, event); try { - // Fix for Bug #44431. We do NOT want to update the region and wait - // later for index INIT as region.clear() can cause inconsistency if - // happened in parallel as it also does index INIT. 
- oqlIndexManager = owner.getIndexManager() ; - if (oqlIndexManager != null) { - oqlIndexManager.waitForIndexInit(); - } + try { + // Fix for Bug #44431. We do NOT want to update the region and wait + // later for index INIT as region.clear() can cause inconsistency if + // happened in parallel as it also does index INIT. + oqlIndexManager = owner.getIndexManager() ; + if (oqlIndexManager != null) { + oqlIndexManager.waitForIndexInit(); + } - // fix for bug #42169, replace must go to server if entry not on client - boolean replaceOnClient = event.getOperation() == Operation.REPLACE - && owner.getServerProxy() != null; + // fix for bug #42169, replace must go to server if entry not on client + boolean replaceOnClient = event.getOperation() == Operation.REPLACE + && owner.getServerProxy() != null; // Rather than having two different blocks for synchronizing oldRe // and newRe, have only one block and synchronize re RegionEntry re = null; boolean eventRecorded = false; boolean onlyExisting = ifOld && !replaceOnClient; - re = getOrCreateRegionEntry(owner, event, - Token.REMOVED_PHASE1, null, onlyExisting, false); + + re = getOrCreateRegionEntry(owner, event, + + Token.REMOVED_PHASE1, null, onlyExisting, false); if (re == null) { return null; } @@ -2712,9 +2712,8 @@ public abstract class AbstractRegionMap implements RegionMap { // from the map. 
otherwise we can append an event to it // and change its state if (re.isRemovedPhase2()) { - re = getOrCreateRegionEntry(owner, event, - Token.REMOVED_PHASE1, null, onlyExisting, false); - _getOwner().getCachePerfStats().incRetries(); + re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, onlyExisting, false); + _getOwner().getCachePerfStats().incRetries(); if (re == null) { // this will happen when onlyExisting is true return null; @@ -2757,48 +2756,48 @@ public abstract class AbstractRegionMap implements RegionMap { // notify index of an update notifyIndex(re, true); + try { try { - try { - if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set - || !re.isRemoved() - || replaceOnClient) { - // update - updateEntry(event, requireOldValue, oldValueForDelta, re); - } else { - // create - createEntry(event, owner, re); - } - owner.recordEvent(event); - eventRecorded = true; - } catch (RegionClearedException rce) { - clearOccured = true; - owner.recordEvent(event); - } catch (ConcurrentCacheModificationException ccme) { - VersionTag tag = event.getVersionTag(); - if (tag != null && tag.isTimeStampUpdated()) { - // Notify gateways of new time-stamp. 
- owner.notifyTimestampsToGateways(event); - } - throw ccme; + if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set + || !re.isRemoved() + || replaceOnClient) { + // update + updateEntry(event, requireOldValue, oldValueForDelta, re); + } else { + // create + createEntry(event, owner, re); } - if (uninitialized) { - event.inhibitCacheListenerNotification(true); + owner.recordEvent(event); + eventRecorded = true; + } catch (RegionClearedException rce) { + clearOccured = true; + owner.recordEvent(event); + } catch (ConcurrentCacheModificationException ccme) { + VersionTag tag = event.getVersionTag(); + if (tag != null && tag.isTimeStampUpdated()) { + // Notify gateways of new time-stamp. + owner.notifyTimestampsToGateways(event); } - updateLru(clearOccured, re, event); - - lastModifiedTime = owner.basicPutPart2(event, re, - !uninitialized, lastModifiedTime, clearOccured); - } finally { - notifyIndex(re, false); + throw ccme; } - result = re; - break; - } finally { - OffHeapHelper.release(oldValueForDelta); - if (re != null && !onlyExisting && !isOpComplete(re, event)) { - owner.cleanUpOnIncompleteOp(event, re); + if (uninitialized) { + event.inhibitCacheListenerNotification(true); } - else if (re != null && owner.isUsedForPartitionedRegionBucket()) { + updateLru(clearOccured, re, event); + + lastModifiedTime = owner.basicPutPart2(event, re, + !uninitialized, lastModifiedTime, clearOccured); + } finally { + notifyIndex(re, false); + } + result = re; + break; + } finally { + OffHeapHelper.release(oldValueForDelta); + if (re != null && !onlyExisting && !isOpComplete(re, event)) { + owner.cleanUpOnIncompleteOp(event, re); + } + else if (re != null && owner.isUsedForPartitionedRegionBucket()) { BucketRegion br = (BucketRegion)owner; CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats(); } @@ -2806,14 +2805,13 @@ public abstract class AbstractRegionMap implements RegionMap { } } // sync re }// 
end while - } catch (DiskAccessException dae) { - //Asif:Feel that it is safe to destroy the region here as there appears - // to be no chance of deadlock during region destruction - result = null; - this._getOwner().handleDiskAccessException(dae); - throw dae; - } finally { - releaseCacheModificationLock(owner, event); + } catch (DiskAccessException dae) { + //Asif:Feel that it is safe to destroy the region here as there appears + // to be no chance of deadlock during region destruction + result = null; + this._getOwner().handleDiskAccessException(dae); + throw dae; + } finally { if (oqlIndexManager != null) { oqlIndexManager.countDownIndexUpdaters(); } @@ -2841,8 +2839,10 @@ public abstract class AbstractRegionMap implements RegionMap { } else { resetThreadLocals(); } - } // finally - + } + } finally { + releaseCacheModificationLock(owner, event); + } return result; } @@ -3632,45 +3632,69 @@ public abstract class AbstractRegionMap implements RegionMap { /** get version-generation permission from the region's version vector */ - private void lockForCacheModification(LocalRegion owner, EntryEventImpl event) { + void lockForCacheModification(LocalRegion owner, EntryEventImpl event) { boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.dataPolicy.withReplication(); - if (!event.isOriginRemote() && !lockedByBulkOp) { + + if(armLockTestHook!=null) armLockTestHook.beforeLock(owner, event); + + if (!event.isOriginRemote() && !lockedByBulkOp && !owner.hasServerProxy()) { RegionVersionVector vector = owner.getVersionVector(); if (vector != null) { - vector.lockForCacheModification(owner); + vector.lockForCacheModification(); } } + + if(armLockTestHook!=null) armLockTestHook.afterLock(owner, event); + } /** release version-generation permission from the region's version vector */ - private void releaseCacheModificationLock(LocalRegion owner, EntryEventImpl event) { + void releaseCacheModificationLock(LocalRegion owner, EntryEventImpl event) { boolean lockedByBulkOp 
= event.isBulkOpInProgress() && owner.dataPolicy.withReplication(); - if (!event.isOriginRemote() && !lockedByBulkOp) { + + if(armLockTestHook!=null) armLockTestHook.beforeRelease(owner, event); + + if (!event.isOriginRemote() && !lockedByBulkOp && !owner.hasServerProxy()) { RegionVersionVector vector = owner.getVersionVector(); if (vector != null) { - vector.releaseCacheModificationLock(owner); + vector.releaseCacheModificationLock(); } } + + if(armLockTestHook!=null) armLockTestHook.afterRelease(owner, event); + } /** get version-generation permission from the region's version vector */ private void lockForTXCacheModification(LocalRegion owner, VersionTag tag) { + + if(armLockTestHook!=null) armLockTestHook.beforeLock(owner, null); + if ( !(tag != null && tag.isFromOtherMember()) ) { RegionVersionVector vector = owner.getVersionVector(); - if (vector != null) { - vector.lockForCacheModification(owner); + if (vector != null && !owner.hasServerProxy()) { + vector.lockForCacheModification(); } } + + if(armLockTestHook!=null) armLockTestHook.afterLock(owner, null); + } /** release version-generation permission from the region's version vector */ private void releaseTXCacheModificationLock(LocalRegion owner, VersionTag tag) { + + if(armLockTestHook!=null) armLockTestHook.beforeRelease(owner, null); + if ( !(tag != null && tag.isFromOtherMember()) ) { RegionVersionVector vector = owner.getVersionVector(); - if (vector != null) { - vector.releaseCacheModificationLock(owner); + if (vector != null && !owner.hasServerProxy()) { + vector.releaseCacheModificationLock(); } } + + if(armLockTestHook!=null) armLockTestHook.afterRelease(owner, null); + } /** @@ -3798,5 +3822,30 @@ public abstract class AbstractRegionMap implements RegionMap { } return true; } + + public interface ARMLockTestHook { + public void beforeBulkLock(LocalRegion region); + public void afterBulkLock(LocalRegion region); + public void beforeBulkRelease(LocalRegion region); + public void 
afterBulkRelease(LocalRegion region); + + public void beforeLock(LocalRegion region, CacheEvent event); + public void afterLock(LocalRegion region, CacheEvent event); + public void beforeRelease(LocalRegion region, CacheEvent event); + public void afterRelease(LocalRegion region, CacheEvent event); + + public void beforeStateFlushWait(); + } + + private ARMLockTestHook armLockTestHook; + + public ARMLockTestHook getARMLockTestHook() { + return armLockTestHook; + } + + public void setARMLockTestHook(ARMLockTestHook theHook) { + armLockTestHook = theHook; + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java index f1627d3..4e4c417 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java @@ -1268,26 +1268,32 @@ implements Bucket Assert.assertTrue(!isTX()); Assert.assertTrue(event.getOperation().isDistributed()); - beginLocalWrite(event); + LocalRegion lr = event.getLocalRegion(); + AbstractRegionMap arm = ((AbstractRegionMap) lr.getRegionMap()); try { - - if (!hasSeenEvent(event)) { - this.entries.updateEntryVersion(event); - } else { - if (logger.isTraceEnabled(LogMarker.DM)) { - logger.trace(LogMarker.DM, "BR.basicUpdateEntryVersion: this cache has already seen this event {}", event); + arm.lockForCacheModification(lr, event); + beginLocalWrite(event); + try { + if (!hasSeenEvent(event)) { + this.entries.updateEntryVersion(event); + } else { + if (logger.isTraceEnabled(LogMarker.DM)) { + logger.trace(LogMarker.DM, "BR.basicUpdateEntryVersion: this cache has already seen this event {}", event); + } } + if 
(!event.isOriginRemote() && getBucketAdvisor().isPrimary()) { + // This cache has processed the event, forward operation + // and event messages to backup buckets + if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) { + distributeUpdateEntryVersionOperation(event); + } + } + return; + } finally { + endLocalWrite(event); } - if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) { - // This cache has processed the event, forward operation - // and event messages to backup buckets - if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) { - distributeUpdateEntryVersionOperation(event); - } - } - return; } finally { - endLocalWrite(event); + arm.releaseCacheModificationLock(event.getLocalRegion(), event); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java index 5526ef0..b6e1c35 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java @@ -219,7 +219,7 @@ public class DistributedClearOperation extends DistributedCacheOperation case OP_LOCK_FOR_CLEAR: if (region.getDataPolicy().withStorage()) { DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(), region); - region.lockLocallyForClear(dm, this.getSender()); + region.lockLocallyForClear(dm, this.getSender(), event); } this.appliedOperation = true; break; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java 
---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java index b42a617..a7b82ac 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java @@ -41,6 +41,7 @@ import com.gemstone.gemfire.distributed.internal.locks.DLockService; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.i18n.StringId; import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook; import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile; import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus; import com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse; @@ -1966,20 +1967,25 @@ public class DistributedRegion extends LocalRegion implements } } - @Override void basicUpdateEntryVersion(EntryEventImpl event) throws EntryNotFoundException { - + LocalRegion lr = event.getLocalRegion(); + AbstractRegionMap arm = ((AbstractRegionMap) lr.getRegionMap()); try { - if (!hasSeenEvent(event)) { - super.basicUpdateEntryVersion(event); + arm.lockForCacheModification(lr, event); + try { + if (!hasSeenEvent(event)) { + super.basicUpdateEntryVersion(event); + } + return; + } finally { + if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) { + distributeUpdateEntryVersion(event); + } } - return; } finally { - if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) { - distributeUpdateEntryVersion(event); - } + arm.releaseCacheModificationLock(lr, event); } } @@ -2110,33 +2116,47 @@ public class DistributedRegion extends LocalRegion implements * @param participants **/ private 
void obtainWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { - lockLocallyForClear(getDistributionManager(), getMyId()); + lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent); DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants); } /** pause local operations so that a clear() can be performed and flush comm channels to the given member */ - public void lockLocallyForClear(DM dm, InternalDistributedMember locker) { + public void lockLocallyForClear(DM dm, InternalDistributedMember locker, CacheEvent event) { RegionVersionVector rvv = getVersionVector(); + + ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); + if(alth!=null) alth.beforeLock(this, event); + if (rvv != null) { // block new operations from being applied to the region map rvv.lockForClear(getFullPath(), dm, locker); //Check for region destroyed after we have locked, to make sure //we don't continue a clear if the region has been destroyed. 
checkReadiness(); - // wait for current operations to - if (!locker.equals(dm.getDistributionManagerId())) { + // Only need to flush if NOACK at this point + if (this.getAttributes().getScope().isDistributedNoAck()) { Set<InternalDistributedMember> mbrs = getDistributionAdvisor().adviseCacheOp(); StateFlushOperation.flushTo(mbrs, this); - } + } } + + if(alth!=null) alth.afterLock(this, null); + } /** releases the locks obtained in obtainWriteLocksForClear * @param participants */ private void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { + + ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); + if(alth!=null) alth.beforeRelease(this, regionEvent); + getVersionVector().unlockForClear(getMyId()); DistributedClearOperation.releaseLocks(regionEvent, participants); + + if(alth!=null) alth.afterRelease(this, regionEvent); + } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index 73dc5ab..304a5ad 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -157,6 +157,7 @@ import com.gemstone.gemfire.internal.InternalStatisticsDisabledException; import com.gemstone.gemfire.internal.NanoTimer; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion; +import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook; import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile; import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag; import 
com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo; @@ -10198,141 +10199,144 @@ public class LocalRegion extends AbstractRegion } lockRVVForBulkOp(); try { + try { - final DistributedPutAllOperation dpao = putAllOp; - int size = (proxyResult == null)? map.size() : proxyResult.size(); - - if (isDebugEnabled) { - logger.debug( "size of put result is {} maps is {} proxyResult is {}", size, map, proxyResult); - } + final DistributedPutAllOperation dpao = putAllOp; + int size = (proxyResult == null)? map.size() : proxyResult.size(); - final PutAllPartialResult partialKeys = new PutAllPartialResult(size); - final Iterator iterator; - final boolean isVersionedResults; - if (proxyResult != null) { - iterator = proxyResult.iterator(); - isVersionedResults = true; - } else { - iterator = map.entrySet().iterator(); - isVersionedResults = false; - } - Runnable r = new Runnable() { - public void run() { - int offset = 0; - VersionTagHolder tagHolder = new VersionTagHolder(); - while (iterator.hasNext()) { - stopper.checkCancelInProgress(null); - Map.Entry mapEntry = (Map.Entry)iterator.next(); - Object key = mapEntry.getKey(); - VersionTag versionTag = null; - tagHolder.setVersionTag(null); - final Object value; - boolean overwritten = false; - if (isVersionedResults) { - versionTag = ((VersionedObjectList.Entry)mapEntry).getVersionTag(); - value = map.get(key); - if (isDebugEnabled) { - logger.debug("putAll key {} -> {} version={}", key, value, versionTag); - } - if (versionTag == null && serverIsVersioned && concurrencyChecksEnabled && dataPolicy.withStorage()) { - // server was unable to determine the version for this operation. - // I'm not sure this can still happen as described below on a pr. - // But it can happen on the server if NORMAL or PRELOADED. See bug 51644. - // This can happen in a PR with redundancy if there is a bucket - // failure or migration during the operation. 
We destroy the - // entry since we don't know what its state should be (but the server should) + if (isDebugEnabled) { + logger.debug( "size of put result is {} maps is {} proxyResult is {}", size, map, proxyResult); + } + + final PutAllPartialResult partialKeys = new PutAllPartialResult(size); + final Iterator iterator; + final boolean isVersionedResults; + if (proxyResult != null) { + iterator = proxyResult.iterator(); + isVersionedResults = true; + } else { + iterator = map.entrySet().iterator(); + isVersionedResults = false; + } + Runnable r = new Runnable() { + public void run() { + int offset = 0; + VersionTagHolder tagHolder = new VersionTagHolder(); + while (iterator.hasNext()) { + stopper.checkCancelInProgress(null); + Map.Entry mapEntry = (Map.Entry)iterator.next(); + Object key = mapEntry.getKey(); + VersionTag versionTag = null; + tagHolder.setVersionTag(null); + final Object value; + boolean overwritten = false; + if (isVersionedResults) { + versionTag = ((VersionedObjectList.Entry)mapEntry).getVersionTag(); + value = map.get(key); if (isDebugEnabled) { - logger.debug("server returned no version information for {}", key); + logger.debug("putAll key {} -> {} version={}", key, value, versionTag); } - localDestroyNoCallbacks(key); - // to be consistent we need to fetch the current entry - get(key, event.getCallbackArgument(), false, null); - overwritten = true; - } - } else { - value = mapEntry.getValue(); - if (isDebugEnabled) { - logger.debug("putAll {} -> {}", key, value); - } - } - try { - if (serverIsVersioned) { + if (versionTag == null && serverIsVersioned && concurrencyChecksEnabled && dataPolicy.withStorage()) { + // server was unable to determine the version for this operation. + // I'm not sure this can still happen as described below on a pr. + // But it can happen on the server if NORMAL or PRELOADED. See bug 51644. + // This can happen in a PR with redundancy if there is a bucket + // failure or migration during the operation. 
We destroy the + // entry since we don't know what its state should be (but the server should) + if (isDebugEnabled) { + logger.debug("server returned no version information for {}", key); + } + localDestroyNoCallbacks(key); + // to be consistent we need to fetch the current entry + get(key, event.getCallbackArgument(), false, null); + overwritten = true; + } + } else { + value = mapEntry.getValue(); if (isDebugEnabled) { - logger.debug("associating version tag with {} version={}", key, versionTag); + logger.debug("putAll {} -> {}", key, value); } - //If we have received a version tag from a server, add it to the event - tagHolder.setVersionTag(versionTag); - tagHolder.setFromServer(true); - } else if (retryVersions != null && retryVersions.containsKey(key)) { - //If this is a retried event, and we have a version tag for the retry, - //add it to the event. - tagHolder.setVersionTag(retryVersions.get(key)); - } - - if (!overwritten) { - basicEntryPutAll(key, value, dpao, offset, tagHolder); } - // now we must check again since the cache may have closed during - // distribution (causing this process to not receive and queue the - // event for clients - stopper.checkCancelInProgress(null); - succeeded.addKeyAndVersion(key, tagHolder.getVersionTag()); - } - catch (Exception ex) { - // TODO ask Gester if this debug logging can be removed - if (isDebugEnabled) { - logger.debug("PutAll operation encountered exception for key {}", key, ex); + try { + if (serverIsVersioned) { + if (isDebugEnabled) { + logger.debug("associating version tag with {} version={}", key, versionTag); + } + //If we have received a version tag from a server, add it to the event + tagHolder.setVersionTag(versionTag); + tagHolder.setFromServer(true); + } else if (retryVersions != null && retryVersions.containsKey(key)) { + //If this is a retried event, and we have a version tag for the retry, + //add it to the event. 
+ tagHolder.setVersionTag(retryVersions.get(key)); + } + + if (!overwritten) { + basicEntryPutAll(key, value, dpao, offset, tagHolder); + } + // now we must check again since the cache may have closed during + // distribution (causing this process to not receive and queue the + // event for clients + stopper.checkCancelInProgress(null); + succeeded.addKeyAndVersion(key, tagHolder.getVersionTag()); + } + catch (Exception ex) { + // TODO ask Gester if this debug logging can be removed + if (isDebugEnabled) { + logger.debug("PutAll operation encountered exception for key {}", key, ex); + } + partialKeys.saveFailedKey(key, ex); } - partialKeys.saveFailedKey(key, ex); + offset++; } - offset++; } - } - }; - this.syncBulkOp(r, eventId); - if (partialKeys.hasFailure()) { - // Bug 51725: Now succeeded contains an order key list, may be missing the version tags. - // Save reference of succeeded into partialKeys. The succeeded may be modified by - // postPutAll() to fill in the version tags. - partialKeys.setSucceededKeysAndVersions(succeeded); - logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1, - new Object[] {getFullPath(), partialKeys})); - if (isDebugEnabled) { - logger.debug(partialKeys.detailString()); - } - if (e == null) { - // if received exception from server first, ignore local exception - if (dpao.isBridgeOperation()) { - if (partialKeys.getFailure() instanceof CancelException) { - e = (CancelException)partialKeys.getFailure(); - } else if (partialKeys.getFailure() instanceof LowMemoryException) { - throw partialKeys.getFailure(); // fix for #43589 - } else { - e = new PutAllPartialResultException(partialKeys); - if (isDebugEnabled) { - logger.debug("basicPutAll:"+partialKeys.detailString()); + }; + this.syncBulkOp(r, eventId); + if (partialKeys.hasFailure()) { + // Bug 51725: Now succeeded contains an order key list, may be missing the version tags. + // Save reference of succeeded into partialKeys. 
The succeeded may be modified by + // postPutAll() to fill in the version tags. + partialKeys.setSucceededKeysAndVersions(succeeded); + logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1, + new Object[] {getFullPath(), partialKeys})); + if (isDebugEnabled) { + logger.debug(partialKeys.detailString()); + } + if (e == null) { + // if received exception from server first, ignore local exception + if (dpao.isBridgeOperation()) { + if (partialKeys.getFailure() instanceof CancelException) { + e = (CancelException)partialKeys.getFailure(); + } else if (partialKeys.getFailure() instanceof LowMemoryException) { + throw partialKeys.getFailure(); // fix for #43589 + } else { + e = new PutAllPartialResultException(partialKeys); + if (isDebugEnabled) { + logger.debug("basicPutAll:"+partialKeys.detailString()); + } } + } else { + throw partialKeys.getFailure(); } - } else { - throw partialKeys.getFailure(); } } } - } - catch (LowMemoryException lme) { - throw lme; - } - catch (RuntimeException ex) { - e = ex; - } - catch (Exception ex) { - e = new RuntimeException(ex); + catch (LowMemoryException lme) { + throw lme; + } + catch (RuntimeException ex) { + e = ex; + } + catch (Exception ex) { + e = new RuntimeException(ex); + } finally { + putAllOp.getBaseEvent().release(); + putAllOp.freeOffHeapResources(); + } + getDataView().postPutAll(putAllOp, succeeded, this); } finally { unlockRVVForBulkOp(); - putAllOp.getBaseEvent().release(); - putAllOp.freeOffHeapResources(); } - getDataView().postPutAll(putAllOp, succeeded, this); if (e != null) { throw e; } @@ -10400,150 +10404,153 @@ public class LocalRegion extends AbstractRegion } lockRVVForBulkOp(); try { + try { - final DistributedRemoveAllOperation op = removeAllOp; - int size = (proxyResult == null)? 
keys.size() : proxyResult.size(); - - if (isInternalRegion()) { - if (isTraceEnabled) { - logger.trace( - "size of removeAll result is {} keys are {} proxyResult is {}", size, keys, proxyResult); + final DistributedRemoveAllOperation op = removeAllOp; + int size = (proxyResult == null)? keys.size() : proxyResult.size(); + + if (isInternalRegion()) { + if (isTraceEnabled) { + logger.trace( + "size of removeAll result is {} keys are {} proxyResult is {}", size, keys, proxyResult); + } else { + if (isTraceEnabled) { + logger.trace( + "size of removeAll result is {} keys are {} proxyResult is {}", size, keys, proxyResult); + } + } } else { if (isTraceEnabled) { logger.trace( "size of removeAll result is {} keys are {} proxyResult is {}", size, keys, proxyResult); } } - } else { - if (isTraceEnabled) { - logger.trace( - "size of removeAll result is {} keys are {} proxyResult is {}", size, keys, proxyResult); - } - } - final PutAllPartialResult partialKeys = new PutAllPartialResult(size); - final Iterator iterator; - final boolean isVersionedResults; - if (proxyResult != null) { - iterator = proxyResult.iterator(); - isVersionedResults = true; - } else { - iterator = keys.iterator(); - isVersionedResults = false; - } - Runnable r = new Runnable() { - public void run() { - int offset = 0; - VersionTagHolder tagHolder = new VersionTagHolder(); - while (iterator.hasNext()) { - stopper.checkCancelInProgress(null); - Object key; - VersionTag versionTag = null; - tagHolder.setVersionTag(null); - if (isVersionedResults) { - Map.Entry mapEntry = (Map.Entry)iterator.next(); - key = mapEntry.getKey(); - versionTag = ((VersionedObjectList.Entry)mapEntry).getVersionTag(); - if (isDebugEnabled) { - logger.debug("removeAll key {} version={}",key, versionTag); - } - if (versionTag == null) { + final PutAllPartialResult partialKeys = new PutAllPartialResult(size); + final Iterator iterator; + final boolean isVersionedResults; + if (proxyResult != null) { + iterator = 
proxyResult.iterator(); + isVersionedResults = true; + } else { + iterator = keys.iterator(); + isVersionedResults = false; + } + Runnable r = new Runnable() { + public void run() { + int offset = 0; + VersionTagHolder tagHolder = new VersionTagHolder(); + while (iterator.hasNext()) { + stopper.checkCancelInProgress(null); + Object key; + VersionTag versionTag = null; + tagHolder.setVersionTag(null); + if (isVersionedResults) { + Map.Entry mapEntry = (Map.Entry)iterator.next(); + key = mapEntry.getKey(); + versionTag = ((VersionedObjectList.Entry)mapEntry).getVersionTag(); if (isDebugEnabled) { - logger.debug("removeAll found invalid version tag, which means the entry is not found at server for key={}.", key); + logger.debug("removeAll key {} version={}",key, versionTag); } - succeeded.addKeyAndVersion(key, null); - continue; - } - // No need for special handling here in removeAll. - // We can just remove this key from the client with versionTag set to null. - } else { - key = iterator.next(); - if (isInternalRegion()) { - if (isTraceEnabled) { - logger.trace("removeAll {}", key); + if (versionTag == null) { + if (isDebugEnabled) { + logger.debug("removeAll found invalid version tag, which means the entry is not found at server for key={}.", key); + } + succeeded.addKeyAndVersion(key, null); + continue; } + // No need for special handling here in removeAll. + // We can just remove this key from the client with versionTag set to null. 
} else { - if (isTraceEnabled) { - logger.trace("removeAll {}", key); + key = iterator.next(); + if (isInternalRegion()) { + if (isTraceEnabled) { + logger.trace("removeAll {}", key); + } + } else { + if (isTraceEnabled) { + logger.trace("removeAll {}", key); + } } + } + try { + if (serverIsVersioned) { + if (isDebugEnabled) { + logger.debug("associating version tag with {} version={}", key, versionTag); + } + //If we have received a version tag from a server, add it to the event + tagHolder.setVersionTag(versionTag); + tagHolder.setFromServer(true); + } else if (retryVersions != null) { + VersionTag vt = retryVersions.get(offset); + if (vt != null) { + //If this is a retried event, and we have a version tag for the retry, + //add it to the event. + tagHolder.setVersionTag(vt); + } + } + basicEntryRemoveAll(key, op, offset, tagHolder); + // now we must check again since the cache may have closed during + // distribution causing this process to not receive and queue the + // event for clients + stopper.checkCancelInProgress(null); + succeeded.addKeyAndVersion(key, tagHolder.getVersionTag()); + } + catch (Exception ex) { + partialKeys.saveFailedKey(key, ex); + } + offset++; } - try { - if (serverIsVersioned) { + } + }; + syncBulkOp(r, eventId); + if (partialKeys.hasFailure()) { + // Bug 51725: Now succeeded contains an order key list, may be missing the version tags. + // Save reference of succeeded into partialKeys. The succeeded may be modified by + // postRemoveAll() to fill in the version tags. 
+ partialKeys.setSucceededKeysAndVersions(succeeded); + logger.info(LocalizedMessage.create(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_0_1, + new Object[] {getFullPath(), partialKeys})); + if (isDebugEnabled) { + logger.debug(partialKeys.detailString()); + } + if (e == null) { + // if received exception from server first, ignore local exception + if (op.isBridgeOperation()) { + if (partialKeys.getFailure() instanceof CancelException) { + e = (CancelException)partialKeys.getFailure(); + } else if (partialKeys.getFailure() instanceof LowMemoryException) { + throw partialKeys.getFailure(); // fix for #43589 + } else { + e = new PutAllPartialResultException(partialKeys); if (isDebugEnabled) { - logger.debug("associating version tag with {} version={}", key, versionTag); - } - //If we have received a version tag from a server, add it to the event - tagHolder.setVersionTag(versionTag); - tagHolder.setFromServer(true); - } else if (retryVersions != null) { - VersionTag vt = retryVersions.get(offset); - if (vt != null) { - //If this is a retried event, and we have a version tag for the retry, - //add it to the event. - tagHolder.setVersionTag(vt); + logger.debug("basicRemoveAll:"+partialKeys.detailString()); } } - - basicEntryRemoveAll(key, op, offset, tagHolder); - // now we must check again since the cache may have closed during - // distribution causing this process to not receive and queue the - // event for clients - stopper.checkCancelInProgress(null); - succeeded.addKeyAndVersion(key, tagHolder.getVersionTag()); - } - catch (Exception ex) { - partialKeys.saveFailedKey(key, ex); - } - offset++; - } - } - }; - syncBulkOp(r, eventId); - if (partialKeys.hasFailure()) { - // Bug 51725: Now succeeded contains an order key list, may be missing the version tags. - // Save reference of succeeded into partialKeys. The succeeded may be modified by - // postRemoveAll() to fill in the version tags. 
- partialKeys.setSucceededKeysAndVersions(succeeded); - logger.info(LocalizedMessage.create(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_0_1, - new Object[] {getFullPath(), partialKeys})); - if (isDebugEnabled) { - logger.debug(partialKeys.detailString()); - } - if (e == null) { - // if received exception from server first, ignore local exception - if (op.isBridgeOperation()) { - if (partialKeys.getFailure() instanceof CancelException) { - e = (CancelException)partialKeys.getFailure(); - } else if (partialKeys.getFailure() instanceof LowMemoryException) { - throw partialKeys.getFailure(); // fix for #43589 } else { - e = new PutAllPartialResultException(partialKeys); - if (isDebugEnabled) { - logger.debug("basicRemoveAll:"+partialKeys.detailString()); - } + throw partialKeys.getFailure(); } - } else { - throw partialKeys.getFailure(); } } } - } - catch (LowMemoryException lme) { - throw lme; - } - catch (RuntimeException ex) { - e = ex; - } - catch (Exception ex) { - e = new RuntimeException(ex); + catch (LowMemoryException lme) { + throw lme; + } + catch (RuntimeException ex) { + e = ex; + } + catch (Exception ex) { + e = new RuntimeException(ex); + } finally { + removeAllOp.getBaseEvent().release(); + removeAllOp.freeOffHeapResources(); + } + getDataView().postRemoveAll(removeAllOp, succeeded, this); } finally { unlockRVVForBulkOp(); - removeAllOp.getBaseEvent().release(); - removeAllOp.freeOffHeapResources(); } - getDataView().postRemoveAll(removeAllOp, succeeded, this); if (e != null) { throw e; } @@ -10558,15 +10565,33 @@ public class LocalRegion extends AbstractRegion * to get a valid version tag. 
*/ private void lockRVVForBulkOp() { + ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); + if(alth!=null) { + alth.beforeBulkLock(this); + } + if (this.versionVector != null && this.dataPolicy.withReplication()) { this.versionVector.lockForCacheModification(this); } + + if(alth!=null) { + alth.afterBulkLock(this); + } } private void unlockRVVForBulkOp() { + ARMLockTestHook alth = getRegionMap().getARMLockTestHook(); + if(alth!=null) { + alth.beforeBulkRelease(this); + } + if (this.versionVector != null && this.dataPolicy.withReplication()) { this.versionVector.releaseCacheModificationLock(this); } + + if(alth!=null) { + alth.afterBulkRelease(this); + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3ea7dde0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java index 3ad2cc1..99d96ab 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java @@ -34,6 +34,7 @@ import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.internal.ByteArrayDataInput; import com.gemstone.gemfire.internal.InternalStatisticsDisabledException; +import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook; import com.gemstone.gemfire.internal.cache.lru.LRUEntry; import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand; import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; @@ -745,4 +746,9 @@ final class ProxyRegionMap implements RegionMap { @Override public void close() { } + + @Override + public ARMLockTestHook 
getARMLockTestHook() { + return null; + } }