Repository: ignite
Updated Branches:
  refs/heads/master 00b9e89e7 -> b85b041bc


IGNITE-7955: MVCC: IgniteCache.localPeek operation support. This closes #5284.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b85b041b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b85b041b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b85b041b

Branch: refs/heads/master
Commit: b85b041bcbf199b94a16c9f09dd9d2685f65cbc3
Parents: 00b9e89
Author: ipavlukhin <vololo...@gmail.com>
Authored: Wed Nov 14 15:16:58 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Wed Nov 14 15:16:58 2018 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |   4 +-
 .../processors/cache/GridCacheAdapter.java      |   7 +-
 .../processors/cache/GridCacheEntryEx.java      |  10 ++
 .../processors/cache/GridCacheMapEntry.java     |  21 +++
 .../processors/cache/mvcc/MvccUtils.java        |   8 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   8 +-
 .../mvcc/CacheMvccOperationChecksTest.java      |   8 -
 .../cache/mvcc/MvccCachePeekTest.java           | 161 +++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite.java    |  41 +++++
 9 files changed, 248 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 70ee0d5..395c8f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -441,9 +441,9 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
     public void localEvict(Collection<? extends K> keys);
 
     /**
-     * Peeks at in-memory cached value using default optional peek mode.
+     * Peeks at a value in the local storage using an optional peek mode.
      * <p>
-     * This method will not load value from any persistent store or from a 
remote node.
+     * This method will not load a value from the configured {@link 
CacheStore} or from a remote node.
      * <h2 class="header">Transactions</h2>
      * This method does not participate in any transactions.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8ae3450..18a4da4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -821,9 +821,6 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
-        //TODO IGNITE-7955
-        MvccUtils.verifyMvccOperationSupport(ctx, "Peek");
-
         PeekModes modes = parsePeekModes(peekModes, false);
 
         KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
@@ -895,7 +892,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                     ctx.shared().database().checkpointReadLock();
 
                     try {
-                        cacheVal = e.peek(modes.heap, modes.offheap, topVer, 
plc);
+                        cacheVal = ctx.mvccEnabled()
+                            ? e.mvccPeek(modes.heap && !modes.offheap)
+                            : e.peek(modes.heap, modes.offheap, topVer, plc);
                     }
                     catch (GridCacheEntryRemovedException ignore) {
                         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index cfd70ec..319c134 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -713,6 +713,16 @@ public interface GridCacheEntryEx {
     public boolean checkSerializableReadVersion(GridCacheVersion serReadVer) 
throws GridCacheEntryRemovedException;
 
     /**
+     * Retrieves the last committed MVCC entry version.
+     * @param onheapOnly {@code True} if a specified peek mode instructs to 
look only in the on-heap storage.
+     * @return Last committed entry value if any, or {@code null} otherwise.
+     * @throws GridCacheEntryRemovedException If entry has been removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public CacheObject mvccPeek(boolean onheapOnly)
+        throws GridCacheEntryRemovedException, IgniteCheckedException;
+
+    /**
      * Peeks into entry without loading value or updating statistics.
      *
      * @param heap Read from heap flag.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index fa4cc98..95ce2b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -116,6 +116,7 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.INVOKE_NO_OP;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.REMOVE_NO_VAL;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_MAX_SNAPSHOT;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compareIgnoreOpCounter;
 import static 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY;
 import static 
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
@@ -3121,6 +3122,26 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public CacheObject mvccPeek(boolean onheapOnly)
+        throws GridCacheEntryRemovedException, IgniteCheckedException {
+        if (onheapOnly)
+            return null;
+
+        lockEntry();
+
+        try {
+            checkObsolete();
+
+            CacheDataRow row = cctx.offheap().mvccRead(cctx, key, 
MVCC_MAX_SNAPSHOT);
+
+            return row != null ? row.value() : null;
+        }
+        finally {
+            unlockEntry();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public CacheObject peek(
         boolean heap,
         boolean offheap,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index f29e23f..a9bb540 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -100,13 +100,13 @@ public class MvccUtils {
     /** */
     public static final int MVCC_VISIBLE = 2;
 
-    /** */
+    /** A special version visible by everyone */
     public static final MvccVersion INITIAL_VERSION =
         mvccVersion(MVCC_CRD_START_CNTR, MVCC_INITIAL_CNTR, 
MVCC_START_OP_CNTR);
 
-    /** */
-    public static final MvccVersion MVCC_VERSION_NA =
-        mvccVersion(MVCC_CRD_COUNTER_NA, MVCC_COUNTER_NA, MVCC_OP_COUNTER_NA);
+    /** A special snapshot for which all committed versions are visible */
+    public static final MvccSnapshot MVCC_MAX_SNAPSHOT =
+        new MvccSnapshotWithoutTxs(Long.MAX_VALUE, Long.MAX_VALUE, 
MVCC_READ_OP_CNTR, MVCC_COUNTER_NA);
 
     /** */
     private static final MvccClosure<Integer> getVisibleState = new 
