http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java deleted file mode 100644 index 8d7cf15..0000000 --- a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.failure; - -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl; -import org.apache.ignite.mxbean.WorkersControlMXBean; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; -import org.jetbrains.annotations.NotNull; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - -/** - * Test transfer amount between accounts with enabled {@link 
StopNodeFailureHandler}. - */ -public class AccountTransferTransactionTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - /** Count of accounts in one thread. */ - private static final int ACCOUNTS_CNT = 20; - /** Count of threads and caches. */ - private static final int THREADS_CNT = 20; - /** Count of nodes to start. */ - private static final int NODES_CNT = 3; - /** Count of transaction on cache. */ - private static final int TRANSACTION_CNT = 10; - - /** {@inheritDoc} */ - @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { - return new StopNodeFailureHandler(); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { - final IgniteConfiguration cfg = super.getConfiguration(name); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); - cfg.setLocalHost("127.0.0.1"); - cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setMaxSize(50 * 1024 * 1024) - .setPersistenceEnabled(true)) - ); - - CacheConfiguration[] cacheConfigurations = new CacheConfiguration[THREADS_CNT]; - for (int i = 0; i < THREADS_CNT; i++) { - cacheConfigurations[i] = new CacheConfiguration() - .setName(cacheName(i)) - .setAffinity(new RendezvousAffinityFunction(false, 32)) - .setBackups(1) - .setAtomicityMode(TRANSACTIONAL) - .setCacheMode(CacheMode.PARTITIONED) - .setWriteSynchronizationMode(FULL_SYNC) - .setEvictionPolicy(new FifoEvictionPolicy(1000)) - .setOnheapCacheEnabled(true); - } - - cfg.setCacheConfiguration(cacheConfigurations); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** {@inheritDoc} */ - @Override protected 
void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** - * Test transfer amount. - */ - public void testTransferAmount() throws Exception { - //given: started some nodes with client. - startGrids(NODES_CNT); - - IgniteEx igniteClient = startGrid(getClientConfiguration(NODES_CNT)); - - igniteClient.cluster().active(true); - - Random random = new Random(); - - long[] initAmount = new long[THREADS_CNT]; - - //and: fill all accounts on all caches and calculate total amount for every cache. - for (int cachePrefixIdx = 0; cachePrefixIdx < THREADS_CNT; cachePrefixIdx++) { - IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx)); - - try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - for (int accountId = 0; accountId < ACCOUNTS_CNT; accountId++) { - Long amount = (long)random.nextInt(1000); - - cache.put(accountId, amount); - - initAmount[cachePrefixIdx] += amount; - } - - tx.commit(); - } - } - - //when: start transfer amount from account to account in different threads. - CountDownLatch firstTransactionDone = new CountDownLatch(THREADS_CNT); - - ArrayList<Thread> transferThreads = new ArrayList<>(); - - for (int i = 0; i < THREADS_CNT; i++) { - transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i))); - - transferThreads.get(i).start(); - } - - firstTransactionDone.await(10, TimeUnit.SECONDS); - - //and: terminate disco-event-worker thread on one node. - WorkersControlMXBean bean = workersMXBean(1); - - bean.terminateWorker( - bean.getWorkerNames().stream() - .filter(name -> name.startsWith("disco-event-worker")) - .findFirst() - .orElse(null) - ); - - for (Thread thread : transferThreads) { - thread.join(); - } - - long[] resultAmount = new long[THREADS_CNT]; - - //then: calculate total amount for every thread. 
- for (int j = 0; j < THREADS_CNT; j++) { - String cacheName = cacheName(j); - - IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName); - - try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - - for (int i = 0; i < ACCOUNTS_CNT; i++) - resultAmount[j] += getNotNullValue(cache, i); - tx.commit(); - } - - long diffAmount = initAmount[j] - resultAmount[j]; - - //and: check that result amount equal to init amount. - assertTrue( - String.format("Total amount before and after transfer is not same: diff=%s, cache=%s", - diffAmount, cacheName), - diffAmount == 0 - ); - } - } - - /** - * Make test cache name by prefix. - */ - @NotNull private String cacheName(int cachePrefixIdx) { - return "cache" + cachePrefixIdx; - } - - /** - * Ignite configuration for client. - */ - @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception { - IgniteConfiguration clientConf = getConfiguration(getTestIgniteInstanceName(nodesPrefix)); - - clientConf.setClientMode(true); - - return clientConf; - } - - /** - * Extract not null value from cache. - */ - private long getNotNullValue(IgniteCache<Object, Object> cache, int i) { - Object value = cache.get(i); - - return value == null ? 0 : ((Long)value); - } - - /** - * Configure workers mx bean. 
- */ - private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception { - ObjectName mbeanName = U.makeMBeanName( - getTestIgniteInstanceName(igniteInt), - "Kernal", - WorkersControlMXBeanImpl.class.getSimpleName() - ); - - MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer(); - - if (!mbeanSrv.isRegistered(mbeanName)) - fail("MBean is not registered: " + mbeanName.getCanonicalName()); - - return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true); - } - - /** - * - */ - private static class TransferAmountTxThread extends Thread { - /** */ - private CountDownLatch firstTransactionLatch; - /** */ - private Ignite ignite; - /** */ - private String cacheName; - /** */ - private Random random = new Random(); - - /** - * @param ignite Ignite. - */ - private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final Ignite ignite, String cacheName) { - this.firstTransactionLatch = firstTransactionLatch; - this.ignite = ignite; - this.cacheName = cacheName; - } - - /** {@inheritDoc} */ - @Override public void run() { - for (int i = 0; i < TRANSACTION_CNT; i++) { - try { - updateInTransaction(ignite.cache(cacheName)); - } - finally { - if (i == 0) - firstTransactionLatch.countDown(); - } - } - } - - /** - * @throws IgniteException if fails - */ - @SuppressWarnings("unchecked") - private void updateInTransaction(IgniteCache cache) throws IgniteException { - int accIdFrom = random.nextInt(ACCOUNTS_CNT); - int accIdTo = random.nextInt(ACCOUNTS_CNT); - - if (accIdFrom == accIdTo) - accIdTo = (int)getNextAccountId(accIdFrom); - - Long acctFrom; - Long acctTo; - - try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - acctFrom = (Long)cache.get(accIdFrom); - acctTo = (Long)cache.get(accIdTo); - - long transactionAmount = (long)(random.nextDouble() * acctFrom); - - cache.put(accIdFrom, acctFrom - transactionAmount); - cache.put(accIdTo, acctTo + transactionAmount); - 
- tx.commit(); - } - } - - /** - * @param curr current - * @return random value - */ - private long getNextAccountId(long curr) { - long randomVal; - - do { - randomVal = random.nextInt(ACCOUNTS_CNT); - } - while (curr == randomVal); - - return randomVal; - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java index fc2a7d6..40025f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.pagemem.PageIdUtils; -import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java index 379b8c3..5a1a6fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java 
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java @@ -32,7 +32,7 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.TestFailureHandler; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.pagemem.wal.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index ecc7b03..3d62fe1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.pagemem.wal.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.pagemem.wal.WALIterator; import 
org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java new file mode 100644 index 0000000..fe27e6e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import com.google.common.collect.Sets; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.jetbrains.annotations.NotNull; +import org.jsr166.ConcurrentLinkedHashMap; +import org.junit.Assert; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}. + * + * This test can be extended to emulate failover scenarios during transactional operations on the grid: + * subclasses may override the count and flag methods below and pass a {@link FailoverScenario} to + * {@link #doTestTransferAmount(FailoverScenario)}. + */ +public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI in {@link #getConfiguration(String)}. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Default count of accounts in each cache. */ + private static final int DFLT_ACCOUNTS_CNT = 32; + + /** Default count of transfer threads and caches. */ + private static final int DFLT_TX_THREADS_CNT = 20; + + /** Default count of nodes to start. */ + private static final int DFLT_NODES_CNT = 3; + + /** Default count of transactions performed on each cache. */ + private static final int DFLT_TRANSACTIONS_CNT = 10; + + /** Completed transactions, one map per cache, keyed by transaction id. */ + private ConcurrentLinkedHashMap[] completedTxs; + + /** + * @return Count of nodes to start before the client node is started. + */ + protected int nodesCount() { + return DFLT_NODES_CNT; + } + + /** + * @return Count of accounts created in each cache. + */ + protected int accountsCount() { + return DFLT_ACCOUNTS_CNT; + } + + /** + * @return Count of transfer transactions performed by each transfer thread. + */ + protected int transactionsCount() { + return DFLT_TRANSACTIONS_CNT; + } + + /** + * @return Count of transfer threads; one cache is configured per thread. + */ + protected int txThreadsCount() { + return DFLT_TX_THREADS_CNT; + } + + /** + * @return Flag enables secondary index on account caches. + */ + protected boolean indexed() { + return false; + } + + /** + * @return Flag enables persistence on account caches. + */ + protected boolean persistent() { + return true; + } + + /** + * @return Flag enables cross-node transactions, + * when primary partitions participating in transaction are spread across several cluster nodes.
+ */ + protected boolean crossNodeTransactions() { + // Commit error during cross node transactions breaks transaction integrity + // TODO: https://issues.apache.org/jira/browse/IGNITE-9086 + return false; + } + + /** {@inheritDoc} */ + @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { + return new StopNodeFailureHandler(); + } + + /** + * {@inheritDoc} + * <p> + * Uses the instance name as consistent id and configures one transactional, partitioned, + * fully synchronous cache with one backup per transfer thread. + * <p> + * NOTE(review): indexed types are registered with an {@code IgniteUuid} key while the test stores + * accounts under {@code Integer} keys; confirm this is intended. + */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setConsistentId(name); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + cfg.setLocalHost("127.0.0.1"); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(256 * 1024 * 1024) + .setPersistenceEnabled(persistent())) + ); + + CacheConfiguration[] cacheConfigurations = new CacheConfiguration[txThreadsCount()]; + + for (int i = 0; i < txThreadsCount(); i++) { + CacheConfiguration ccfg = new CacheConfiguration() + .setName(cacheName(i)) + .setAffinity(new RendezvousAffinityFunction(false, accountsCount())) + .setBackups(1) + .setAtomicityMode(TRANSACTIONAL) + .setCacheMode(CacheMode.PARTITIONED) + .setWriteSynchronizationMode(FULL_SYNC) + .setReadFromBackup(true) + .setOnheapCacheEnabled(true); + + if (indexed()) + ccfg.setIndexedTypes(IgniteUuid.class, AccountState.class); + + cacheConfigurations[i] = ccfg; + } + + cfg.setCacheConfiguration(cacheConfigurations); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * Makes the test cache name by appending the given index to the {@code "cache"} prefix. + * + * @param cachePrefixIdx Index of the cache. + * @return Cache name.
+ */ + @NotNull private String cacheName(int cachePrefixIdx) { + return "cache" + cachePrefixIdx; + } + + /** + * Builds the Ignite configuration for the client node. + * + * @param nodesPrefix Index used to derive the client instance name. + * @return Configuration with client mode enabled. + * @throws Exception Propagated from {@link #getConfiguration(String)}. + */ + @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception { + IgniteConfiguration clientConf = getConfiguration(getTestIgniteInstanceName(nodesPrefix)); + + clientConf.setClientMode(true); + + return clientConf; + } + + /** + * Test transfer amount: starts the nodes and a client, fills every cache with accounts holding 5 coins each, + * runs one transfer thread per cache, invokes the scenario callbacks and finally checks consistency. + * + * @param failoverScenario Callbacks invoked before nodes start, after the first transaction + * and after all transactions have finished. + * @throws Exception If failed. + */ + public void doTestTransferAmount(FailoverScenario failoverScenario) throws Exception { + failoverScenario.beforeNodesStarted(); + + //given: started some nodes with client. + startGrids(nodesCount()); + + IgniteEx igniteClient = startGrid(getClientConfiguration(nodesCount())); + + igniteClient.cluster().active(true); + + int[] initAmount = new int[txThreadsCount()]; + completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()]; + + //and: fill all accounts on all caches and calculate total amount for every cache. + for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) { + IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx)); + + AtomicInteger coinsCounter = new AtomicInteger(); + + try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int accountId = 0; accountId < accountsCount(); accountId++) { + Set<Integer> initialAmount = generateCoins(coinsCounter, 5); + + cache.put(accountId, new AccountState(accountId, tx.xid(), initialAmount)); + } + + tx.commit(); + } + + initAmount[cachePrefixIdx] = coinsCounter.get(); + completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap(); + } + + //when: start transfer amount from account to account in different threads.
+ CountDownLatch firstTransactionDone = new CountDownLatch(txThreadsCount()); + + ArrayList<Thread> transferThreads = new ArrayList<>(); + + for (int i = 0; i < txThreadsCount(); i++) { + transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i), i)); + + transferThreads.get(i).start(); + } + + firstTransactionDone.await(10, TimeUnit.SECONDS); + + failoverScenario.afterFirstTransaction(); + + for (Thread thread : transferThreads) { + thread.join(); + } + + failoverScenario.afterTransactionsFinished(); + + consistencyCheck(initAmount); + } + + /** + * Calculates total amount of coins for every cache on every node and checks that it equals the initial amount + * (transaction integrity is preserved). On mismatch the lost and duplicate coins, the completed transactions + * and all account states are logged and the test fails. + * <p> + * NOTE(review): only the total count of coins is compared, so one lost coin together with one duplicated coin + * would not be detected. + * + * @param initAmount Count of coins initially generated for each cache, indexed by cache index. + */ + private void consistencyCheck(int[] initAmount) { + for (Ignite node : G.allGrids()) { + for (int j = 0; j < txThreadsCount(); j++) { + List<Integer> totalCoins = new ArrayList<>(); + + String cacheName = cacheName(j); + + IgniteCache<Integer, AccountState> cache = node.getOrCreateCache(cacheName); + + AccountState[] accStates = new AccountState[accountsCount()]; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < accountsCount(); i++) { + AccountState state = cache.get(i); + + Assert.assertNotNull("Account state has lost [node=" + node.name() + ", cache=" + cacheName + ", accNo=" + i + "]", state); + + totalCoins.addAll(state.coins); + + accStates[i] = state; + } + + tx.commit(); + } + + Collections.sort(totalCoins); + + if (initAmount[j] != totalCoins.size()) { + Set<Integer> lostCoins = new HashSet<>(); + Set<Integer> duplicateCoins = new HashSet<>(); + + for (int coin = 1; coin <= initAmount[j]; coin++) + if (!totalCoins.contains(coin)) + lostCoins.add(coin); + + for (int coinIdx = 1; coinIdx < totalCoins.size(); coinIdx++) + if (totalCoins.get(coinIdx).equals(totalCoins.get(coinIdx - 1))) + duplicateCoins.add(totalCoins.get(coinIdx)); + + log.error("Transaction integrity
failed for [node=" + node.name() + ", cache=" + cacheName + "]"); + + log.error(String.format("Total amount of coins before and after transfers are not same. Lost coins: %s. Duplicate coins: %s.", + Objects.toString(lostCoins), + Objects.toString(duplicateCoins))); + + ConcurrentLinkedHashMap<IgniteUuid, TxState> txs = completedTxs[j]; + + for (TxState tx : txs.values()) + log.error("Tx: " + tx); + + for (int i = 0; i < accountsCount(); i++) + log.error("Account state " + i + " = " + accStates[i]); + + assertFalse("Test failed. See messages above", true); + } + } + } + } + + /** + * Value stored in the account caches: the account id, the id of the last transaction that changed the account + * and the set of coins held by the account. + */ + public static class AccountState { + /** Account id. */ + private final int accId; + + /** Id of the last transaction performed on the account state. */ + @QuerySqlField(index = true) + private final IgniteUuid txId; + + /** Set of coins held by the account; wrapped as unmodifiable in the constructor. */ + private final Set<Integer> coins; + + /** + * @param accId Account id. + * @param txId Id of the transaction that produced this state. + * @param coins Coins held by the account. + */ + public AccountState(int accId, IgniteUuid txId, Set<Integer> coins) { + this.txId = txId; + this.coins = Collections.unmodifiableSet(coins); + this.accId = accId; + } + + /** + * Picks a random number of coins, from zero up to one fewer than the account holds, to transfer out. + * {@link Random#nextInt(int)} requires a positive bound, so the account must hold at least one coin. + * + * @param random Randomizer. + * @return Set of coins to transfer from this account. + */ + public Set<Integer> coinsToTransfer(Random random) { + int coinsNum = random.nextInt(coins.size()); + + return coins.stream().limit(coinsNum).collect(Collectors.toSet()); + } + + /** + * @param txId Transaction id. + * @param coinsToAdd Coins to add to current account. + * @return New account state with added coins. + */ + public AccountState addCoins(IgniteUuid txId, Set<Integer> coinsToAdd) { + return new AccountState(accId, txId, Sets.union(coins, coinsToAdd).immutableCopy()); + } + + /** + * @param txId Transaction id. + * @param coinsToRemove Coins to remove from current account. + * @return New account state with removed coins.
+ */ + public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRemove) { + return new AccountState(accId, txId, Sets.difference(coins, coinsToRemove).immutableCopy()); + } + + /** + * {@inheritDoc} + * <p> + * Two states are equal when their transaction ids and coin sets are equal; the account id is not compared. + */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AccountState that = (AccountState) o; + return Objects.equals(txId, that.txId) && + Objects.equals(coins, that.coins); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(txId, coins); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "AccountState{" + + "accId=" + Objects.toString(accId) + + ", coins=" + Objects.toString(coins) + + '}'; + } + } + + /** + * Generates the given number of new coins. + * + * @param coinsCounter Counter supplying coin ids; incremented once per generated coin. + * @param coinsNum Coins number. + * @return Set of generated coin ids. + */ + private Set<Integer> generateCoins(AtomicInteger coinsCounter, int coinsNum) { + Set<Integer> res = new HashSet<>(); + + for (int i = 0; i < coinsNum; i++) + res.add(coinsCounter.incrementAndGet()); + + return res; + } + + /** + * State representing transaction between two accounts. + */ + static class TxState { + /** + * Account states before transaction. + */ + AccountState before1, before2; + + /** + * Account states after transaction. + */ + AccountState after1, after2; + + /** + * Transferred coins between accounts during this transaction. + */ + Set<Integer> transferredCoins; + + /** + * @param before1 State of the first account before the transaction. + * @param before2 State of the second account before the transaction. + * @param after1 State of the first account after the transaction. + * @param after2 State of the second account after the transaction. + * @param transferredCoins Coins transferred between the accounts.
+ */ + public TxState(AccountState before1, AccountState before2, AccountState after1, AccountState after2, Set<Integer> transferredCoins) { + this.before1 = before1; + this.before2 = before2; + this.after1 = after1; + this.after2 = after2; + this.transferredCoins = transferredCoins; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "TxState{" + + "before1=" + before1 + + ", before2=" + before2 + + ", transferredCoins=" + transferredCoins + + ", after1=" + after1 + + ", after2=" + after2 + + '}'; + } + } + + /** + * Thread that performs {@link #transactionsCount()} coin transfers between random accounts of one cache. + */ + private class TransferAmountTxThread extends Thread { + /** Latch counted down once, after the first transfer attempt of this thread. */ + private CountDownLatch firstTransactionLatch; + /** Ignite instance used to start transactions and to access the cache. */ + private IgniteEx ignite; + /** Name of the cache this thread works with. */ + private String cacheName; + /** Index of this thread in the completed transactions array. */ + private int txIndex; + /** Randomizer for accounts and coins selection. */ + private Random random = new Random(); + + /** + * @param firstTransactionLatch Latch to count down after the first transfer attempt. + * @param ignite Ignite. + * @param cacheName Name of the cache to perform transfers on. + * @param txIndex Index of the completed transactions map to record transfers in. + */ + private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final IgniteEx ignite, String cacheName, int txIndex) { + this.firstTransactionLatch = firstTransactionLatch; + this.ignite = ignite; + this.cacheName = cacheName; + this.txIndex = txIndex; + } + + /** + * {@inheritDoc} + * <p> + * The latch is counted down in a {@code finally} block, so it is released even if the first transfer fails. + * An exception thrown by a transfer is not caught here and ends the loop. + */ + @Override public void run() { + for (int i = 0; i < transactionsCount(); i++) { + try { + updateInTransaction(ignite.cache(cacheName)); + } + finally { + if (i == 0) + firstTransactionLatch.countDown(); + } + } + } + + /** + * Transfers a random subset of coins between two distinct random accounts of the cache in a single + * pessimistic, repeatable-read transaction and records the committed transaction. + * + * @param cache Cache with accounts. + * @throws IgniteException if fails + */ + @SuppressWarnings("unchecked") + private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException { + int accIdFrom; + int accIdTo; + + for (;;) { + accIdFrom = random.nextInt(accountsCount()); + accIdTo = random.nextInt(accountsCount()); + + if (accIdFrom == accIdTo) + continue; + + ClusterNode primaryForAccFrom = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdFrom); + ClusterNode primaryForAccTo = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdTo); + + // Allows only transaction between accounts that primary on the same node
if corresponding flag is enabled. + if (!crossNodeTransactions() && !primaryForAccFrom.id().equals(primaryForAccTo.id())) + continue; + + break; + } + + AccountState acctFrom; + AccountState acctTo; + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + acctFrom = cache.get(accIdFrom); + acctTo = cache.get(accIdTo); + + Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random); + + AccountState nextFrom = acctFrom.removeCoins(tx.xid(), coinsToTransfer); + AccountState nextTo = acctTo.addCoins(tx.xid(), coinsToTransfer); + + cache.put(accIdFrom, nextFrom); + cache.put(accIdTo, nextTo); + + tx.commit(); + + completedTxs[txIndex].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer)); + } + } + + /** + * Picks a random account id different from the given one. Currently has no callers in this class. + * + * @param curr Account id that must not be returned. + * @return Random account id below {@link #accountsCount()} that differs from {@code curr}. + */ + private long getNextAccountId(long curr) { + long randomVal; + + do { + randomVal = random.nextInt(accountsCount()); + } + while (curr == randomVal); + + return randomVal; + } + } + + /** + * Interface to implement custom failover scenario during transactional amount transfer. + * All callbacks are no-ops by default. + */ + public interface FailoverScenario { + /** + * Callback before nodes have started. + */ + public default void beforeNodesStarted() throws Exception { } + + /** + * Callback when first transaction has finished: invoked once every transfer thread has completed + * its first transfer attempt or after a 10 second wait, whichever comes first. + */ + public default void afterFirstTransaction() throws Exception { } + + /** + * Callback when all transactions have finished: invoked after all transfer threads have been joined + * and before the consistency check.
+ */ + public default void afterTransactionsFinished() throws Exception { } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java new file mode 100644 index 0000000..3260607 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.function.BiFunction; +import java.util.function.Supplier; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; +import org.apache.ignite.internal.processors.cache.tree.SearchRow; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Test cases that check transaction data integrity after transaction commit failed. + */ +public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends AbstractTransactionIntergrityTest { + /** Corruption enabled flag. */ + private static volatile boolean corruptionEnabled; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + corruptionEnabled = false; + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60 * 1000L; + } + + /** + * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent. + */ + public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode1() throws Exception { + doTestTransferAmount(new IndexCorruptionFailoverScenario( + true, + (hnd, tree) -> hnd instanceof BPlusTree.Search, + failoverPredicate(true, () -> new AssertionError("Test"))) + ); + } + + /** + * Throws a test {@link RuntimeException} during tx commit from {@link BPlusTree} and checks after that data is consistent. 
+ */ + public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode2() throws Exception { + doTestTransferAmount(new IndexCorruptionFailoverScenario( + true, + (hnd, tree) -> hnd instanceof BPlusTree.Search, + failoverPredicate(true, () -> new RuntimeException("Test"))) + ); + } + + /** + * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} on a node that is not primary for the row and checks after that data is consistent. + */ + public void testPrimaryIndexCorruptionDuringCommitOnBackupNode() throws Exception { + doTestTransferAmount(new IndexCorruptionFailoverScenario( + true, + (hnd, tree) -> hnd instanceof BPlusTree.Search, + failoverPredicate(false, () -> new AssertionError("Test"))) + ); + } + + /** + * Throws a test {@link IgniteCheckedException} during tx commit from {@link BPlusTree} and checks after that data is consistent. Currently fails immediately, see IGNITE-9082. + */ + public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode3() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9082"); + + doTestTransferAmount(new IndexCorruptionFailoverScenario( + false, + (hnd, tree) -> hnd instanceof BPlusTree.Search, + failoverPredicate(true, () -> new IgniteCheckedException("Test"))) + ); + } + + /** + * Creates failover predicate which generates error during transaction commit. + * + * @param failOnPrimary If {@code true} the error is produced on the node that is primary for the row's partition, otherwise on a node that is not primary. + * @param errorSupplier Supplier to create various errors. + */ + private BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate( + boolean failOnPrimary, + Supplier<Throwable> errorSupplier + ) { + return (ignite, row) -> { + int cacheId = row.cacheId(); + int partId = row.key().partition(); + + final ClusterNode locNode = ignite.localNode(); + final AffinityTopologyVersion curTopVer = ignite.context().discovery().topologyVersionEx(); + + // Return an error if the local node's primary status for the row's partition equals failOnPrimary, otherwise null. 
+ return ignite.cachesx(c -> c.context().cacheId() == cacheId) + .stream() + .filter(c -> c.context().affinity().primaryByPartition(locNode, partId, curTopVer) == failOnPrimary) + .map(c -> errorSupplier.get()) + .findFirst() + .orElse(null); + }; + } + + /** + * Index corruption failover scenario. + */ + class IndexCorruptionFailoverScenario implements FailoverScenario { + /** Failed node index. */ + static final int failedNodeIdx = 1; + + /** Is node stopping expected after failover. */ + private final boolean nodeStoppingExpected; + + /** Predicate that will choose an instance of {@link BPlusTree} and page operation + * to make further failover in this tree using {@link #failoverPredicate}. */ + private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate; + + /** Function that may return error during row insertion into {@link BPlusTree}. */ + private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate; + + /** + * @param nodeStoppingExpected Node stopping expected. + * @param treeCorruptionPredicate Tree corruption predicate. + * @param failoverPredicate Failover predicate. 
+ */ + IndexCorruptionFailoverScenario( + boolean nodeStoppingExpected, + BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate, + BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate + ) { + this.nodeStoppingExpected = nodeStoppingExpected; + this.treeCorruptionPredicate = treeCorruptionPredicate; + this.failoverPredicate = failoverPredicate; + } + + /** {@inheritDoc} */ + @Override public void beforeNodesStarted() { + BPlusTree.pageHndWrapper = (tree, hnd) -> { + final IgniteEx locIgnite = (IgniteEx) Ignition.localIgnite(); + + if (!locIgnite.name().endsWith(String.valueOf(failedNodeIdx))) + return hnd; + + if (treeCorruptionPredicate.apply(hnd, tree)) { + log.info("Created corrupted tree handler for -> " + hnd + " " + tree); + + PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>) hnd; + + return new PageHandler<BPlusTree.Get, BPlusTree.Result>() { + @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException { + log.info("Invoked " + " " + cacheId + " " + arg.toString() + " for BTree (" + corruptionEnabled + ") -> " + arg.row() + " / " + arg.row().getClass()); + + if (corruptionEnabled && (arg.row() instanceof SearchRow)) { + SearchRow row = (SearchRow) arg.row(); + + // Store cacheId to search row explicitly, as it can be zero if there is one cache in a group. 
+ Throwable res = failoverPredicate.apply(locIgnite, new SearchRow(cacheId, row.key())); + + if (res != null) { + if (res instanceof Error) + throw (Error) res; + else if (res instanceof RuntimeException) + throw (RuntimeException) res; + else if (res instanceof IgniteCheckedException) + throw (IgniteCheckedException) res; + } + } + + return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl); + } + + @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl) { + return g.canRelease(pageId, lvl); + } + }; + } + + return hnd; + }; + } + + /** {@inheritDoc} */ + @Override public void afterFirstTransaction() { + // Enable BPlus tree corruption after first transactions have finished. + corruptionEnabled = true; + } + + /** {@inheritDoc} */ + @Override public void afterTransactionsFinished() throws Exception { + // Disable index corruption. + BPlusTree.pageHndWrapper = (tree, hnd) -> hnd; + + if (nodeStoppingExpected) { + // Wait until the node with the corrupted index leaves the cluster. + GridTestUtils.waitForCondition(() -> { + try { + grid(failedNodeIdx); + } + catch (IgniteIllegalStateException e) { + return true; + } + + return false; + }, getTestTimeout()); + + // Failed node should be stopped. + GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, ""); + + // Re-start failed node. 
+ startGrid(failedNodeIdx); + + awaitPartitionMapExchange(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java new file mode 100644 index 0000000..25aae4b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.lang.management.ManagementFactory; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl; +import org.apache.ignite.mxbean.WorkersControlMXBean; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Runs the transfer scenario of {@link AbstractTransactionIntergrityTest} while the disco-event-worker of one node is terminated. + */ +public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTransactionIntergrityTest { + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected boolean persistent() { + return false; + } + + /** + * Terminates the disco-event-worker on node {@code 1} after the first transaction, then waits for that node to stop and restarts it. + */ + public void testFailoverWithDiscoWorkerTermination() throws Exception { + doTestTransferAmount(new FailoverScenario() { + static final int failedNodeIdx = 1; + + /** {@inheritDoc} */ + @Override public void afterFirstTransaction() throws Exception { + // Terminate disco-event-worker thread on one node. + WorkersControlMXBean bean = workersMXBean(failedNodeIdx); + + bean.terminateWorker( + bean.getWorkerNames().stream() + .filter(name -> name.startsWith("disco-event-worker")) + .findFirst() + .orElse(null) + ); + } + + /** {@inheritDoc} */ + @Override public void afterTransactionsFinished() throws Exception { + // Wait until the node with the dead worker leaves the cluster. + GridTestUtils.waitForCondition(() -> { + try { + grid(failedNodeIdx); + } + catch (IgniteIllegalStateException e) { + return true; + } + + return false; + }, getTestTimeout()); + + // Failed node should be stopped. + GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, ""); + + // Re-start failed node. 
+ startGrid(failedNodeIdx); + + awaitPartitionMapExchange(); + } + }); + } + + /** + * Returns a proxy for the workers control MX bean of the node with the given index; fails the test if the bean is not registered. + */ + private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception { + ObjectName mbeanName = U.makeMBeanName( + getTestIgniteInstanceName(igniteInt), + "Kernal", + WorkersControlMXBeanImpl.class.getSimpleName() + ); + + MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer(); + + if (!mbeanSrv.isRegistered(mbeanName)) + fail("MBean is not registered: " + mbeanName.getCanonicalName()); + + return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 74e23ed..ac2bed3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -17,6 +17,7 @@ package org.apache.ignite.testsuites; +import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.GridSuppressedExceptionSelfTest; import org.apache.ignite.failure.FailureHandlerTriggeredTest; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestor import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest; import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest; +import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithSystemWorkerDeathTest; import 
org.apache.ignite.internal.processors.closure.GridClosureProcessorRemoteTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest; import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest; @@ -90,8 +92,6 @@ import org.apache.ignite.testframework.test.VariationsIteratorTest; import org.apache.ignite.util.AttributeNodeFilterSelfTest; import org.jetbrains.annotations.Nullable; -import java.util.Set; - /** * Basic test suite. */ @@ -211,6 +211,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(StopNodeFailureHandlerTest.class); suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class); suite.addTestSuite(OomFailureHandlerTest.class); + suite.addTestSuite(TransactionIntegrityWithSystemWorkerDeathTest.class); suite.addTestSuite(AtomicOperationsInTxTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java index 5f1d18d..3171754 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java @@ -17,8 +17,8 @@ package org.apache.ignite.testsuites; +import java.util.Set; import junit.framework.TestSuite; -import org.apache.ignite.failure.AccountTransferTransactionTest; import org.apache.ignite.failure.IoomFailureHandlerTest; import org.apache.ignite.failure.SystemWorkersTerminationTest; import org.apache.ignite.internal.ClusterBaselineNodesMetricsSelfTest; @@ -29,8 +29,6 @@ import org.apache.ignite.util.GridCommandHandlerTest; import 
org.apache.ignite.util.GridInternalTaskUnusedWalSegmentsTest; import org.jetbrains.annotations.Nullable; -import java.util.Set; - /** * Basic test suite. */ @@ -52,7 +50,6 @@ public class IgniteBasicWithPersistenceTestSuite extends TestSuite { TestSuite suite = new TestSuite("Ignite Basic With Persistence Test Suite"); suite.addTestSuite(IoomFailureHandlerTest.class); - suite.addTestSuite(AccountTransferTransactionTest.class); suite.addTestSuite(ClusterBaselineNodesMetricsSelfTest.class); suite.addTestSuite(ServiceDeploymentOnActivationTest.class); suite.addTestSuite(ServiceDeploymentOutsideBaselineTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index f9e6b81..1269d0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -26,13 +26,11 @@ import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDist import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest; -import org.apache.ignite.internal.processors.cache.distributed.CacheClientsConcurrentStartTest; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; import 
org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest; -import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientMultiNodeUpdateTopologyLockTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; @@ -45,11 +43,11 @@ import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPa import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncTest; +import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest; -import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest; import org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest; import org.apache.ignite.testframework.junits.GridAbstractTest; http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java index 89e2ffc..2bd0861 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java @@ -17,6 +17,7 @@ package org.apache.ignite.testsuites; +import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.authentication.Authentication1kUsersNodeRestartTest; import org.apache.ignite.internal.processors.authentication.AuthenticationConfigurationClusterTest; @@ -38,11 +39,10 @@ import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridC import org.apache.ignite.internal.processors.cache.eviction.paged.PageEvictionMultinodeMixedRegionsTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.db.CheckpointBufferDeadlockTest; +import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithPrimaryIndexCorruptionTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest; import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest; -import java.util.Set; - /** * Test suite. */ @@ -95,6 +95,8 @@ public class IgniteCacheTestSuite7 extends TestSuite { suite.addTestSuite(CacheRentingStateRepairTest.class); + suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class); + return suite; } }