Repository: ignite Updated Branches: refs/heads/master 00b9e89e7 -> b85b041bc
IGNITE-7955: MVCC: IgniteCache.localPeek operation support. This closes #5284. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b85b041b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b85b041b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b85b041b Branch: refs/heads/master Commit: b85b041bcbf199b94a16c9f09dd9d2685f65cbc3 Parents: 00b9e89 Author: ipavlukhin <vololo...@gmail.com> Authored: Wed Nov 14 15:16:58 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Nov 14 15:16:58 2018 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 4 +- .../processors/cache/GridCacheAdapter.java | 7 +- .../processors/cache/GridCacheEntryEx.java | 10 ++ .../processors/cache/GridCacheMapEntry.java | 21 +++ .../processors/cache/mvcc/MvccUtils.java | 8 +- .../processors/cache/GridCacheTestEntryEx.java | 8 +- .../mvcc/CacheMvccOperationChecksTest.java | 8 - .../cache/mvcc/MvccCachePeekTest.java | 161 +++++++++++++++++++ .../testsuites/IgniteCacheMvccTestSuite.java | 41 +++++ 9 files changed, 248 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 70ee0d5..395c8f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -441,9 +441,9 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public void localEvict(Collection<? extends K> keys); /** - * Peeks at in-memory cached value using default optional peek mode. 
+ * Peeks at a value in the local storage using an optional peek mode. * <p> - * This method will not load value from any persistent store or from a remote node. + * This method will not load a value from the configured {@link CacheStore} or from a remote node. * <h2 class="header">Transactions</h2> * This method does not participate in any transactions. * http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 8ae3450..18a4da4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -821,9 +821,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.checkSecurity(SecurityPermission.CACHE_READ); - //TODO IGNITE-7955 - MvccUtils.verifyMvccOperationSupport(ctx, "Peek"); - PeekModes modes = parsePeekModes(peekModes, false); KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); @@ -895,7 +892,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.shared().database().checkpointReadLock(); try { - cacheVal = e.peek(modes.heap, modes.offheap, topVer, plc); + cacheVal = ctx.mvccEnabled() + ? 
+ e.mvccPeek(modes.heap && !modes.offheap) + : e.peek(modes.heap, modes.offheap, topVer, plc); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index cfd70ec..319c134 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -713,6 +713,16 @@ public interface GridCacheEntryEx { public boolean checkSerializableReadVersion(GridCacheVersion serReadVer) throws GridCacheEntryRemovedException; /** + * Retrieves the last committed MVCC entry version. + * @param onheapOnly {@code True} if a specified peek mode instructs to look only in the on-heap storage. + * @return Value of the last committed entry version if it exists, or {@code null} otherwise. + * @throws GridCacheEntryRemovedException If entry has been removed. + * @throws IgniteCheckedException If failed. + */ + @Nullable public CacheObject mvccPeek(boolean onheapOnly) + throws GridCacheEntryRemovedException, IgniteCheckedException; + + /** * Peeks into entry without loading value or updating statistics. * * @param heap Read from heap flag. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index fa4cc98..95ce2b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -116,6 +116,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.INVOKE_NO_OP; import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.REMOVE_NO_VAL; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_MAX_SNAPSHOT; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compareIgnoreOpCounter; import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY; import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY; @@ -3121,6 +3122,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ + @Nullable @Override public CacheObject mvccPeek(boolean onheapOnly) + throws GridCacheEntryRemovedException, IgniteCheckedException { + if (onheapOnly) + return null; + + lockEntry(); + + try { + checkObsolete(); + + CacheDataRow row = cctx.offheap().mvccRead(cctx, key, MVCC_MAX_SNAPSHOT); + + return row != null ? 
row.value() : null; + } + finally { + unlockEntry(); + } + } + + /** {@inheritDoc} */ @Nullable @Override public CacheObject peek( boolean heap, boolean offheap, http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index f29e23f..a9bb540 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -100,13 +100,13 @@ public class MvccUtils { /** */ public static final int MVCC_VISIBLE = 2; - /** */ + /** A special version visible by everyone */ public static final MvccVersion INITIAL_VERSION = mvccVersion(MVCC_CRD_START_CNTR, MVCC_INITIAL_CNTR, MVCC_START_OP_CNTR); - /** */ - public static final MvccVersion MVCC_VERSION_NA = - mvccVersion(MVCC_CRD_COUNTER_NA, MVCC_COUNTER_NA, MVCC_OP_COUNTER_NA); + /** A special snapshot for which all committed versions are visible */ + public static final MvccSnapshot MVCC_MAX_SNAPSHOT = + new MvccSnapshotWithoutTxs(Long.MAX_VALUE, Long.MAX_VALUE, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA); /** */ private static final MvccClosure<Integer> getVisibleState = new GetVisibleState(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index cc634fa..6fb200d 100644 
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -912,11 +912,15 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ + @Nullable @Override public CacheObject mvccPeek(boolean onheapOnly) { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public CacheObject peek(boolean heap, boolean offheap, AffinityTopologyVersion topVer, - @Nullable IgniteCacheExpiryPolicy plc) - { + @Nullable IgniteCacheExpiryPolicy plc) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java index aa7f6c3..e9083f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccOperationChecksTest.java @@ -106,14 +106,6 @@ public class CacheMvccOperationChecksTest extends CacheMvccAbstractTest { /** * @throws Exception if failed. */ - public void testPeekOperationsUnsupported() throws Exception { - checkOperationUnsupported("localPeek", m("Peek"), t(Object.class, CachePeekMode[].class), 1, - new CachePeekMode[]{CachePeekMode.NEAR}); - } - - /** - * @throws Exception if failed. 
- */ public void testEvictOperationsUnsupported() throws Exception { checkOperationUnsupported("localEvict", m("Evict"), t(Collection.class), Collections.singleton(1)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java new file mode 100644 index 0000000..539283b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachePeekTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** */ +public class MvccCachePeekTest extends CacheMvccAbstractTest { + /** */ + private interface ThrowingRunnable { + /** */ + void run() throws Exception; + } + + /** */ + private IgniteCache<Object, Object> cache; + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(3); + } + + /** + * @throws Exception if failed. 
+ */ + public void testPeek() throws Exception { + doWithCache(this::checkPeekSerial); + doWithCache(this::checkPeekDoesNotSeeAbortedVersions); + doWithCache(this::checkPeekDoesNotSeeActiveVersions); + doWithCache(this::checkPeekOnheap); + doWithCache(this::checkPeekNearCache); + } + + /** */ + private void doWithCache(ThrowingRunnable action) throws Exception { + cache = grid(0).getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(TRANSACTIONAL_SNAPSHOT) + .setBackups(1) + .setCacheMode(cacheMode())); + + try { + action.run(); + } + finally { + cache.destroy(); + } + } + + /** */ + private void checkPeekSerial() throws Exception { + Stream.of(primaryKey(cache), backupKey(cache)).forEach(key -> { + assertNull(cache.localPeek(key)); + + cache.put(key, 1); + + assertEquals(1, cache.localPeek(key)); + + cache.put(key, 2); + + assertEquals(2, cache.localPeek(key)); + }); + } + + /** */ + private void checkPeekDoesNotSeeAbortedVersions() throws Exception { + Integer pk = primaryKey(cache); + + cache.put(pk, 1); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(pk, 2); + + tx.rollback(); + } + + assertEquals(1, cache.localPeek(pk)); + } + + /** */ + private void checkPeekDoesNotSeeActiveVersions() throws Exception { + Integer pk = primaryKey(cache); + + cache.put(pk, 1); + + CountDownLatch writeCompleted = new CountDownLatch(1); + CountDownLatch checkCompleted = new CountDownLatch(1); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(pk, 2); + + writeCompleted.countDown(); + checkCompleted.await(); + + tx.commit(); + } + + return null; + }); + + writeCompleted.await(); + + assertEquals(1, cache.localPeek(pk)); + + checkCompleted.countDown(); + + fut.get(); + } + + /** */ + private void checkPeekOnheap() throws Exception { + Stream.of(primaryKey(cache), backupKey(cache), 
nearKey(cache)).forEach(key -> { + cache.put(key, 1); + + assertNull(cache.localPeek(key, CachePeekMode.ONHEAP)); + }); + } + + /** */ + private void checkPeekNearCache() throws Exception { + Stream.of(primaryKey(cache), backupKey(cache), nearKey(cache)).forEach(key -> { + cache.put(key, 1); + + assertNull(cache.localPeek(key, CachePeekMode.NEAR)); + }); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b85b041b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java index cf12d24..a522cbd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java @@ -18,6 +18,9 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.internal.processors.cache.IgniteCacheTxPeekModesTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccClusterRestartTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccConfigurationValidationTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccIteratorWithConcurrentTransactionTest; @@ -33,8 +36,11 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurr import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest; 
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; + /** * */ @@ -56,6 +62,11 @@ public class IgniteCacheMvccTestSuite extends TestSuite { suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class); + suite.addTestSuite(MvccCachePeekTest.class); + + suite.addTestSuite(MvccIgniteCacheTxPeekModesTest.class); + suite.addTestSuite(MvccIgniteCacheTxReplicatedPeekModesTest.class); + // Concurrent ops tests. suite.addTestSuite(CacheMvccIteratorWithConcurrentTransactionTest.class); suite.addTestSuite(CacheMvccLocalEntriesWithConcurrentTransactionTest.class); @@ -71,4 +82,34 @@ public class IgniteCacheMvccTestSuite extends TestSuite { return suite; } + + /** */ + public static class MvccIgniteCacheTxPeekModesTest extends IgniteCacheTxPeekModesTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL_SNAPSHOT; + } + + /** {@inheritDoc} */ + @Override public void testLocalEntries() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-10167"); + + super.testLocalEntries(); + } + } + + /** */ + public static class MvccIgniteCacheTxReplicatedPeekModesTest extends IgniteCacheTxReplicatedPeekModesTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL_SNAPSHOT; + } + + /** {@inheritDoc} */ + @Override public void testLocalEntries() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-10167"); + + super.testLocalEntries(); + } + } }