GetVisibleState();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index cc634fa..6fb200d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -912,11 +912,15 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public CacheObject mvccPeek(boolean onheapOnly) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public CacheObject peek(boolean heap,
         boolean offheap,
         AffinityTopologyVersion topVer,
-        @Nullable IgniteCacheExpiryPolicy plc)
-    {
+        @Nullable IgniteCacheExpiryPolicy plc) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java
index aa7f6c3..e9083f7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java
@@ -106,14 +106,6 @@ public class CacheMvccOperationChecksTest extends 
CacheMvccAbstractTest {
     /**
      * @throws Exception if failed.
      */
-    public void testPeekOperationsUnsupported() throws Exception {
-        checkOperationUnsupported("localPeek", m("Peek"), t(Object.class, 
CachePeekMode[].class), 1,
-            new CachePeekMode[]{CachePeekMode.NEAR});
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
     public void testEvictOperationsUnsupported() throws Exception {
         checkOperationUnsupported("localEvict", m("Evict"), 
t(Collection.class), Collections.singleton(1));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java
new file mode 100644
index 0000000..539283b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** */
+public class MvccCachePeekTest extends CacheMvccAbstractTest {
+    /** */
+    private interface ThrowingRunnable {
+        /** */
+        void run() throws Exception;
+    }
+
+    /** */
+    private IgniteCache<Object, Object> cache;
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(3);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testPeek() throws Exception {
+        doWithCache(this::checkPeekSerial);
+        doWithCache(this::checkPeekDoesNotSeeAbortedVersions);
+        doWithCache(this::checkPeekDoesNotSeeActiveVersions);
+        doWithCache(this::checkPeekOnheap);
+        doWithCache(this::checkPeekNearCache);
+    }
+
+    /** */
+    private void doWithCache(ThrowingRunnable action) throws Exception {
+        cache = grid(0).getOrCreateCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL_SNAPSHOT)
+            .setBackups(1)
+            .setCacheMode(cacheMode()));
+
+        try {
+            action.run();
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /** */
+    private void checkPeekSerial() throws Exception {
+        Stream.of(primaryKey(cache), backupKey(cache)).forEach(key -> {
+            assertNull(cache.localPeek(key));
+
+            cache.put(key, 1);
+
+            assertEquals(1, cache.localPeek(key));
+
+            cache.put(key, 2);
+
+            assertEquals(2, cache.localPeek(key));
+        });
+    }
+
+    /** */
+    private void checkPeekDoesNotSeeAbortedVersions() throws Exception {
+        Integer pk = primaryKey(cache);
+
+        cache.put(pk, 1);
+
+        try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            cache.put(pk, 2);
+
+            tx.rollback();
+        }
+
+        assertEquals(1, cache.localPeek(pk));
+    }
+
+    /** */
+    private void checkPeekDoesNotSeeActiveVersions() throws Exception {
+        Integer pk = primaryKey(cache);
+
+        cache.put(pk, 1);
+
+        CountDownLatch writeCompleted = new CountDownLatch(1);
+        CountDownLatch checkCompleted = new CountDownLatch(1);
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                cache.put(pk, 2);
+
+                writeCompleted.countDown();
+                checkCompleted.await();
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        writeCompleted.await();
+
+        assertEquals(1, cache.localPeek(pk));
+
+        checkCompleted.countDown();
+
+        fut.get();
+    }
+
+    /** */
+    private void checkPeekOnheap() throws Exception {
+        Stream.of(primaryKey(cache), backupKey(cache), 
nearKey(cache)).forEach(key -> {
+            cache.put(key, 1);
+
+            assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+        });
+    }
+
+    /** */
+    private void checkPeekNearCache() throws Exception {
+        Stream.of(primaryKey(cache), backupKey(cache), 
nearKey(cache)).forEach(key -> {
+            cache.put(key, 1);
+
+            assertNull(cache.localPeek(key, CachePeekMode.NEAR));
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
index cf12d24..a522cbd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.IgniteCacheTxPeekModesTest;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccClusterRestartTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccConfigurationValidationTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccIteratorWithConcurrentTransactionTest;
@@ -33,8 +36,11 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurr
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest;
 
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
 /**
  *
  */
@@ -56,6 +62,11 @@ public class IgniteCacheMvccTestSuite extends TestSuite {
 
         suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class);
 
+        suite.addTestSuite(MvccCachePeekTest.class);
+
+        suite.addTestSuite(MvccIgniteCacheTxPeekModesTest.class);
+        suite.addTestSuite(MvccIgniteCacheTxReplicatedPeekModesTest.class);
+
         // Concurrent ops tests.
         
suite.addTestSuite(CacheMvccIteratorWithConcurrentTransactionTest.class);
         
suite.addTestSuite(CacheMvccLocalEntriesWithConcurrentTransactionTest.class);
@@ -71,4 +82,34 @@ public class IgniteCacheMvccTestSuite extends TestSuite {
 
         return suite;
     }
+
+    /** */
+    public static class MvccIgniteCacheTxPeekModesTest extends 
IgniteCacheTxPeekModesTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void testLocalEntries() throws Exception {
+            fail("https://issues.apache.org/jira/browse/IGNITE-10167");
+
+            super.testLocalEntries();
+        }
+    }
+
+    /** */
+    public static class MvccIgniteCacheTxReplicatedPeekModesTest extends 
IgniteCacheTxReplicatedPeekModesTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void testLocalEntries() throws Exception {
+            fail("https://issues.apache.org/jira/browse/IGNITE-10167");
+
+            super.testLocalEntries();
+        }
+    }
 }

Reply via email to