IGNITE-6181 wip.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c3df8f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c3df8f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c3df8f7

Branch: refs/heads/ignite-6181-1
Commit: 4c3df8f78b47dcfbed15778a87e173d8bace27b0
Parents: 6ee98d1
Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Authored: Tue Sep 12 16:31:32 2017 +0300
Committer: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Committed: Tue Sep 12 16:31:32 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteTransactions.java   |  3 +-
 .../configuration/TransactionConfiguration.java |  6 +++
 .../processors/cache/GridCacheProcessor.java    |  2 +-
 .../cache/distributed/near/GridNearTxLocal.java |  5 +-
 .../transactions/IgniteTransactionsImpl.java    |  3 +-
 .../transactions/IgniteTxLocalAdapter.java      |  3 --
 .../cache/transactions/IgniteTxManager.java     | 49 +++++++++++---------
 .../transactions/TxRollbackOnTimeoutTest.java   | 13 ++++--
 8 files changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
index dfe6a1a..9da862d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
@@ -77,7 +77,8 @@ public interface IgniteTransactions {
      *
      * @param concurrency Concurrency.
      * @param isolation Isolation.
-     * @param timeout Timeout.
+     * @param timeout Timeout. See {@link 
TransactionConfiguration#setDefaultTxTimeout(long)}
+     * for additional info on transaction timeouts.
      * @param txSize Number of entries participating in transaction (may be 
approximate).
      * @return New transaction.
      * @throws IllegalStateException If transaction is already started by this 
thread.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
index 0063afc..cbd2d82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
 
 import java.io.Serializable;
 import javax.cache.configuration.Factory;
+import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -181,6 +182,11 @@ public class TransactionConfiguration implements 
Serializable {
     /**
      * Sets default transaction timeout in milliseconds. By default this value 
is defined by {@link
      * #DFLT_TRANSACTION_TIMEOUT}.
+     * <p>
+     * If transaction's execution is not completed by calling {@link 
Transaction#commit()},
+     * {@link Transaction#rollback()} or {@link Transaction#close()}
+     * before timeout expires, it will be rolled back asynchronously. All 
further caches operations will fail until new
+     * explicit transaction is started in current thread by calling {@link 
IgniteTransactions#txStart()}.
      *
      * @param dfltTxTimeout Default transaction timeout.
      * @return {@code this} for chaining.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1efb500..bd950fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3654,7 +3654,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @throws IgniteException If transaction exist.
      */
     private void checkEmptyTransactions() throws IgniteException {
-        if (transactions().tx() != null)
+        if (transactions().tx() != null || 
sharedCtx.lockedTopologyVersion(null) != null)
             throw new IgniteException("Cannot start/stop cache within lock or 
transaction.");
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 289096d..aa0c85b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3694,13 +3694,12 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        // If tx was rolled back asynchronously by timeout, user tx will not 
be cleared.
-        cctx.tm().resetUserTx();
-
         TransactionState state = state();
 
         if (state != ROLLING_BACK && state != ROLLED_BACK && state != 
COMMITTING && state != COMMITTED)
             rollback();
+        else
+            cctx.tm().onLocalClose(this);
 
         synchronized (this) {
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 12f655a..997fd8c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -154,7 +154,8 @@ public class IgniteTransactionsImpl<K, V> implements 
IgniteTransactionsEx {
         try {
             GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx);
 
-            if (tx != null)
+            // Allow to override timed out transactions.
+            if (tx != null && !tx.timedOut())
                 throw new IllegalStateException("Failed to start new 
transaction " +
                     "(current thread already has a transaction): " + tx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index e7ebaae..ea105ae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1237,9 +1237,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
      * @throws IgniteCheckedException If transaction check failed.
      */
     protected void checkValid() throws IgniteCheckedException {
-        if (local() && !dht() && remainingTime() == -1)
-            state(MARKED_ROLLBACK, true);
-
         if (isRollbackOnly()) {
             if (remainingTime() == -1)
                 throw new IgniteTxTimeoutCheckedException("Cache transaction 
timed out: " + this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fb04c2b..2214e10 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -148,8 +148,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     /** Topology version should be used when mapping internal tx. */
     private final ThreadLocal<AffinityTopologyVersion> txTop = new 
ThreadLocal<>();
 
-    /** User transaction. */
-    private final static ThreadLocal<IgniteInternalTx> userTx = new 
ThreadLocal<>();
+    /** Per-thread transaction map. */
+    private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
 
     /** Per-thread system transaction map. */
     private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = 
newMap();
@@ -282,15 +282,19 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @param cacheId Cache ID.
      */
     public void rollbackTransactionsForCache(int cacheId) {
-        rollbackTransactionsForCache(cacheId, activeTransactions());
+        rollbackTransactionsForCache(cacheId, nearIdMap);
+
+        rollbackTransactionsForCache(cacheId, threadMap);
     }
 
     /**
      * @param cacheId Cache ID.
      * @param txMap Transactions map.
      */
-    private void rollbackTransactionsForCache(int cacheId, 
Collection<IgniteInternalTx> txMap) {
-        for (IgniteInternalTx tx : txMap) {
+    private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, 
IgniteInternalTx> txMap) {
+        for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) {
+            IgniteInternalTx tx = e.getValue();
+
             for (IgniteTxEntry entry : tx.allEntries()) {
                 if (entry.cacheId() == cacheId) {
                     rollbackTx(tx);
@@ -305,8 +309,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override public void onDisconnected(IgniteFuture reconnectFut) {
         txFinishSync.onDisconnected(reconnectFut);
 
-        for (IgniteInternalTx tx : activeTransactions())
-            rollbackTx(tx);
+        for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
+            rollbackTx(e.getValue());
 
         IgniteClientDisconnectedException err =
             new IgniteClientDisconnectedException(reconnectFut, "Client node 
disconnected.");
@@ -365,7 +369,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Transaction manager memory stats [igniteInstanceName=" 
+ cctx.igniteInstanceName() + ']');
-        X.println(">>>   activeSize [size=" + activeTransactions().size() + 
']');
+        X.println(">>>   threadMapSize: " + threadMap.size());
+        X.println(">>>   idMap [size=" + idMap.size() + ']');
         X.println(">>>   completedVersSortedSize: " + 
completedVersSorted.size());
         X.println(">>>   completedVersHashMapSize: " + 
completedVersHashMap.sizex());
     }
@@ -374,7 +379,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Thread map size.
      */
     public int threadMapSize() {
-        return 0;
+        return threadMap.size();
     }
 
     /**
@@ -483,7 +488,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
             // and overwrite local transaction.
             if (tx.local() && !tx.dht()) {
                 if (cacheCtx == null || !cacheCtx.systemTx())
-                    userTx.set(tx);
+                    threadMap.put(tx.threadId(), tx);
                 else
                     sysThreadMap.put(new TxThreadKey(tx.threadId(), 
cacheCtx.cacheId()), tx);
             }
@@ -660,7 +665,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Not null topology version if current thread holds lock 
preventing topology change.
      */
     @Nullable public AffinityTopologyVersion lockedTopologyVersion(long 
threadId, IgniteInternalTx ignore) {
-        IgniteInternalTx tx = userTx.get();
+        IgniteInternalTx tx = threadMap.get(threadId);
 
         if (tx != null) {
             AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
@@ -767,7 +772,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked"})
     private <T> T tx(GridCacheContext cctx, long threadId) {
         if (cctx == null || !cctx.systemTx())
-            return (T) userTx.get();
+            return (T)threadMap.get(threadId);
 
         TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId());
 
@@ -1417,8 +1422,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      */
     private void clearThreadMap(IgniteInternalTx tx) {
         if (tx.local() && !tx.dht()) {
-            if (!tx.system())
-                userTx.set(null);
+            if (!tx.system() && !tx.timedOut())
+                threadMap.remove(tx.threadId(), tx);
             else {
                 Integer cacheId = tx.txState().firstCacheId();
 
@@ -1706,10 +1711,10 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Reset user tx.
+     * Remove tx from map.
      */
-    public void resetUserTx() {
-        userTx.set(null);
+    public void onLocalClose(IgniteInternalTx tx) {
+        threadMap.remove(tx.threadId(), tx);
     }
 
     /**
@@ -2286,13 +2291,15 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                     + "[expected=" + SUSPENDED + ", actual=" + tx.state() + 
']');
         }
 
-        if (userTx.get() != null)
-            throw new IgniteCheckedException("The thread already has active 
transaction.");
+        long threadId = Thread.currentThread().getId();
 
-        userTx.set(tx);
+        if (threadMap.putIfAbsent(threadId, tx) != null)
+            throw new IgniteCheckedException("Thread already has started a 
transaction.");
 
         if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
-            throw new IgniteCheckedException("The thread already has active 
transaction.");
+            throw new IgniteCheckedException("Thread already has started a 
transaction.");
+
+        tx.threadId(threadId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c3df8f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index 01829ce..529fdc3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.jsr166.ThreadLocalRandom8;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -217,21 +218,23 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
     public void testTimeoutRemoval() throws Exception {
         IgniteEx client = (IgniteEx)startGrid("client");
 
-        for (int i = 0; i < 5; i++)
+        int modesCnt = 5;
+
+        for (int i = 0; i < modesCnt; i++)
             testTimeoutRemoval0(grid(0), i, TX_TIMEOUT);
 
-        for (int i = 0; i < 5; i++)
+        for (int i = 0; i < modesCnt; i++)
             testTimeoutRemoval0(client, i, TX_TIMEOUT);
 
-        for (int i = 0; i < 5; i++)
+        for (int i = 0; i < modesCnt; i++)
             testTimeoutRemoval0(grid(0), i, TX_MIN_TIMEOUT);
 
-        for (int i = 0; i < 5; i++)
+        for (int i = 0; i < modesCnt; i++)
             testTimeoutRemoval0(client, i, TX_MIN_TIMEOUT);
 
         // Repeat with more iterations to make sure everything is cleared.
         for (int i = 0; i < 500; i++)
-            testTimeoutRemoval0(client, 2, TX_MIN_TIMEOUT);
+            testTimeoutRemoval0(client, 
ThreadLocalRandom8.current().nextInt(modesCnt), TX_MIN_TIMEOUT);
     }
 
     /**

Reply via email to