http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 83bb81c..af74996 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -64,7 +66,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx; @@ -86,9 +87,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; -import org.apache.ignite.transactions.TransactionOptimisticException; import org.jetbrains.annotations.Nullable; import org.junit.Assert; @@ -98,11 +97,8 @@ import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstract import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT; import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA; -import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * TODO IGNITE-6739: tests reload @@ -117,40 +113,104 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } /** - * @throws Exception If failed. + * @throws Exception if failed. */ - public void testPessimisticTx1() throws Exception { - checkTx1(PESSIMISTIC, REPEATABLE_READ); - } + public void testEmptyTx() throws Exception { + Ignite node = startGrids(2); - /** - * @throws Exception If failed. - */ - public void testOptimisticSerializableTx1() throws Exception { - checkTx1(OPTIMISTIC, SERIALIZABLE); - } + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, DFLT_PARTITION_COUNT)); - /** - * @throws Exception If failed. - */ - public void testOptimisticRepeatableReadTx1() throws Exception { - checkTx1(OPTIMISTIC, REPEATABLE_READ); + cache.putAll(Collections.emptyMap()); + + IgniteTransactions txs = node.transactions(); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.commit(); + } + finally { + stopAllGrids(); + } } /** - * @throws Exception If failed. + * @throws Exception if failed. */ - public void testOptimisticReadCommittedTx1() throws Exception { - checkTx1(OPTIMISTIC, READ_COMMITTED); + public void testImplicitTxOps() throws Exception { + checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() { + @Override public void apply(IgniteCache<Integer, Integer> cache) { + try { + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer val = cache.get(key); + + assertNull(val); + + assertFalse(cache.containsKey(key)); + + cache.put(key, -1); + + val = (Integer)checkAndGet(true, cache, key, GET, SCAN); + + assertEquals(Integer.valueOf(-1), val); + + assertTrue(cache.containsKey(key)); + + cache.put(key, key); + + val = (Integer)checkAndGet(true, cache, key, GET, SCAN); + + assertEquals(key, val); + + cache.remove(key); + + val = cache.get(key); + + assertNull(val); + + val = (Integer)checkAndGet(false, cache, key, SCAN, GET); + + assertNull(val); + + assertTrue(cache.putIfAbsent(key, key)); + + val = (Integer)checkAndGet(true, cache, key, GET, SCAN); + + assertEquals(key, val); + + val = cache.getAndReplace(key, -1); + + assertEquals(key, val); + + val = (Integer)checkAndGet(true, cache, key, GET, SCAN); + + assertEquals(Integer.valueOf(-1), val); + + val = cache.getAndRemove(key); + + assertEquals(Integer.valueOf(-1), val); + + val = cache.get(key); + + assertNull(val); + + val = (Integer)checkAndGet(false, cache, key, SCAN, GET); + + assertNull(val); + } + } + catch (Exception e) { + throw new IgniteException(e); + } + } + }); } /** - * @param concurrency Transaction concurrency. - * @param isolation Transaction isolation. * @throws Exception If failed. */ - private void checkTx1(final TransactionConcurrency concurrency, final TransactionIsolation isolation) - throws Exception { + public void testPessimisticTx1() throws Exception { checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() { @Override public void apply(IgniteCache<Integer, Integer> cache) { try { @@ -161,7 +221,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { for (Integer key : keys) { log.info("Test key: " + key); - try (Transaction tx = txs.txStart(concurrency, isolation)) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { Integer val = cache.get(key); assertNull(val); @@ -191,23 +251,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testPessimisticTx2() throws Exception { - checkTx2(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testOptimisticSerializableTx2() throws Exception { - checkTx2(OPTIMISTIC, SERIALIZABLE); - } - - /** - * @param concurrency Transaction concurrency. - * @param isolation Transaction isolation. - * @throws Exception If failed. - */ - private void checkTx2(final TransactionConcurrency concurrency, final TransactionIsolation isolation) - throws Exception { checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() { @Override public void apply(IgniteCache<Integer, Integer> cache) { try { @@ -218,7 +261,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { for (Integer key : keys) { log.info("Test key: " + key); - try (Transaction tx = txs.txStart(concurrency, isolation)) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key, key); cache.put(key + 1, key + 1); @@ -394,7 +437,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { keys.add(rnd.nextInt()); if (tx) { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.getAll(keys); if (rnd.nextBoolean()) @@ -417,8 +460,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testTxReadIsolationSimple() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-7764"); - Ignite srv0 = startGrids(4); client = true; @@ -453,37 +494,23 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { @Override public Void call() throws Exception { IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME); - try (Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation)) { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { assertEquals(0, checkAndGet(false, cache, 0, SCAN, GET)); readStart.countDown(); assertTrue(readProceed.await(5, TimeUnit.SECONDS)); - if (isolation == READ_COMMITTED) { - assertNull(checkAndGet(false, cache, 1, SCAN, GET)); - - assertEquals(1, checkAndGet(false, cache, 2, SCAN, GET)); - - Map<Object, Object> res = checkAndGetAll(false, cache, startVals.keySet(), SCAN, GET); - - assertEquals(startVals.size() / 2, res.size()); + assertEquals(0, checkAndGet(true, cache, 1, GET, SCAN)); - for (Map.Entry<Object, Object> e : res.entrySet()) - assertEquals("Invalid value for key: " + e.getKey(), 1, e.getValue()); - } - else { - assertEquals(0, checkAndGet(true, cache, 1, GET, SCAN)); - - assertEquals(0, checkAndGet(true, cache, 2, GET, SCAN)); + assertEquals(0, checkAndGet(true, cache, 2, GET, SCAN)); - Map<Object, Object> res = checkAndGetAll(true, cache, startVals.keySet(), GET, SCAN); + Map<Object, Object> res = checkAndGetAll(true, cache, startVals.keySet(), GET, SCAN); - assertEquals(startVals.size(), res.size()); + assertEquals(startVals.size(), res.size()); - for (Map.Entry<Object, Object> e : res.entrySet()) - assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue()); - } + for (Map.Entry<Object, Object> e : res.entrySet()) + assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue()); tx.rollback(); } @@ -589,8 +616,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } /** - * @throws Exception If failed. * @param largeKeys {@code True} to use large keys (not fitting in single page). + * @throws Exception If failed. */ private void putRemoveSimple(boolean largeKeys) throws Exception { Ignite node = startGrid(0); @@ -771,6 +798,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testWaitPreviousTxAck() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); + testSpi = true; startGrid(0); @@ -1032,11 +1061,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { @Override public Void call() throws Exception { IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName()); - Map<Integer, Integer> vals; if (inTx) { - try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { vals = checkAndGetAll(false, cache, F.asSet(key1, key2), SCAN, GET); tx.rollback(); @@ -1085,6 +1113,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testCleanupWaitsForGet2() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); /* Simulate case when there are two active transactions modifying the same key (it is possible if key lock is released but ack message is delayed), and at this moment @@ -1430,8 +1459,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 1, 64, null, SCAN, PUT); } - - /** * @throws Exception If failed. */ @@ -1576,79 +1603,35 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testPessimisticTxGetAllReadsSnapshot_SingleNode_SinglePartition() throws Exception { - txReadsSnapshot(1, 0, 0, 1, true, GET); + txReadsSnapshot(1, 0, 0, 1, GET); } /** * @throws Exception If failed. */ public void testPessimisticTxGetAllReadsSnapshot_ClientServer() throws Exception { - txReadsSnapshot(4, 2, 1, 64, true, GET); + txReadsSnapshot(4, 2, 1, 64, GET); } /** * @throws Exception If failed. */ - public void testOptimisticTxGetAllReadsSnapshot_SingleNode() throws Exception { - txReadsSnapshot(1, 0, 0, 64, false, GET); + public void testPessimisticTxScanReadsSnapshot_SingleNode_SinglePartition() throws Exception { + txReadsSnapshot(1, 0, 0, 1, SCAN); } /** * @throws Exception If failed. */ - public void testOptimisticTxGetAllReadsSnapshot_SingleNode_SinglePartition() throws Exception { - txReadsSnapshot(1, 0, 0, 1, false, GET); + public void testPessimisticTxScanReadsSnapshot_ClientServer() throws Exception { + txReadsSnapshot(4, 2, 1, 64, SCAN); } /** - * @throws Exception If failed. - */ - public void testOptimisticTxGetAllReadsSnapshot_ClientServer() throws Exception { - txReadsSnapshot(4, 2, 1, 64, false, GET); - } - -// TODO: IGNITE-7371 -// /** -// * @throws Exception If failed. -// */ -// public void testPessimisticTxScanReadsSnapshot_SingleNode_SinglePartition() throws Exception { -// txReadsSnapshot(1, 0, 0, 1, true, SCAN); -// } -// -// /** -// * @throws Exception If failed. -// */ -// public void testPessimisticTxScanReadsSnapshot_ClientServer() throws Exception { -// txReadsSnapshot(4, 2, 1, 64, true, SCAN); -// } -// -// /** -// * @throws Exception If failed. -// */ -// public void testOptimisticTxScanReadsSnapshot_SingleNode() throws Exception { -// txReadsSnapshot(1, 0, 0, 64, false, SCAN); -// } -// -// /** -// * @throws Exception If failed. -// */ -// public void testOptimisticTxScanReadsSnapshot_SingleNode_SinglePartition() throws Exception { -// txReadsSnapshot(1, 0, 0, 1, false, SCAN); -// } -// -// /** -// * @throws Exception If failed. -// */ -// public void testOptimisticTxScanReadsSnapshot_ClientServer() throws Exception { -// txReadsSnapshot(4, 2, 1, 64, false, SCAN); -// } - - /** * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. * @param cacheParts Number of cache partitions. - * @param pessimistic If {@code true} uses pessimistic tx, otherwise optimistic. * @param readMode Read mode. * @throws Exception If failed. */ @@ -1657,9 +1640,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { final int clients, int cacheBackups, int cacheParts, - final boolean pessimistic, ReadMode readMode ) throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); + final int ACCOUNTS = 20; final int ACCOUNT_START_VAL = 1000; @@ -1668,18 +1652,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { final int readers = 4; - final TransactionConcurrency concurrency; - final TransactionIsolation isolation; - - if (pessimistic) { - concurrency = PESSIMISTIC; - isolation = REPEATABLE_READ; - } - else { - concurrency = OPTIMISTIC; - isolation = SERIALIZABLE; - } - final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() { @Override public void apply(IgniteCache<Object, Object> cache) { final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); @@ -1689,7 +1661,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { for (int i = 0; i < ACCOUNTS; i++) accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1)); - try (Transaction tx = txs.txStart(concurrency, isolation)) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.putAll(accounts); tx.commit(); @@ -1718,7 +1690,13 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { while (id1.equals(id2)) id2 = rnd.nextInt(ACCOUNTS); - TreeSet<Integer> keys = new TreeSet<>(); + if(id1 > id2) { + int tmp = id1; + id1 = id2; + id2 = tmp; + } + + Set<Integer> keys = new HashSet<>(); keys.add(id1); keys.add(id2); @@ -1763,88 +1741,36 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { Map<Integer, MvccTestAccount> accounts = new HashMap<>(); - if (pessimistic) { - try (Transaction tx = txs.txStart(concurrency, isolation)) { - int remaining = ACCOUNTS; - - do { - int readCnt = rnd.nextInt(remaining) + 1; - - Set<Integer> readKeys = new TreeSet<>(); - - for (int i = 0; i < readCnt; i++) - readKeys.add(accounts.size() + i); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + int remaining = ACCOUNTS; - Map<Integer, MvccTestAccount> readRes = - checkAndGetAll(false, cache.cache, readKeys, readMode); + do { + int readCnt = rnd.nextInt(remaining) + 1; - assertEquals(readCnt, readRes.size()); + Set<Integer> readKeys = new TreeSet<>(); - accounts.putAll(readRes); + for (int i = 0; i < readCnt; i++) + readKeys.add(accounts.size() + i); - remaining = ACCOUNTS - accounts.size(); - } - while (remaining > 0); + Map<Integer, MvccTestAccount> readRes = + checkAndGetAll(false, cache.cache, readKeys, readMode); - validateSum(accounts); + assertEquals(readCnt, readRes.size()); - tx.commit(); + accounts.putAll(readRes); - cnt++; + remaining = ACCOUNTS - accounts.size(); } - finally { - cache.readUnlock(); - } - } - else { - try (Transaction tx = txs.txStart(concurrency, isolation)) { - int remaining = ACCOUNTS; - - do { - int readCnt = rnd.nextInt(remaining) + 1; - - if (rnd.nextInt(3) == 0) { - for (int i = 0; i < readCnt; i++) { - Integer key = rnd.nextInt(ACCOUNTS); - - MvccTestAccount account = - (MvccTestAccount)checkAndGet(false, cache.cache, key, readMode); - - assertNotNull(account); - - accounts.put(key, account); - } - } - else { - Set<Integer> readKeys = new LinkedHashSet<>(); - - for (int i = 0; i < readCnt; i++) - readKeys.add(rnd.nextInt(ACCOUNTS)); - - Map<Integer, MvccTestAccount> readRes = - checkAndGetAll(false, cache.cache, readKeys, readMode); + while (remaining > 0); - assertEquals(readKeys.size(), readRes.size()); - - accounts.putAll(readRes); - } + validateSum(accounts); - remaining = ACCOUNTS - accounts.size(); - } - while (remaining > 0); - - validateSum(accounts); - - cnt++; + tx.commit(); - tx.commit(); - } - catch (TransactionOptimisticException ignore) { - // No-op. - } - finally { - cache.readUnlock(); - } + cnt++; + } + finally { + cache.readUnlock(); } } @@ -1954,9 +1880,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { int cacheBackups, int cacheParts, ReadMode readMode - ) - throws Exception - { + ) throws Exception { final int writers = 4; final int readers = 4; @@ -2103,50 +2027,26 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Map<Integer, Integer> map = new TreeMap<>(); + Set<Integer> keys = new LinkedHashSet(); int cnt = 0; while (!stop.get()) { - int keys = rnd.nextInt(32) + 1; + int keysCnt = rnd.nextInt(32) + 1; - while (map.size() < keys) - map.put(rnd.nextInt(KEYS), cnt); + while (keys.size() < keysCnt) { + int key = rnd.nextInt(KEYS); + + if(keys.add(key)) + map.put(key, cnt); + } TestCache<Integer, Integer> cache = randomCache(caches, rnd); try { IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - TransactionConcurrency concurrency; - TransactionIsolation isolation; - - switch (rnd.nextInt(3)) { - case 0: { - concurrency = PESSIMISTIC; - isolation = REPEATABLE_READ; - - break; - } - case 1: { - concurrency = OPTIMISTIC; - isolation = REPEATABLE_READ; - - break; - } - case 2: { - concurrency = OPTIMISTIC; - isolation = SERIALIZABLE; - - break; - } - default: { - fail(); - - return; - } - } - - try (Transaction tx = txs.txStart(concurrency, isolation)) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { if (rnd.nextBoolean()) { Map<Integer, Integer> res = checkAndGetAll(false, cache.cache, map.keySet(), rnd.nextBoolean() ? GET : SCAN); @@ -2158,18 +2058,16 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { tx.commit(); } - catch (TransactionOptimisticException e) { - assertEquals(SERIALIZABLE, isolation); - } catch (Exception e) { Assert.assertTrue("Unexpected error: " + e, X.hasCause(e, ClusterTopologyException.class)); } } finally { cache.readUnlock(); - } - map.clear(); + keys.clear(); + map.clear(); + } cnt++; } @@ -2282,7 +2180,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { Ignite srv0 = startGrid(0); - IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache( + IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache( cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)); Map<Integer, Integer> map; @@ -2413,32 +2311,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testTxPrepareFailureSimplePessimisticTx() throws Exception { - txPrepareFailureSimple(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testTxPrepareFailureSimpleSerializableTx() throws Exception { - txPrepareFailureSimple(OPTIMISTIC, SERIALIZABLE); - } - - /** - * @throws Exception If failed. - */ - public void testTxPrepareFailureSimpleOptimisticTx() throws Exception { - txPrepareFailureSimple(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @param concurrency Transaction concurrency. - * @param isolation Transaction isolation. - * @throws Exception If failed. - */ - private void txPrepareFailureSimple( - final TransactionConcurrency concurrency, - final TransactionIsolation isolation - ) throws Exception { testSpi = true; startGrids(3); @@ -2460,7 +2332,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { try { - try (Transaction tx = client.transactions().txStart(concurrency, isolation)) { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key1, 1); cache.put(key2, 2); @@ -2477,7 +2349,16 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } }, "tx-thread"); - srv1Spi.waitForBlocked(); + GridTestUtils.waitForCondition( + new GridAbsPredicate() { + @Override public boolean apply() { + return srv1Spi.hasBlockedMessages() || fut.isDone() && fut.error() != null; + } + }, 10_000 + ); + + if (fut.isDone()) + fut.get(); // Just to fail with future error. assertFalse(fut.isDone()); @@ -2488,7 +2369,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { assertNull(cache.get(key1)); assertNull(cache.get(key2)); - try (Transaction tx = client.transactions().txStart(concurrency, isolation)) { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key1, 1); cache.put(key2, 2); @@ -2502,64 +2383,9 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ - public void testSerializableTxRemap() throws Exception { - testSpi = true; - - startGrids(2); - - client = true; - - final Ignite client = startGrid(2); - - final IgniteCache cache = client.createCache( - cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)); - - final Map<Object, Object> vals = new HashMap<>(); - - for (int i = 0; i < 100; i++) - vals.put(i, i); - - TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(ignite(2)); - - clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { - @Override public boolean apply(ClusterNode node, Message msg) { - return msg instanceof GridNearTxPrepareRequest; - } - }); - - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.putAll(vals); - - tx.commit(); - } - - return null; - } - }, "tx-thread"); - - clientSpi.waitForBlocked(2); - - this.client = false; - - startGrid(3); - - assertFalse(fut.isDone()); - - clientSpi.stopBlock(); - - fut.get(); - - for (Ignite node : G.allGrids()) - checkValues(vals, node.cache(cache.getName())); - } - - - /** - * @throws Exception If failed. - */ public void testMvccCoordinatorChangeSimple() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9722"); + Ignite srv0 = startGrid(0); final List<String> cacheNames = new ArrayList<>(); @@ -2624,19 +2450,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { for (int i = 0; i < 10; i++) vals.put(i, val); - TransactionConcurrency concurrency; - TransactionIsolation isolation; - - if (ThreadLocalRandom.current().nextBoolean()) { - concurrency = PESSIMISTIC; - isolation = REPEATABLE_READ; - } - else { - concurrency = OPTIMISTIC; - isolation = SERIALIZABLE; - } - - try (Transaction tx = putNode.transactions().txStart(concurrency, isolation)) { + try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { for (String cacheName : cacheNames) putNode.cache(cacheName).putAll(vals); @@ -2906,8 +2720,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testUpdate_N_Objects_ClientServer_Backups1_Scan() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-7764"); - int[] nValues = {3, 5, 10}; for (int n : nValues) { @@ -2924,7 +2736,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { doImplicitPartsScanTest(1, 0, 0, 1, 10_000); } - /** * @throws Exception If failed. */ @@ -2967,6 +2778,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { int cacheBackups, int cacheParts, long time) throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); + final int KEYS_PER_PART = 20; final int writers = 4; @@ -3033,6 +2846,12 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { while (k1.equals(k2)) k2 = partKeys.get(rnd.nextInt(KEYS_PER_PART)); + if(k1 > k2) { + int tmp = k1; + k1 = k2; + k2 = tmp; + } + TreeSet<Integer> keys = new TreeSet<>(); keys.add(k1); @@ -3074,7 +2893,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { ScanQuery<Integer, MvccTestAccount> qry = new ScanQuery<>(part); - List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll(); + List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll(); int sum = 0; @@ -3100,7 +2919,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { try { ScanQuery<Integer, MvccTestAccount> qry = new ScanQuery<>(); - List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll(); + List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll(); int sum = 0; @@ -3140,8 +2959,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws IgniteCheckedException If failed. */ public void testSize() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9451"); - Ignite node = startGrid(0); IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1)); @@ -3150,6 +2967,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { final int KEYS = 10; + // Initial put. for (int i = 0; i < KEYS; i++) { final Integer key = i; @@ -3162,6 +2980,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { assertEquals(i + 1, cache.size()); } + // Update. for (int i = 0; i < KEYS; i++) { final Integer key = i; @@ -3176,6 +2995,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { int size = KEYS; + // Remove. for (int i = 0; i < KEYS; i++) { if (i % 2 == 0) { final Integer key = i; @@ -3207,6 +3027,48 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } } + // Check rollback create. + for (int i = 0; i < KEYS; i++) { + if (i % 2 == 0) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.rollback(); + } + + assertEquals(size, cache.size()); + } + } + + // Check rollback update. + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, -1); + + tx.rollback(); + } + + assertEquals(size, cache.size()); + } + + // Check rollback remove. + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.remove(key); + + tx.rollback(); + } + + assertEquals(size, cache.size()); + } + + // Restore original state. for (int i = 0; i < KEYS; i++) { if (i % 2 == 0) { final Integer key = i; @@ -3222,6 +3084,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { assertEquals(size, cache.size()); } } + + // Check state. + for (int i = 0; i < KEYS; i++) + assertEquals(i, cache.get(i)); } /** @@ -3467,7 +3333,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @param readModes Read modes to check. * @return Value. */ - private Object checkAndGet(boolean inTx, IgniteCache cache, Object key, ReadMode ... readModes) { + private Object checkAndGet(boolean inTx, IgniteCache cache, Object key, ReadMode... readModes) { assert readModes != null && readModes.length > 0; if (inTx) @@ -3531,13 +3397,12 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } } - /** * Checks values obtained with different read modes. * And returns value in case of it's equality for all read modes. * Do not use in tests with writers contention. * - * // TODO remove inTx flag in IGNITE-7764 + * // TODO remove inTx flag in IGNITE-6938 * @param inTx Flag whether current read is inside transaction. * This is because reads can't see writes made in current transaction. * @param cache Cache. @@ -3545,7 +3410,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @param readModes Read modes to check. * @return Value. */ - private Map checkAndGetAll(boolean inTx, IgniteCache cache, Set keys, ReadMode ... readModes) { + private Map checkAndGetAll(boolean inTx, IgniteCache cache, Set keys, ReadMode... readModes) { assert readModes != null && readModes.length > 0; if (inTx) @@ -3571,11 +3436,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { return prevVal; } - /** * Reads value from cache for the given key using given read mode. * - * // TODO IGNITE-7764 remove inTx flag + * // TODO IGNITE-6938 remove inTx flag * // TODO IGNITE-6739 add SQL-get support "select _key, _val from cache where _key in ... keySet" * @param inTx Flag whether current read is inside transaction. * This is because reads can't see writes made in current transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java index d7948bd..381d9a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java @@ -76,6 +76,11 @@ public class DataStreamProcessorMvccSelfTest extends DataStreamProcessorSelfTest } /** {@inheritDoc} */ + @Override public void testFlushTimeout() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9321"); + } + + /** {@inheritDoc} */ @Override public void testLocal() throws Exception { // Do not check local caches with MVCC enabled. } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java index ca22b56..1b85d33 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java @@ -42,7 +42,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TopologyValidator; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy; -import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import static org.apache.ignite.internal.util.lang.GridFunc.asArray; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java index 1d382f7..7f22107 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.query.h2; import java.sql.ResultSet; import java.util.UUID; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryAbstractEnlistFuture; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; @@ -31,7 +31,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public class DhtResultSetEnlistFuture extends GridDhtTxAbstractEnlistFuture implements ResultSetEnlistFuture { +public class DhtResultSetEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements ResultSetEnlistFuture { /** */ private ResultSet rs; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java index 1856430..968a856 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java @@ -32,9 +32,6 @@ import org.jetbrains.annotations.Nullable; * */ public class NearResultSetEnlistFuture extends GridNearTxQueryResultsEnlistFuture implements ResultSetEnlistFuture { - /** */ - private static final long serialVersionUID = 877907044489718378L; - /** * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java index 8c6f407..76f8013 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java @@ -132,6 +132,8 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche * Test that attempting to perform various SQL operations within non SQL transaction yields an exception. */ public void testSqlOperationsWithinNonSqlTransaction() { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); + assertSqlOperationWithinNonSqlTransactionThrows("COMMIT"); assertSqlOperationWithinNonSqlTransactionThrows("ROLLBACK"); @@ -230,8 +232,8 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche return null; } - }, IgniteCheckedException.class, - "SQL queries and cache operations may not be used in the same transaction."); + }, UnsupportedOperationException.class, + "operations are not supported on transactional caches when MVCC is enabled."); } finally { try { @@ -266,78 +268,6 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void testCacheOperationsFromSqlTransaction() { - checkCacheOperationThrows("get", 1); - - checkCacheOperationThrows("getAsync", 1); - - checkCacheOperationThrows("getEntry", 1); - - checkCacheOperationThrows("getEntryAsync", 1); - - checkCacheOperationThrows("getAndPut", 1, 1); - - checkCacheOperationThrows("getAndPutAsync", 1, 1); - - checkCacheOperationThrows("getAndPutIfAbsent", 1, 1); - - checkCacheOperationThrows("getAndPutIfAbsentAsync", 1, 1); - - checkCacheOperationThrows("getAndReplace", 1, 1); - - checkCacheOperationThrows("getAndReplaceAsync", 1, 1); - - checkCacheOperationThrows("getAndRemove", 1); - - checkCacheOperationThrows("getAndRemoveAsync", 1); - - checkCacheOperationThrows("containsKey", 1); - - checkCacheOperationThrows("containsKeyAsync", 1); - - checkCacheOperationThrows("put", 1, 1); - - checkCacheOperationThrows("putAsync", 1, 1); - - checkCacheOperationThrows("putIfAbsent", 1, 1); - - checkCacheOperationThrows("putIfAbsentAsync", 1, 1); - - checkCacheOperationThrows("remove", 1); - - checkCacheOperationThrows("removeAsync", 1); - - checkCacheOperationThrows("remove", 1, 1); - - checkCacheOperationThrows("removeAsync", 1, 1); - - checkCacheOperationThrows("replace", 1, 1); - - checkCacheOperationThrows("replaceAsync", 1, 1); - - checkCacheOperationThrows("replace", 1, 1, 1); - - checkCacheOperationThrows("replaceAsync", 1, 1, 1); - - checkCacheOperationThrows("getAll", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("containsKeys", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("getEntries", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("putAll", Collections.singletonMap(1, 1)); - - checkCacheOperationThrows("removeAll", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("getAllAsync", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("containsKeysAsync", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("getEntriesAsync", new HashSet<>(Arrays.asList(1, 2))); - - checkCacheOperationThrows("putAllAsync", Collections.singletonMap(1, 1)); - - checkCacheOperationThrows("removeAllAsync", new HashSet<>(Arrays.asList(1, 2))); - checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java index 5c81974..00c748e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java @@ -96,6 +96,8 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc * */ public void testSelectForUpdateDistributed() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9724"); + doTestSelectForUpdateDistributed("Person", false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java index 796c0bb..4ea53e0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java @@ -151,6 +151,8 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT * @throws Exception If failed. */ private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9540"); + final int VALS = 100; final int writers = 4; @@ -377,6 +379,8 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT * @throws Exception If failed. */ private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); + final int KEYS = 100; final int writers = 4; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java index b881f02..7076362 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java @@ -1120,6 +1120,8 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac * @throws Exception If failed. */ public void testQueryInsertUpdateMultithread() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) .setIndexedTypes(Integer.class, Integer.class); @@ -1196,14 +1198,17 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac } }, 1)); - fut.markInitialized(); - try { + fut.markInitialized(); + fut.get(TX_TIMEOUT); } catch (IgniteCheckedException e) { onException(ex, e); } + finally { + phaser.forceTermination(); + } Exception ex0 = ex.get(); @@ -1248,7 +1253,7 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { tx.timeout(TX_TIMEOUT); - barrier.await(); + barrier.await(TX_TIMEOUT, TimeUnit.MILLISECONDS); IgniteCache<Object, Object> cache0 = node.cache(DEFAULT_CACHE_NAME); @@ -1262,7 +1267,7 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac } } - barrier.await(); + barrier.await(TX_TIMEOUT, TimeUnit.MILLISECONDS); qry = new SqlFieldsQuery("UPDATE Integer SET _val = (_key * 10)"); @@ -1820,7 +1825,7 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac do { p = phaser.arriveAndAwaitAdvance(); } - while (p < phase); + while (p < phase && p >= 0 /* check termination */ ); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java new file mode 100644 index 0000000..46aeaa1 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT; + +/** + * Test basic mvcc bulk cache operations. + */ +public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** */ + private int nodesCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + cleanPersistenceDir(); + + startGridsMultiThreaded(nodesCount() - 1); + + client = true; + + startGrid(nodesCount() - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + grid(0).createCache(cacheConfiguration(cacheMode(), FULL_SYNC, 1, 32). + setIndexedTypes(Integer.class, MvccTestAccount.class)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid(0).destroyCache(DEFAULT_CACHE_NAME); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationGetPut() throws Exception { + checkOperations(GET, GET, PUT, true); + checkOperations(GET, GET, PUT, false); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationSqlPut() throws Exception { + checkOperations(SQL, SQL, PUT, true); + checkOperations(SQL, SQL, PUT, false); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationSqlDml() throws Exception { + checkOperations(SQL, SQL, DML, true); + checkOperations(SQL, SQL, DML, false); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationGetDml() throws Exception { + checkOperations(GET, GET, DML, true); + checkOperations(GET, GET, DML, false); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationMixedPut() throws Exception { + checkOperations(SQL, GET, PUT, false); + checkOperations(SQL, GET, PUT, true); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationMixedPut2() throws Exception { + checkOperations(GET, SQL, PUT, false); + checkOperations(GET, SQL, PUT, true); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationMixedDml() throws Exception { + checkOperations(SQL, GET, DML, false); + checkOperations(SQL, GET, DML, true); + } + + /** + * @throws Exception If failed. + */ + public void testRepeatableReadIsolationMixedDml2() throws Exception { + checkOperations(GET, SQL, DML, false); + checkOperations(GET, SQL, DML, true); + } + + /** + * @throws Exception If failed. + */ + public void testOperationConsistency() throws Exception { + checkOperationsConsistency(PUT, false); + checkOperationsConsistency(DML, false); + checkOperationsConsistency(PUT, true); + checkOperationsConsistency(DML, true); + } + + /** + * Checks SQL and CacheAPI operation isolation consistency. + * + * @param readModeBefore read mode used before value updated. + * @param readModeBefore read mode used after value updated. + * @param writeMode write mode used for update. + * @throws Exception If failed. + */ + private void checkOperations(ReadMode readModeBefore, ReadMode readModeAfter, + WriteMode writeMode, boolean readFromClient) throws Exception { + Ignite node1 = grid(readFromClient ? nodesCount() - 1 : 0); + Ignite node2 = grid(readFromClient ? 0 : nodesCount() - 1); + + TestCache<Integer, MvccTestAccount> cache1 = new TestCache<>(node1.cache(DEFAULT_CACHE_NAME)); + TestCache<Integer, MvccTestAccount> cache2 = new TestCache<>(node2.cache(DEFAULT_CACHE_NAME)); + + final Set<Integer> keysForUpdate = new HashSet<>(3); + final Set<Integer> keysForRemove = new HashSet<>(3); + + final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove); + + final Map<Integer, MvccTestAccount> initialMap = allKeys.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + final Map<Integer, MvccTestAccount> updateMap = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(), + k -> new MvccTestAccount(k, 2))); /* Removed keys are excluded. */ + + cache1.cache.putAll(initialMap); + + IgniteTransactions txs1 = node1.transactions(); + IgniteTransactions txs2 = node2.transactions(); + + CountDownLatch updateStart = new CountDownLatch(1); + CountDownLatch updateFinish = new CountDownLatch(1); + + // Start concurrent transactions and check isolation. + IgniteInternalFuture<Void> updater = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + updateStart.await(); + + try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + + updateEntries(cache2, updateMap, writeMode); + removeEntries(cache2, keysForRemove, writeMode); + + checkContains(cache2, true, updateMap.keySet()); + checkContains(cache2, false, keysForRemove); + + assertEquals(updateMap, cache2.cache.getAll(allKeys)); + + tx.commit(); + } + + updateFinish.countDown(); + + return null; + } + }); + + IgniteInternalFuture<Void> reader = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = txs1.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + assertEquals(initialMap, getEntries(cache1, allKeys, readModeBefore)); + + checkContains(cache1, true, allKeys); + + updateStart.countDown(); + updateFinish.await(); + + assertEquals(initialMap, getEntries(cache1, allKeys, readModeAfter)); + + checkContains(cache1, true,allKeys); + + tx.commit(); + } + + return null; + } + }); + + try { + updater.get(3_000, TimeUnit.MILLISECONDS); + reader.get(3_000, TimeUnit.MILLISECONDS); + } + catch (Throwable e) { + throw new AssertionError(e); + } + finally { + updateStart.countDown(); + updateFinish.countDown(); + } + + assertEquals(updateMap, cache1.cache.getAll(allKeys)); + } + + /** + * Generate 2 sets of keys. Each set contains primary, backup and non-affinity key for given node cache. + * + * @param cache Cache. + * @param keySet1 Key set. + * @param keySet2 Key set. + * @return All keys. + * @throws IgniteCheckedException If failed. + */ + protected Set<Integer> generateKeySet(IgniteCache<Object, Object> cache, Set<Integer> keySet1, + Set<Integer> keySet2) throws IgniteCheckedException { + LinkedHashSet<Integer> allKeys = new LinkedHashSet<>(); + + allKeys.addAll(primaryKeys(cache, 2)); + allKeys.addAll(backupKeys(cache, 2, 1)); + allKeys.addAll(nearKeys(cache, 2, 1)); + + List<Integer> keys0 = new ArrayList<>(allKeys); + + for (int i = 0; i < 6; i++) { + if (i % 2 == 0) + keySet1.add(keys0.get(i)); + else + keySet2.add(keys0.get(i)); + } + + assert allKeys.size() == 6; // Expects no duplicates. + + return allKeys; + } + + /** + * Checks SQL and CacheAPI operation see consistent results before and after update. + * + * @throws Exception If failed. + */ + private void checkOperationsConsistency(WriteMode writeMode, boolean requestFromClient) throws Exception { + Ignite node = grid(requestFromClient ? nodesCount() - 1 : 0); + + TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME)); + + final Set<Integer> keysForUpdate = new HashSet<>(3); + final Set<Integer> keysForRemove = new HashSet<>(3); + + final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove); + + int updCnt = 1; + + final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + cache.cache.putAll(initialVals); + + IgniteTransactions txs = node.transactions(); + + Map<Integer, MvccTestAccount> updatedVals = null; + + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET); + Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL); + + assertEquals(initialVals, vals1); + assertEquals(initialVals, vals2); + + for (ReadMode readMode : new ReadMode[] {GET, SQL}) { + int updCnt0 = ++updCnt; + + updatedVals = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(), + k -> new MvccTestAccount(k, updCnt0))); + + updateEntries(cache, updatedVals, writeMode); + removeEntries(cache, keysForRemove, writeMode); + + assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode)); + } + + tx.commit(); + } + + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + assertEquals(updatedVals, getEntries(cache, allKeys, GET)); + assertEquals(updatedVals, getEntries(cache, allKeys, SQL)); + + tx.commit(); + } + } + + /** + * Gets values with given read mode. + * + * @param cache Cache. + * @param keys Key to be read. + * @param readMode Read mode. + * @return Key-value result map. + */ + protected Map<Integer, MvccTestAccount> getEntries( + TestCache<Integer, MvccTestAccount> cache, + Set<Integer> keys, + ReadMode readMode) { + switch (readMode) { + case GET: + return cache.cache.getAll(keys); + case SQL: + return getAllSql(cache); + default: + fail(); + } + + return null; + } + + /** + * Updates entries with given write mode. + * + * @param cache Cache. + * @param entries Entries to be updated. + * @param writeMode Write mode. + */ + protected void updateEntries( + TestCache<Integer, MvccTestAccount> cache, + Map<Integer, MvccTestAccount> entries, + WriteMode writeMode) { + switch (writeMode) { + case PUT: { + cache.cache.putAll(entries); + + break; + } + case DML: { + for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) + mergeSql(cache, e.getKey(), e.getValue().val, e.getValue().updateCnt); + + break; + } + default: + fail(); + } + } + + /** + * Updates entries with given write mode. + * + * @param cache Cache. + * @param keys Key to be deleted. + * @param writeMode Write mode. + */ + protected void removeEntries( + TestCache<Integer, MvccTestAccount> cache, + Set<Integer> keys, + WriteMode writeMode) { + switch (writeMode) { + case PUT: { + cache.cache.removeAll(keys); + + break; + } + case DML: { + for (Integer key : keys) + removeSql(cache, key); + + break; + } + default: + fail(); + } + } + + /** + * Check cache contains entries. + * + * @param cache Cache. + * @param expected Expected result. + * @param keys Keys to check. + */ + protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) { + assertEquals(expected, cache.cache.containsKeys(keys)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java new file mode 100644 index 0000000..c782f98 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL; + +/** + * Test basic mvcc cache operation operations. + */ +public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsTest { + /** {@inheritDoc} */ + @Override protected Map<Integer, MvccTestAccount> getEntries( + TestCache<Integer, MvccTestAccount> cache, + Set<Integer> keys, + ReadMode readMode) { + + switch (readMode) { + case GET: { + Map<Integer, MvccTestAccount> res = new HashMap<>(); + + for (Integer key : keys) { + MvccTestAccount val = cache.cache.get(key); + + if(val != null) + res.put(key, val); + } + + return res; + } + case SQL: + return getAllSql(cache); + default: + fail(); + } + + return null; + } + + /** {@inheritDoc} */ + protected void updateEntries( + TestCache<Integer, MvccTestAccount> cache, + Map<Integer, MvccTestAccount> entries, + WriteMode writeMode) { + switch (writeMode) { + case PUT: { + for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) + if (e.getValue() == null) + cache.cache.remove(e.getKey()); + else + cache.cache.put(e.getKey(), e.getValue()); + + break; + } + case DML: { + for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) { + if (e.getValue() == null) + removeSql(cache, e.getKey()); + else + mergeSql(cache, e.getKey(), e.getValue().val, e.getValue().updateCnt); + } + break; + } + default: + fail(); + } + } + + /** {@inheritDoc} */ + protected void removeEntries( + TestCache<Integer, MvccTestAccount> cache, + Set<Integer> keys, + WriteMode writeMode) { + switch (writeMode) { + case PUT: { + for (Integer key : keys) + cache.cache.remove(key); + + break; + } + case DML: { + for (Integer key : keys) + removeSql(cache, key); + + break; + } + default: + fail(); + } + } + + /** {@inheritDoc} */ + @Override protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, + Set<Integer> keys) { + for (Integer key : keys) + assertEquals(expected, cache.cache.containsKey(key)); + } + + /** + * Check getAndPut/getAndRemove operations consistency. + * + * @throws IgniteCheckedException If failed. + */ + public void testGetAndUpdateOperations() throws IgniteCheckedException { + Ignite node1 = grid(0); + + TestCache<Integer, MvccTestAccount> cache1 = new TestCache<>(node1.cache(DEFAULT_CACHE_NAME)); + + final Set<Integer> keysForUpdate = new HashSet<>(3); + final Set<Integer> keysForRemove = new HashSet<>(3); + + final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove); + + final Map<Integer, MvccTestAccount> initialMap = keysForRemove.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + Map<Integer, MvccTestAccount> updateMap = keysForUpdate.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 3))); + + cache1.cache.putAll(initialMap); + + IgniteTransactions txs = node1.transactions(); + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + for (Integer key : keysForUpdate) { + MvccTestAccount newVal1 = new MvccTestAccount(key, 1); + + assertNull(cache1.cache.getAndPut(key, newVal1)); // Check create. + + MvccTestAccount newVal2 = new MvccTestAccount(key, 2); + + assertEquals(newVal1, cache1.cache.getAndPut(key, newVal2)); // Check update. + } + + for (Integer key : keysForRemove) { + assertEquals(initialMap.get(key), cache1.cache.getAndRemove(key)); // Check remove existed. + + assertNull(cache1.cache.getAndRemove(key)); // Check remove non-existed. + } + + for (Integer key : allKeys) { + MvccTestAccount oldVal = new MvccTestAccount(key, 2); + MvccTestAccount newVal = new MvccTestAccount(key, 3); + + if (keysForRemove.contains(key)) + assertNull(cache1.cache.getAndReplace(key, newVal)); // Omit update 'null'. + else + assertEquals(oldVal, cache1.cache.getAndReplace(key, newVal)); // Check updated. + } + + assertEquals(updateMap, getEntries(cache1, allKeys, SQL)); + assertEquals(updateMap, getEntries(cache1, allKeys, GET)); + + tx.commit(); + } + + assertEquals(updateMap, getEntries(cache1, allKeys, SQL)); + assertEquals(updateMap, getEntries(cache1, allKeys, GET)); + } + + /** + * Check getAndPut/getAndRemove operations consistency. + * + * @throws IgniteCheckedException If failed. + */ + public void testPutIfAbsentConsistency() throws IgniteCheckedException { + Ignite node1 = grid(0); + + TestCache<Integer, MvccTestAccount> cache1 = new TestCache<>(node1.cache(DEFAULT_CACHE_NAME)); + + final Set<Integer> keysForCreate = new HashSet<>(3); + final Set<Integer> keysForUpdate = new HashSet<>(3); + + final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForCreate, keysForUpdate); + + final Map<Integer, MvccTestAccount> initialMap = keysForUpdate.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + Map<Integer, MvccTestAccount> updatedMap = allKeys.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + cache1.cache.putAll(initialMap); + + IgniteTransactions txs = node1.transactions(); + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + for (Integer key : keysForUpdate) + assertFalse(cache1.cache.putIfAbsent(key, new MvccTestAccount(key, 2))); // Check update. + + for (Integer key : keysForCreate) + assertTrue(cache1.cache.putIfAbsent(key, new MvccTestAccount(key, 1))); // Check create. + + assertEquals(updatedMap, getEntries(cache1, allKeys, SQL)); + + tx.commit(); + } + + assertEquals(updatedMap, getEntries(cache1, allKeys, SQL)); + assertEquals(updatedMap, getEntries(cache1, allKeys, GET)); + } + + /** + * Check getAndPut/getAndRemove operations consistency. + * + * @throws IgniteCheckedException If failed. + */ + public void testReplaceConsistency() throws IgniteCheckedException { + Ignite node1 = grid(0); + + TestCache<Integer, MvccTestAccount> cache1 = new TestCache<>(node1.cache(DEFAULT_CACHE_NAME)); + + final Set<Integer> existedKeys = new HashSet<>(3); + final Set<Integer> nonExistedKeys = new HashSet<>(3); + + final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), existedKeys, nonExistedKeys); + + final Map<Integer, MvccTestAccount> initialMap = existedKeys.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + Map<Integer, MvccTestAccount> updateMap = existedKeys.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 3))); + + cache1.cache.putAll(initialMap); + + IgniteTransactions txs = node1.transactions(); + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + for (Integer key : allKeys) { + MvccTestAccount newVal = new MvccTestAccount(key, 2); + + if(existedKeys.contains(key)) { + assertTrue(cache1.cache.replace(key, new MvccTestAccount(key, 1), newVal)); + + assertEquals(newVal, cache1.cache.getAndReplace(key, new MvccTestAccount(key, 3))); + } + else { + assertFalse(cache1.cache.replace(key, new MvccTestAccount(key, 1), newVal)); + + assertNull(cache1.cache.getAndReplace(key, new MvccTestAccount(key, 3))); + } + } + + assertEquals(updateMap, getEntries(cache1, allKeys, SQL)); + assertEquals(updateMap, getEntries(cache1, allKeys, GET)); + + tx.commit(); + } + + assertEquals(updateMap, getEntries(cache1, allKeys, SQL)); + assertEquals(updateMap, getEntries(cache1, allKeys, GET)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index 726c4e9..21ab2e6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlConfiguratio import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest; import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest; /** @@ -64,6 +66,10 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite { suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class); + // SQL vs CacheAPI consistency. + suite.addTestSuite(MvccRepeatableReadOperationsTest.class); + suite.addTestSuite(MvccRepeatableReadBulkOpsTest.class); + // JDBC tests. suite.addTestSuite(CacheMvccSizeWithConcurrentJdbcTransactionTest.class); suite.addTestSuite(CacheMvccScanQueryWithConcurrentJdbcTransactionTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs index 1f600dd..5b4106a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs @@ -81,9 +81,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity "TimeServerPortRange", "IncludeProperties", "isAutoActivationEnabled", // IGNITE-7301 - "isMvccEnabled", //TODO: IGNITE-9390: Remove when Mvcc support will be added. - "MvccVacuumTimeInterval", //TODO: IGNITE-9390: Remove when Mvcc support will be added. - "MvccVacuumThreadCnt" //TODO: IGNITE-9390: Remove when Mvcc support will be added. + "MvccVacuumFrequency", //TODO: IGNITE-9390: Remove when Mvcc support will be added. + "MvccVacuumThreadCount" //TODO: IGNITE-9390: Remove when Mvcc support will be added. }; /// <summary>