IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57aecd7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57aecd7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57aecd7a

Branch: refs/heads/ignite-6083
Commit: 57aecd7ab2bf5f0836aabf40e4b4051fd07228d2
Parents: 84a40e5
Author: Ilya Lantukh <ilant...@gridgain.com>
Authored: Fri Apr 6 13:49:10 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Apr 6 13:49:10 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 158 +++++++++++++++----
 .../colocated/GridDhtDetachedCacheEntry.java    |   3 +-
 .../distributed/near/GridNearCacheEntry.java    |   3 +-
 3 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 74dabe9..a6ef0d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -2699,40 +2700,80 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         ensureFreeSpace();
 
+        boolean deferred = false;
+        boolean obsolete = false;
+
+        GridCacheVersion oldVer = null;
+
         lockEntry();
 
         try {
             checkObsolete();
 
+            boolean walEnabled = !cctx.isNear() && 
cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+
+            long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+
+            val = cctx.kernalContext().cacheObjects().prepareForCache(val, 
cctx);
+
+            final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0);
+
             boolean update;
 
-            boolean walEnabled = !cctx.isNear() && 
cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+            IgnitePredicate<CacheDataRow> p = new 
IgnitePredicate<CacheDataRow>() {
+                @Override public boolean apply(@Nullable CacheDataRow row) {
+                    boolean update0;
+
+                    GridCacheVersion currentVer = row != null ? row.version() 
: GridCacheMapEntry.this.ver;
 
-            if (cctx.group().persistenceEnabled()) {
-                unswap(false);
+                    boolean isStartVer = currentVer.nodeOrder() == 
cctx.localNode().order()
+                        && currentVer.order() == startVer;
 
-                if (!isNew()) {
-                    if (cctx.atomic())
-                        update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) 
< 0;
+                    if (cctx.group().persistenceEnabled()) {
+                        if (!isStartVer) {
+                            if (cctx.atomic())
+                                update0 = 
ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0;
+                            else
+                                update0 = currentVer.compareTo(ver) < 0;
+                        }
+                        else
+                            update0 = true;
+                    }
                     else
-                        update = this.ver.compareTo(ver) < 0;
+                        update0 = isStartVer;
+
+                    update0 |= (!preload && deletedUnlocked());
+
+                    return update0;
                 }
-                else
-                    update = true;
-            }
-            else
-                update = isNew() && !cctx.offheap().containsKey(this);
+            };
 
-            update |= !preload && deletedUnlocked();
+            if (unswapped) {
+                update = p.apply(null);
 
-            if (update) {
-                long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : 
expireTime;
+                if (update) {
+                    // If entry is already unswapped and we are modifying it, 
we must run deletion callbacks for old value.
+                    long oldExpTime = expireTimeUnlocked();
+                    long delta = (oldExpTime == 0 ? 0 : oldExpTime - 
U.currentTimeMillis());
 
-                val = cctx.kernalContext().cacheObjects().prepareForCache(val, 
cctx);
+                    if (delta < 0) {
+                        if (onExpired(this.val, null)) {
+                            if (cctx.deferredDelete()) {
+                                deferred = true;
+                                oldVer = this.ver;
+                            }
+                            else if (val == null)
+                                obsolete = true;
+                        }
+                    }
 
-                if (val != null)
                     storeValue(val, expTime, ver, null);
+                }
+            }
+            else // Optimization to access storage only once.
+                update = storeValue(val, expTime, ver, null, p);
 
+            if (update) {
                 update(val, expTime, ttl, ver, true);
 
                 boolean skipQryNtf = false;
@@ -2797,6 +2838,20 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+
+            // It is necessary to execute these callbacks outside of lock to 
avoid deadlocks.
+
+            if (obsolete) {
+                onMarkedObsolete();
+
+                cctx.cache().removeEntry(this);
+            }
+
+            if (deferred) {
+                assert oldVer != null;
+
+                cctx.onDeferredDelete(this, oldVer);
+            }
         }
     }
 
@@ -3516,14 +3571,39 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      * @param oldRow Old row if available.
      * @throws IgniteCheckedException If update failed.
      */
