IGNITE-6181 wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c3df8f7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c3df8f7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c3df8f7 Branch: refs/heads/ignite-6181-1 Commit: 4c3df8f78b47dcfbed15778a87e173d8bace27b0 Parents: 6ee98d1 Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Authored: Tue Sep 12 16:31:32 2017 +0300 Committer: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Committed: Tue Sep 12 16:31:32 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteTransactions.java | 3 +- .../configuration/TransactionConfiguration.java | 6 +++ .../processors/cache/GridCacheProcessor.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 5 +- .../transactions/IgniteTransactionsImpl.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 3 -- .../cache/transactions/IgniteTxManager.java | 49 +++++++++++--------- .../transactions/TxRollbackOnTimeoutTest.java | 13 ++++-- 8 files changed, 49 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java index dfe6a1a..9da862d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java @@ -77,7 +77,8 @@ public interface IgniteTransactions { * * @param concurrency Concurrency. * @param isolation Isolation. - * @param timeout Timeout. + * @param timeout Timeout. 
See {@link TransactionConfiguration#setDefaultTxTimeout(long)} + * for additional info on transaction timeouts. * @param txSize Number of entries participating in transaction (may be approximate). * @return New transaction. * @throws IllegalStateException If transaction is already started by this thread. http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java index 0063afc..cbd2d82 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java @@ -19,6 +19,7 @@ package org.apache.ignite.configuration; import java.io.Serializable; import javax.cache.configuration.Factory; +import org.apache.ignite.IgniteTransactions; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -181,6 +182,11 @@ public class TransactionConfiguration implements Serializable { /** * Sets default transaction timeout in milliseconds. By default this value is defined by {@link * #DFLT_TRANSACTION_TIMEOUT}. + * <p> + * If the transaction's execution is not completed by calling {@link Transaction#commit()}, + * {@link Transaction#rollback()} or {@link Transaction#close()} + * before the timeout expires, it will be rolled back asynchronously. All further cache operations will fail until a new + * explicit transaction is started in the current thread by calling {@link IgniteTransactions#txStart()}. * * @param dfltTxTimeout Default transaction timeout. * @return {@code this} for chaining. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 1efb500..bd950fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3654,7 +3654,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteException If transaction exist. */ private void checkEmptyTransactions() throws IgniteException { - if (transactions().tx() != null) + if (transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) throw new IgniteException("Cannot start/stop cache within lock or transaction."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 289096d..aa0c85b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3694,13 +3694,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - // If tx 
was rolled back asynchronously by timeout, user tx will not be cleared. - cctx.tm().resetUserTx(); - TransactionState state = state(); if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); + else + cctx.tm().onLocalClose(this); synchronized (this) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 12f655a..997fd8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -154,7 +154,8 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { try { GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx); - if (tx != null) + // Allow to override timed out transactions. 
+ if (tx != null && !tx.timedOut()) throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index e7ebaae..ea105ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1237,9 +1237,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @throws IgniteCheckedException If transaction check failed. 
*/ protected void checkValid() throws IgniteCheckedException { - if (local() && !dht() && remainingTime() == -1) - state(MARKED_ROLLBACK, true); - if (isRollbackOnly()) { if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Cache transaction timed out: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index fb04c2b..2214e10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -148,8 +148,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Topology version should be used when mapping internal tx. */ private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); - /** User transaction. */ - private final static ThreadLocal<IgniteInternalTx> userTx = new ThreadLocal<>(); + /** Per-thread transaction map. */ + private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); /** Per-thread system transaction map. */ private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap(); @@ -282,15 +282,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param cacheId Cache ID. */ public void rollbackTransactionsForCache(int cacheId) { - rollbackTransactionsForCache(cacheId, activeTransactions()); + rollbackTransactionsForCache(cacheId, nearIdMap); + + rollbackTransactionsForCache(cacheId, threadMap); } /** * @param cacheId Cache ID. * @param txMap Transactions map. 
*/ - private void rollbackTransactionsForCache(int cacheId, Collection<IgniteInternalTx> txMap) { - for (IgniteInternalTx tx : txMap) { + private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) { + for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) { + IgniteInternalTx tx = e.getValue(); + for (IgniteTxEntry entry : tx.allEntries()) { if (entry.cacheId() == cacheId) { rollbackTx(tx); @@ -305,8 +309,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); - for (IgniteInternalTx tx : activeTransactions()) - rollbackTx(tx); + for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) + rollbackTx(e.getValue()); IgniteClientDisconnectedException err = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); @@ -365,7 +369,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> activeSize [size=" + activeTransactions().size() + ']'); + X.println(">>> threadMapSize: " + threadMap.size()); + X.println(">>> idMap [size=" + idMap.size() + ']'); X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } @@ -374,7 +379,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Thread map size. */ public int threadMapSize() { - return 0; + return threadMap.size(); } /** @@ -483,7 +488,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // and overwrite local transaction. 
if (tx.local() && !tx.dht()) { if (cacheCtx == null || !cacheCtx.systemTx()) - userTx.set(tx); + threadMap.put(tx.threadId(), tx); else sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); } @@ -660,7 +665,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Not null topology version if current thread holds lock preventing topology change. */ @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { - IgniteInternalTx tx = userTx.get(); + IgniteInternalTx tx = threadMap.get(threadId); if (tx != null) { AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); @@ -767,7 +772,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked"}) private <T> T tx(GridCacheContext cctx, long threadId) { if (cctx == null || !cctx.systemTx()) - return (T) userTx.get(); + return (T)threadMap.get(threadId); TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); @@ -1417,8 +1422,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ private void clearThreadMap(IgniteInternalTx tx) { if (tx.local() && !tx.dht()) { - if (!tx.system()) - userTx.set(null); + if (!tx.system() && !tx.timedOut()) + threadMap.remove(tx.threadId(), tx); else { Integer cacheId = tx.txState().firstCacheId(); @@ -1706,10 +1711,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Reset user tx. + * Remove tx from map. 
*/ - public void resetUserTx() { - userTx.set(null); + public void onLocalClose(IgniteInternalTx tx) { + threadMap.remove(tx.threadId(), tx); } /** @@ -2286,13 +2291,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); } - if (userTx.get() != null) - throw new IgniteCheckedException("The thread already has active transaction."); + long threadId = Thread.currentThread().getId(); - userTx.set(tx); + if (threadMap.putIfAbsent(threadId, tx) != null) + throw new IgniteCheckedException("Thread already has started a transaction."); if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) - throw new IgniteCheckedException("The thread already has active transaction."); + throw new IgniteCheckedException("Thread already has started a transaction."); + + tx.threadId(threadId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index 01829ce..529fdc3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionTimeoutException; +import org.jsr166.ThreadLocalRandom8; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; 
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -217,21 +218,23 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { public void testTimeoutRemoval() throws Exception { IgniteEx client = (IgniteEx)startGrid("client"); - for (int i = 0; i < 5; i++) + int modesCnt = 5; + + for (int i = 0; i < modesCnt; i++) testTimeoutRemoval0(grid(0), i, TX_TIMEOUT); - for (int i = 0; i < 5; i++) + for (int i = 0; i < modesCnt; i++) testTimeoutRemoval0(client, i, TX_TIMEOUT); - for (int i = 0; i < 5; i++) + for (int i = 0; i < modesCnt; i++) testTimeoutRemoval0(grid(0), i, TX_MIN_TIMEOUT); - for (int i = 0; i < 5; i++) + for (int i = 0; i < modesCnt; i++) testTimeoutRemoval0(client, i, TX_MIN_TIMEOUT); // Repeat with more iterations to make sure everything is cleared. for (int i = 0; i < 500; i++) - testTimeoutRemoval0(client, 2, TX_MIN_TIMEOUT); + testTimeoutRemoval0(client, ThreadLocalRandom8.current().nextInt(modesCnt), TX_MIN_TIMEOUT); } /**