ignite-3601 Do not check version on commit for read-only serializable transactions.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b66d2d7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b66d2d7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b66d2d7 Branch: refs/heads/ignite-ssl-hotfix Commit: 0b66d2d7b695cf370a4b8a717844ad67742c6090 Parents: 59b46d3 Author: sboikov <sboi...@gridgain.com> Authored: Mon Oct 3 12:04:03 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Oct 3 12:04:03 2016 +0300 ---------------------------------------------------------------------- .../colocated/GridDhtDetachedCacheEntry.java | 5 + .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 62 ++++--- .../cache/CacheGetEntryAbstractTest.java | 38 +++++ .../processors/cache/CachePutIfAbsentTest.java | 161 +++++++++++++++++++ .../CacheSerializableTransactionsTest.java | 70 +++----- .../processors/cache/CacheTxFastFinishTest.java | 2 +- .../testsuites/IgniteCacheTestSuite4.java | 2 + 8 files changed, 276 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index db91134..2e05560 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -97,4 +97,9 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { // No-op for detached cache entry. return true; } + + /** {@inheritDoc} */ + @Override public int partition() { + return cctx.affinity().partition(key); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 410baf8..ed37059 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -953,7 +953,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @return {@code True} if 'fast finish' path can be used for transaction completion. */ private boolean fastFinish() { - return writeMap().isEmpty() && ((optimistic() && !serializable()) || readMap().isEmpty()); + return writeMap().isEmpty() && (optimistic() || readMap().isEmpty()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fe69536..6d21dcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2269,7 +2269,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig addInvokeResult(e, cacheVal, ret, ver); } else { - boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); + boolean success; + + if (hasFilters) { + success = isAll(e.context(), key, cacheVal, filter); + + if (!success) + e.value(cacheVal, false, false); + } + else + success = true; ret.set(cacheCtx, cacheVal, success, keepBinary); } @@ -2411,25 +2420,43 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig else old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { ret.set(cacheCtx, old, false, keepBinary); if (!readCommitted()) { - // Enlist failed filters as reads for non-read-committed mode, - // so future ops will get the same values. - txEntry = addEntry(READ, - old, - null, - null, - entry, - null, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore, - keepBinary); + if (optimistic() && serializable()) { + txEntry = addEntry(op, + old, + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore, + keepBinary); + } + else { + txEntry = addEntry(READ, + old, + null, + null, + entry, + null, + CU.empty0(), + false, + -1L, + -1L, + null, + skipStore, + keepBinary); + } txEntry.markValid(); @@ -2446,9 +2473,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig break; // While. } - final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; - txEntry = addEntry(op, cacheCtx.toCacheObject(val), entryProcessor, http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java index 34480a2..2eab6d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionOptimisticException; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; @@ -52,6 +53,7 @@ import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * Test getEntry and getEntries methods. @@ -247,6 +249,10 @@ public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTes testConcurrentTx(cache, PESSIMISTIC, REPEATABLE_READ, oneEntry); testConcurrentTx(cache, PESSIMISTIC, READ_COMMITTED, oneEntry); + + testConcurrentOptimisticTxGet(cache, REPEATABLE_READ); + testConcurrentOptimisticTxGet(cache, READ_COMMITTED); + testConcurrentOptimisticTxGet(cache, SERIALIZABLE); } } finally { @@ -256,6 +262,38 @@ public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTes /** * @param cache Cache. + * @param txIsolation Transaction isolation. + * @throws Exception If failed. + */ + private void testConcurrentOptimisticTxGet(final IgniteCache<Integer, TestValue> cache, + final TransactionIsolation txIsolation) throws Exception { + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + final int key = 42; + + IgniteTransactions txs = grid(0).transactions(); + + cache.put(key, new TestValue(key)); + + long stopTime = System.currentTimeMillis() + 3000; + + while (System.currentTimeMillis() < stopTime) { + try (Transaction tx = txs.txStart(OPTIMISTIC, txIsolation)) { + cache.get(key); + + tx.commit(); + } + catch (TransactionOptimisticException e) { + assertTrue("Should not throw optimistic exception in only read TX. Tx isolation: " + + txIsolation, false); + } + } + } + }, 10, "tx-thread"); + } + + /** + * @param cache Cache. * @param txConcurrency Transaction concurrency. * @param txIsolation Transaction isolation. * @param oneEntry If {@code true} then single entry is tested. http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutIfAbsentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutIfAbsentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutIfAbsentTest.java new file mode 100644 index 0000000..6ed9049 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutIfAbsentTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CachePutIfAbsentTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** + * @return Cache configurations. + */ + private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0)); + + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2)); + + return ccfgs; + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration( + CacheMode cacheMode, + CacheWriteSynchronizationMode syncMode, + int backups) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictGetAndPutIfAbsent() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(10_000); + + cache.put(key, 2); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + Object old = cache.getAndPutIfAbsent(key, 3); + + assertEquals(2, old); + + Object val = cache.get(key); + + assertEquals(2, val); + + tx.commit(); + } + + assertEquals((Integer)2, cache.get(key)); + } + } + } + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 3d4f850..6a73f79 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -709,6 +709,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @param noVal If {@code true} there is no cache value when read in tx. + * @param needVer If {@code true} then gets entry, otherwise just value. * @throws Exception If failed. */ private void txConflictRead(boolean noVal, boolean needVer) throws Exception { @@ -735,28 +736,21 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { cache.put(key, expVal); } - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - if (needVer) { - CacheEntry<Integer, Integer> val = cache.getEntry(key); - - assertEquals(expVal, val == null ? null : val.getValue()); - } - else { - Integer val = cache.get(key); - - assertEquals(expVal, val); - } + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (needVer) { + CacheEntry<Integer, Integer> val = cache.getEntry(key); - updateKey(cache, key, 1); + assertEquals(expVal, val == null ? null : val.getValue()); + } + else { + Integer val = cache.get(key); - tx.commit(); + assertEquals(expVal, val); } - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); + updateKey(cache, key, 1); + + tx.commit(); } checkValue(key, 1, cache.getName()); @@ -2625,21 +2619,14 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { cache0.put(key2, -1); cache0.put(key3, -1); - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.get(key1); - cache.get(key2); - cache.get(key3); - - updateKey(near ? cache : cache0, key2, -2); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.get(key1); + cache.get(key2); + cache.get(key3); - tx.commit(); - } + updateKey(near ? cache : cache0, key2, -2); - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); + tx.commit(); } checkValue(key1, -1, cacheName); @@ -2890,23 +2877,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key1, newVal, CACHE1); checkValue(key2, newVal, CACHE2); - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Object val1 = cache1.get(key1); - Object val2 = cache2.get(key2); - - assertEquals(newVal, val1); - assertEquals(newVal, val2); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); + Object val2 = cache2.get(key2); - updateKey(cache2, key2, newVal); + assertEquals(newVal, val1); + assertEquals(newVal, val2); - tx.commit(); - } + updateKey(cache2, key2, newVal); - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); + tx.commit(); } try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java index 35b1405..f9c6683 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java @@ -173,7 +173,7 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { cache.get(i); - checkNormalTxFinish(tx, commit); + checkFastTxFinish(tx, commit); } try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b66d2d7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index c494e73..2b446bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticReadC import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticRepeatableReadSeltTest; import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticSerializableSeltTest; import org.apache.ignite.internal.processors.cache.CacheOffheapMapEntrySelfTest; +import org.apache.ignite.internal.processors.cache.CachePutIfAbsentTest; import org.apache.ignite.internal.processors.cache.CacheReadThroughAtomicRestartSelfTest; import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalAtomicRestartSelfTest; import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalRestartSelfTest; @@ -326,6 +327,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCachePrimarySyncTest.class); suite.addTestSuite(IgniteTxCachePrimarySyncTest.class); suite.addTestSuite(IgniteTxCacheWriteSynchronizationModesMultithreadedTest.class); + suite.addTestSuite(CachePutIfAbsentTest.class); suite.addTestSuite(MarshallerCacheJobRunNodeRestartTest.class);