-    protected void storeValue(CacheObject val,
+    protected boolean storeValue(CacheObject val,
         long expireTime,
         GridCacheVersion ver,
         @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
-        assert lock.isHeldByCurrentThread();
         assert val != null : "null values in update for key: " + key;
 
-        cctx.offheap().invoke(cctx, key,  localPartition(), new 
UpdateClosure(this, val, ver, expireTime));
+        return storeValue(val, expireTime, ver, oldRow, null);
+    }
+
+    /**
+     * Stores value in offheap.
+     *
+     * @param val Value.
+     * @param expireTime Expire time.
+     * @param ver New entry version.
+     * @param oldRow Old row if available.
+     * @param predicate Optional predicate.
+     * @throws IgniteCheckedException If update failed.
+     * @return {@code True} if storage was modified.
+     */
+    protected boolean storeValue(
+        @Nullable CacheObject val,
+        long expireTime,
+        GridCacheVersion ver,
+        @Nullable CacheDataRow oldRow,
+        @Nullable IgnitePredicate<CacheDataRow> predicate) throws 
IgniteCheckedException {
+        assert lock.isHeldByCurrentThread();
+
+        UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, 
predicate);
+
+        cctx.offheap().invoke(cctx, key,  localPartition(), closure);
+
+        return closure.treeOp != IgniteTree.OperationType.NOOP;
     }
 
     /**
@@ -4295,7 +4375,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         private final GridCacheMapEntry entry;
 
         /** */
-        private final CacheObject val;
+        @Nullable private final CacheObject val;
 
         /** */
         private final GridCacheVersion ver;
@@ -4304,6 +4384,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         private final long expireTime;
 
         /** */
+        @Nullable private final IgnitePredicate<CacheDataRow> predicate;
+
+        /** */
         private CacheDataRow newRow;
 
         /** */
@@ -4317,31 +4400,44 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
          * @param val New value.
          * @param ver New version.
          * @param expireTime New expire time.
+         * @param predicate Optional predicate.
          */
-        UpdateClosure(GridCacheMapEntry entry, CacheObject val, 
GridCacheVersion ver, long expireTime) {
+        UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, 
GridCacheVersion ver, long expireTime,
+            @Nullable IgnitePredicate<CacheDataRow> predicate) {
             this.entry = entry;
             this.val = val;
             this.ver = ver;
             this.expireTime = expireTime;
+            this.predicate = predicate;
         }
 
         /** {@inheritDoc} */
         @Override public void call(@Nullable CacheDataRow oldRow) throws 
IgniteCheckedException {
             this.oldRow = oldRow;
 
+            if (predicate != null && !predicate.apply(oldRow)) {
+                treeOp = IgniteTree.OperationType.NOOP;
+
+                return;
+            }
+
             if (oldRow != null)
                 oldRow.key(entry.key);
 
-            newRow = 
entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
-                entry.cctx,
-                entry.key,
-                val,
-                ver,
-                expireTime,
-                oldRow);
+            if (val != null) {
+                newRow = 
entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+                    entry.cctx,
+                    entry.key,
+                    val,
+                    ver,
+                    expireTime,
+                    oldRow);
 
-            treeOp = oldRow != null && oldRow.link() == newRow.link() ?
-                IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+                treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                    IgniteTree.OperationType.NOOP : 
IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = oldRow != null ? IgniteTree.OperationType.REMOVE : 
IgniteTree.OperationType.NOOP;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 3536908..d02015b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -65,10 +65,11 @@ public class GridDhtDetachedCacheEntry extends 
GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void storeValue(CacheObject val,
+    @Override protected boolean storeValue(CacheObject val,
         long expireTime,
         GridCacheVersion ver,
         CacheDataRow oldRow) throws IgniteCheckedException {
+        return false;
         // No-op for detached entries, index is updated on primary nodes.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 322e63c..fb41f5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -458,7 +458,8 @@ public class GridNearCacheEntry extends 
GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void storeValue(CacheObject val, long expireTime, 
GridCacheVersion ver, CacheDataRow oldRow) {
+    @Override protected boolean storeValue(CacheObject val, long expireTime, 
GridCacheVersion ver, CacheDataRow oldRow) {
+        return false;
         // No-op: queries are disabled for near cache.
     }
 

Reply via email to