IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57aecd7a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57aecd7a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57aecd7a Branch: refs/heads/ignite-8159 Commit: 57aecd7ab2bf5f0836aabf40e4b4051fd07228d2 Parents: 84a40e5 Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Fri Apr 6 13:49:10 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Apr 6 13:49:10 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 158 +++++++++++++++---- .../colocated/GridDhtDetachedCacheEntry.java | 3 +- .../distributed/near/GridNearCacheEntry.java | 3 +- 3 files changed, 131 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 74dabe9..a6ef0d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; @@ -2699,40 +2700,80 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ) throws IgniteCheckedException, GridCacheEntryRemovedException { ensureFreeSpace(); + boolean deferred = false; + boolean obsolete = false; + + GridCacheVersion oldVer = null; + lockEntry(); try { checkObsolete(); + boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled(); + + long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; + + val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + + final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0); + boolean update; - boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled(); + IgnitePredicate<CacheDataRow> p = new IgnitePredicate<CacheDataRow>() { + @Override public boolean apply(@Nullable CacheDataRow row) { + boolean update0; + + GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver; - if (cctx.group().persistenceEnabled()) { - unswap(false); + boolean isStartVer = currentVer.nodeOrder() == cctx.localNode().order() + && currentVer.order() == startVer; - if (!isNew()) { - if (cctx.atomic()) - update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0; + if (cctx.group().persistenceEnabled()) { + if (!isStartVer) { + if (cctx.atomic()) + update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0; + else + update0 = currentVer.compareTo(ver) < 0; + } + else + update0 = true; + } else - update = this.ver.compareTo(ver) < 0; + update0 = isStartVer; + + update0 |= (!preload && deletedUnlocked()); + + return update0; } - else - update = true; - } - else - update = isNew() && !cctx.offheap().containsKey(this); + }; - update |= !preload && deletedUnlocked(); + if (unswapped) { + update = p.apply(null); - if (update) { - long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; + if (update) { + // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. + long oldExpTime = expireTimeUnlocked(); + long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis()); - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + if (delta < 0) { + if (onExpired(this.val, null)) { + if (cctx.deferredDelete()) { + deferred = true; + oldVer = this.ver; + } + else if (val == null) + obsolete = true; + } + } - if (val != null) storeValue(val, expTime, ver, null); + } + } + else // Optimization to access storage only once. + update = storeValue(val, expTime, ver, null, p); + if (update) { update(val, expTime, ttl, ver, true); boolean skipQryNtf = false; @@ -2797,6 +2838,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } finally { unlockEntry(); + + // It is necessary to execute these callbacks outside of lock to avoid deadlocks. + + if (obsolete) { + onMarkedObsolete(); + + cctx.cache().removeEntry(this); + } + + if (deferred) { + assert oldVer != null; + + cctx.onDeferredDelete(this, oldVer); + } } } @@ -3516,14 +3571,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param oldRow Old row if available. * @throws IgniteCheckedException If update failed. */ - protected void storeValue(CacheObject val, + protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { - assert lock.isHeldByCurrentThread(); assert val != null : "null values in update for key: " + key; - cctx.offheap().invoke(cctx, key, localPartition(), new UpdateClosure(this, val, ver, expireTime)); + return storeValue(val, expireTime, ver, oldRow, null); + } + + /** + * Stores value in offheap. + * + * @param val Value. + * @param expireTime Expire time. + * @param ver New entry version. + * @param oldRow Old row if available. + * @param predicate Optional predicate. + * @throws IgniteCheckedException If update failed. + * @return {@code True} if storage was modified. + */ + protected boolean storeValue( + @Nullable CacheObject val, + long expireTime, + GridCacheVersion ver, + @Nullable CacheDataRow oldRow, + @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException { + assert lock.isHeldByCurrentThread(); + + UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); + + cctx.offheap().invoke(cctx, key, localPartition(), closure); + + return closure.treeOp != IgniteTree.OperationType.NOOP; } /** @@ -4295,7 +4375,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final GridCacheMapEntry entry; /** */ - private final CacheObject val; + @Nullable private final CacheObject val; /** */ private final GridCacheVersion ver; @@ -4304,6 +4384,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final long expireTime; /** */ + @Nullable private final IgnitePredicate<CacheDataRow> predicate; + + /** */ private CacheDataRow newRow; /** */ @@ -4317,31 +4400,44 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param val New value. * @param ver New version. * @param expireTime New expire time. + * @param predicate Optional predicate. */ - UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) { + UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime, + @Nullable IgnitePredicate<CacheDataRow> predicate) { this.entry = entry; this.val = val; this.ver = ver; this.expireTime = expireTime; + this.predicate = predicate; } /** {@inheritDoc} */ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { this.oldRow = oldRow; + if (predicate != null && !predicate.apply(oldRow)) { + treeOp = IgniteTree.OperationType.NOOP; + + return; + } + if (oldRow != null) oldRow.key(entry.key); - newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( - entry.cctx, - entry.key, - val, - ver, - expireTime, - oldRow); + if (val != null) { + newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( + entry.cctx, + entry.key, + val, + ver, + expireTime, + oldRow); - treeOp = oldRow != null && oldRow.link() == newRow.link() ? - IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT; + treeOp = oldRow != null && oldRow.link() == newRow.link() ? + IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT; + } + else + treeOp = oldRow != null ? IgniteTree.OperationType.REMOVE : IgniteTree.OperationType.NOOP; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 3536908..d02015b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -65,10 +65,11 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void storeValue(CacheObject val, + @Override protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) throws IgniteCheckedException { + return false; // No-op for detached entries, index is updated on primary nodes. } http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 322e63c..fb41f5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -458,7 +458,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) { + @Override protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) { + return false; // No-op: queries are disabled for near cache. }