ignite-6181 Tx rollback on timeout

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5af30cf1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5af30cf1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5af30cf1

Branch: refs/heads/ignite-3479
Commit: 5af30cf118aeb3910398e6b15dbe2a51b62746d7
Parents: 27295f2
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Sep 22 11:20:16 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Sep 22 11:20:16 2017 +0300

----------------------------------------------------------------------
 .../IgniteDiagnosticPrepareContext.java         |   4 +-
 .../processors/cache/CacheMetricsImpl.java      |   2 +-
 .../processors/cache/GridCacheAdapter.java      |  18 +-
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../processors/cache/GridCacheMvccManager.java  |   5 +-
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   2 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   6 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   4 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  79 ++-
 .../distributed/near/GridNearLockFuture.java    | 116 ++--
 .../near/GridNearOptimisticTxPrepareFuture.java |   3 +-
 .../near/GridNearTransactionalCache.java        |   3 -
 .../near/GridNearTxFastFinishFuture.java        |  82 +++
 .../near/GridNearTxFinishFuture.java            |  23 +-
 .../cache/distributed/near/GridNearTxLocal.java | 303 ++++++---
 .../distributed/near/NearTxFinishFuture.java    |  31 +
 .../cache/transactions/IgniteInternalTx.java    |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  13 +-
 .../cache/transactions/IgniteTxHandler.java     |  26 +-
 .../transactions/IgniteTxLocalAdapter.java      |  10 +-
 .../cache/transactions/IgniteTxLocalEx.java     |   7 +-
 .../cache/transactions/IgniteTxManager.java     |  80 +--
 .../timeout/GridTimeoutProcessor.java           |  23 +-
 .../processors/cache/CacheTxFastFinishTest.java |   9 +-
 .../cache/IgniteTxConfigCacheSelfTest.java      |  14 +
 .../IgniteCacheThreadLocalTxTest.java           | 223 +++++++
 .../IgniteOptimisticTxSuspendResumeTest.java    |   6 +-
 ...ionedMultiNodeLongTxTimeout2FullApiTest.java |  34 +
 .../TxRollbackOnTimeoutNearCacheTest.java       |  28 +
 ...ollbackOnTimeoutNoDeadlockDetectionTest.java |  47 ++
 .../transactions/TxRollbackOnTimeoutTest.java   | 655 +++++++++++++++++++
 .../IgniteCacheFullApiSelfTestSuite.java        |   2 +
 .../testsuites/IgniteCacheTestSuite6.java       |  10 +
 .../hadoop/impl/HadoopTxConfigCacheTest.java    |   4 +-
 35 files changed, 1627 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
index 378dc74..ed8d35e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
@@ -75,7 +75,7 @@ public class IgniteDiagnosticPrepareContext {
      * @param keys Entry keys.
      * @param msg Initial message.
      */
-    public void  txKeyInfo(UUID nodeId, int cacheId, 
Collection<KeyCacheObject> keys, String msg) {
+    public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> 
keys, String msg) {
         closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys));
     }
 
@@ -280,4 +280,4 @@ public class IgniteDiagnosticPrepareContext {
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index d03a6f8..413b60d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -328,7 +328,7 @@ public class CacheMetricsImpl implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public int getTxDhtThreadMapSize() {
-        return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().threadMapSize() : 
-1;
+        return cctx.tm().threadMapSize();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 92a8245..32b1b99 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -78,6 +78,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteTransactionsEx;
 import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
@@ -1857,7 +1858,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 return new GridFinishedFuture<>(e);
             }
 
-            tx = ctx.tm().threadLocalTx(ctx.systemTx() ? ctx : null);
+            tx = ctx.tm().threadLocalTx(ctx);
         }
 
         if (tx == null || tx.implicit()) {
@@ -4057,7 +4058,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
                     return t;
                 }
-                catch (IgniteInterruptedCheckedException | 
IgniteTxHeuristicCheckedException e) {
+                catch (IgniteInterruptedCheckedException | 
IgniteTxHeuristicCheckedException | NodeStoppingException e) {
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
@@ -4071,7 +4072,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                         catch (IgniteCheckedException | AssertionError | 
RuntimeException e1) {
                             U.error(log, "Failed to rollback transaction 
(cache may contain stale locks): " + tx, e1);
 
-                            e.addSuppressed(e1);
+                            if (e != e1)
+                                e.addSuppressed(e1);
                         }
                     }
 
@@ -4202,7 +4204,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                                         try {
                                             return tFut.get();
                                         }
-                                        catch 
(IgniteTxRollbackCheckedException e) {
+                                        catch 
(IgniteTxRollbackCheckedException | NodeStoppingException e) {
                                             throw e;
                                         }
                                         catch (IgniteCheckedException e1) {
@@ -4210,7 +4212,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                                                 tx0.rollbackNearTxLocalAsync();
                                             }
                                             catch (Throwable e2) {
-                                                e1.addSuppressed(e2);
+                                                if (e1 != e2)
+                                                    e1.addSuppressed(e2);
                                             }
 
                                             throw e1;
@@ -4241,7 +4244,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                         try {
                             return tFut.get();
                         }
-                        catch (IgniteTxRollbackCheckedException e) {
+                        catch (IgniteTxRollbackCheckedException | 
NodeStoppingException e) {
                             throw e;
                         }
                         catch (IgniteCheckedException e1) {
@@ -4249,7 +4252,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                                 tx0.rollbackNearTxLocalAsync();
                             }
                             catch (Throwable e2) {
-                                e1.addSuppressed(e2);
+                                if (e2 != e1)
+                                    e1.addSuppressed(e2);
                             }
 
                             throw e1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 958f156..7b60b9c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2463,7 +2463,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     /** {@inheritDoc} */
     @Nullable @Override public CacheObject peek(@Nullable 
IgniteCacheExpiryPolicy plc)
         throws GridCacheEntryRemovedException, IgniteCheckedException {
-        IgniteInternalTx tx = cctx.tm().localTxx();
+        IgniteInternalTx tx = cctx.tm().localTx();
 
         AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : 
cctx.affinity().affinityTopologyVersion();
 
@@ -3110,10 +3110,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      * @return Current transaction.
      */
     private IgniteTxLocalAdapter currentTx() {
-        if (cctx.isDht())
-            return cctx.dht().near().context().tm().localTx();
-        else
-            return cctx.tm().localTx();
+        return cctx.tm().localTx();
     }
 
     /** {@inheritDoc} */
@@ -3491,7 +3488,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             }
         }
 
-        IgniteInternalTx tx = cctx.tm().localTxx();
+        IgniteInternalTx tx = cctx.tm().localTx();
 
         if (tx != null) {
             IgniteTxEntry e = tx.entry(txKey());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 71df71f..a7d9ce7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -478,13 +478,14 @@ public class GridCacheMvccManager extends 
GridCacheSharedManagerAdapter {
     /**
      * @param fut Future.
      * @param futId Future ID.
+     * @return {@code True} if added.
      */
-    public void addFuture(final GridCacheFuture<?> fut, final IgniteUuid 
futId) {
+    public boolean addFuture(final GridCacheFuture<?> fut, final IgniteUuid 
futId) {
         GridCacheFuture<?> old = futs.put(futId, fut);
 
         assert old == null : old;
 
-        onFutureAdded(fut);
+        return onFutureAdded(fut);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index ea6461d..e5bcc46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -855,7 +855,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
             // Note that we don't evict near entries here -
             // they will be deleted by their corresponding transactions.
             if (state(ROLLING_BACK) || state() == UNKNOWN) {
-                cctx.tm().rollbackTx(this);
+                cctx.tm().rollbackTx(this, false);
 
                 state(ROLLED_BACK);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5311ddc..6380710 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -227,7 +227,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                 try {
                     boolean nodeStop = err != null && X.hasCause(err, 
NodeStoppingException.class);
 
-                    this.tx.tmFinish(err == null, nodeStop);
+                    this.tx.tmFinish(err == null, nodeStop, false);
                 }
                 catch (IgniteCheckedException finishErr) {
                     U.error(log, "Failed to finish tx: " + tx, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index ab5631e..28cc018 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -436,7 +436,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
             if (prepFut != null)
                 prepFut.get(); // Check for errors.
 
-            boolean finished = localFinish(commit);
+            boolean finished = localFinish(commit, false);
 
             if (!finished)
                 err = new IgniteCheckedException("Failed to finish transaction 
[commit=" + commit +
@@ -542,7 +542,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean localFinish(boolean commit) throws 
IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit, boolean 
clearThreadMap) throws IgniteCheckedException {
         assert nearFinFutId != null || isInvalidate() || !commit || 
isSystemInvalidate()
             || onePhaseCommit() || state() == PREPARED :
             "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" 
+ isInvalidate() + ", commit=" + commit +
@@ -550,7 +550,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
 
         assert nearMiniId != 0;
 
-        return super.localFinish(commit);
+        return super.localFinish(commit, clearThreadMap);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 86eac42..e4a7141 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -734,7 +734,7 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean localFinish(boolean commit) throws 
IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit, boolean 
clearThreadMap) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing dht local tx [tx=" + this + ", commit=" + 
commit + "]");
 
@@ -773,7 +773,7 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
             if (commit && !isRollbackOnly())
                 userCommit();
             else
-                userRollback();
+                userRollback(clearThreadMap);
         }
         catch (IgniteCheckedException e) {
             err = e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 58c6319..e4f4601 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -54,6 +55,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -99,6 +101,10 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     /** Logger. */
     private static IgniteLogger msgLog;
 
+    /** Done field updater. */
+    private static final AtomicIntegerFieldUpdater<GridDhtColocatedLockFuture> 
DONE_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridDhtColocatedLockFuture.class, 
"done");
+
     /** Cache registry. */
     @GridToStringExclude
     private final GridCacheContext<?, ?> cctx;
@@ -146,6 +152,10 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     /** Map of current values. */
     private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, 
CacheObject>> valMap;
 
+    /** */
+    @SuppressWarnings("UnusedDeclaration")
+    private volatile int done;
+
     /** Trackable flag (here may be non-volatile). */
     private boolean trackable;
 
@@ -226,12 +236,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             log = U.logger(cctx.kernalContext(), logRef, 
GridDhtColocatedLockFuture.class);
         }
 
-        if (timeout > 0) {
-            timeoutObj = new LockTimeoutObject();
-
-            cctx.time().addTimeoutObject(timeoutObj);
-        }
-
         valMap = new ConcurrentHashMap8<>();
     }
 
@@ -322,6 +326,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             else {
                 IgniteTxEntry txEntry = tx.entry(txKey);
 
+                assert txEntry != null;
+
                 txEntry.cached(entry);
 
                 // Check transaction entries (corresponding tx entries must be 
enlisted in transaction).
@@ -332,7 +338,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                     threadId,
                     lockVer,
                     true,
-                    tx.entry(txKey).locked(),
+                    txEntry.locked(),
                     inTx(),
                     inTx() && tx.implicitSingle(),
                     false,
@@ -399,7 +405,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
      * @param success Success flag.
      */
     public void complete(boolean success) {
-        onComplete(success, true);
+        onComplete(success, true, true);
     }
 
     /**
@@ -533,7 +539,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         if (onCancelled())
-            onComplete(false, true);
+            onComplete(false, true, true);
 
         return isCancelled();
     }
@@ -556,7 +562,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         if (err != null)
             success = false;
 
-        return onComplete(success, true);
+        return onComplete(success, true, true);
     }
 
     /**
@@ -564,19 +570,32 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
      *
      * @param success {@code True} if lock was acquired.
      * @param distribute {@code True} if need to distribute lock removal in 
case of failure.
+     * @param restoreTimeout {@code True} if need restore tx timeout callback.
      * @return {@code True} if complete by this operation.
      */
-    private boolean onComplete(boolean success, boolean distribute) {
-        if (log.isDebugEnabled())
+    private boolean onComplete(boolean success, boolean distribute, boolean 
restoreTimeout) {
+        if (log.isDebugEnabled()) {
             log.debug("Received onComplete(..) callback [success=" + success + 
", distribute=" + distribute +
                 ", fut=" + this + ']');
+        }
+
+        if (!DONE_UPD.compareAndSet(this, 0, 1))
+            return false;
 
         if (!success)
             undoLocks(distribute, true);
 
-        if (tx != null)
+        if (tx != null) {
             cctx.tm().txContext(tx);
 
+            if (restoreTimeout && tx.trackTimeout()) {
+                // Need restore timeout before onDone is called and next tx 
operation can proceed.
+                boolean add = tx.addTimeoutHandler();
+
+                assert add;
+            }
+        }
+
         if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
@@ -675,6 +694,30 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
      * part. Note that if primary node leaves grid, the future will fail and 
transaction will be rolled back.
      */
     void map() {
+        if (tx != null && tx.trackTimeout()) {
+            if (!tx.removeTimeoutHandler()) {
+                tx.finishFuture().listen(new 
IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        IgniteTxTimeoutCheckedException err = new 
IgniteTxTimeoutCheckedException("Failed to " +
+                            "acquire lock, transaction was rolled back on 
timeout [timeout=" + tx.timeout() +
+                            ", tx=" + tx + ']');
+
+                        onError(err);
+
+                        onComplete(false, false, false);
+                    }
+                });
+
+                return;
+            }
+        }
+
+        if (timeout > 0) {
+            timeoutObj = new LockTimeoutObject();
+
+            cctx.time().addTimeoutObject(timeoutObj);
+        }
+
         // Obtain the topology version to use.
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
@@ -930,7 +973,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                             if (log.isDebugEnabled())
                                 log.debug("Entry being locked did not pass 
filter (will not lock): " + entry);
 
-                            onComplete(false, false);
+                            onComplete(false, false, true);
 
                             return;
                         }
@@ -1307,7 +1350,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             if (log.isDebugEnabled())
                 log.debug("Entry being locked did not pass filter (will not 
lock): " + entry);
 
-            onComplete(false, false);
+            onComplete(false, false, true);
 
             return false;
         }
@@ -1419,12 +1462,12 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                             U.warn(log, "Failed to detect deadlock.", e);
                         }
 
-                        onComplete(false, true);
+                        onComplete(false, true, true);
                     }
                 });
             }
             else
-                onComplete(false, true);
+                onComplete(false, true, true);
         }
 
         /** {@inheritDoc} */
@@ -1673,4 +1716,4 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             return S.toString(MiniFuture.class, this, "node", node.id(), 
"super", super.toString());
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index c947715..3d9989d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -50,6 +51,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -89,6 +91,10 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
 
+    /** Done field updater. */
+    private static final AtomicIntegerFieldUpdater<GridNearLockFuture> 
DONE_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridNearLockFuture.class, "done");
+
     /** */
     private static IgniteLogger log;
 
@@ -142,8 +148,9 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
     /** Map of current values. */
     private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, 
CacheObject>> valMap;
 
-    /** Trackable flag. */
-    private boolean trackable = true;
+    /** */
+    @SuppressWarnings("UnusedDeclaration")
+    private volatile int done;
 
     /** Keys locked so far. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -184,6 +191,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
      * @param filter Filter.
      * @param skipStore skipStore
      * @param keepBinary Keep binary flag.
+     * @param recovery Recovery flag.
      */
     public GridNearLockFuture(
         GridCacheContext<?, ?> cctx,
@@ -230,12 +238,6 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, 
GridNearLockFuture.class);
 
-        if (timeout > 0) {
-            timeoutObj = new LockTimeoutObject();
-
-            cctx.time().addTimeoutObject(timeoutObj);
-        }
-
         valMap = new ConcurrentHashMap8<>();
     }
 
@@ -260,12 +262,12 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
     /** {@inheritDoc} */
     @Override public boolean trackable() {
-        return trackable;
+        return true;
     }
 
     /** {@inheritDoc} */
     @Override public void markNotTrackable() {
-        trackable = false;
+        // No-op.
     }
 
     /**
@@ -434,7 +436,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
      * @param success Success flag.
      */
     public void complete(boolean success) {
-        onComplete(success, true);
+        onComplete(success, true, true);
     }
 
     /**
@@ -655,7 +657,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                     log.debug("Local lock acquired for entries [fut=" + this + 
", entries=" + entries + "]");
             }
 
-            onComplete(true, true);
+            onComplete(true, true, true);
 
             return true;
         }
@@ -666,7 +668,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         if (onCancelled())
-            onComplete(false, true);
+            onComplete(false, true, true);
 
         return isCancelled();
     }
@@ -690,7 +692,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
         if (err != null)
             success = false;
 
-        return onComplete(success, true);
+        return onComplete(success, true, true);
     }
 
     /**
@@ -698,19 +700,32 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
      *
      * @param success {@code True} if lock was acquired.
      * @param distribute {@code True} if need to distribute lock removal in 
case of failure.
+     * @param restoreTimeout {@code True} if need restore tx timeout callback.
      * @return {@code True} if complete by this operation.
      */
-    private boolean onComplete(boolean success, boolean distribute) {
-        if (log.isDebugEnabled())
+    private boolean onComplete(boolean success, boolean distribute, boolean 
restoreTimeout) {
+        if (log.isDebugEnabled()) {
             log.debug("Received onComplete(..) callback [success=" + success + 
", distribute=" + distribute +
                 ", fut=" + this + ']');
+        }
+
+        if (!DONE_UPD.compareAndSet(this, 0, 1))
+            return false;
 
         if (!success)
             undoLocks(distribute, true);
 
-        if (tx != null)
+        if (tx != null) {
             cctx.tm().txContext(tx);
 
+            if (restoreTimeout && tx.trackTimeout()) {
+                // Need restore timeout before onDone is called and next tx 
operation can proceed.
+                boolean add = tx.addTimeoutHandler();
+
+                assert add;
+            }
+        }
+
         if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
@@ -770,6 +785,34 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
      * part. Note that if primary node leaves grid, the future will fail and 
transaction will be rolled back.
      */
     void map() {
+        if (tx != null && tx.trackTimeout()) {
+            if (!tx.removeTimeoutHandler()) {
+                tx.finishFuture().listen(new 
IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        IgniteTxTimeoutCheckedException err = new 
IgniteTxTimeoutCheckedException("Failed to " +
+                            "acquire lock, transaction was rolled back on 
timeout [timeout=" + tx.timeout() +
+                            ", tx=" + tx + ']');
+
+                        onError(err);
+
+                        onComplete(false, false, false);
+                    }
+                });
+
+                return;
+            }
+        }
+
+        if (timeout > 0) {
+            timeoutObj = new LockTimeoutObject();
+
+            cctx.time().addTimeoutObject(timeoutObj);
+        }
+
+        boolean added = cctx.mvcc().addFuture(this);
+
+        assert added : this;
+
         // Obtain the topology version to use.
         long threadId = Thread.currentThread().getId();
 
@@ -971,19 +1014,13 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                             GridNearCacheEntry entry = null;
 
                             try {
-                                entry = cctx.near().entryExx(
-                                    key,
-                                    topVer);
+                                entry = cctx.near().entryExx(key, topVer);
 
-                                if (!cctx.isAll(
-                                    entry,
-                                    filter)) {
+                                if (!cctx.isAll(entry, filter)) {
                                     if (log.isDebugEnabled())
                                         log.debug("Entry being locked did not 
pass filter (will not lock): " + entry);
 
-                                    onComplete(
-                                        false,
-                                        false);
+                                    onComplete(false, false, true);
 
                                     return;
                                 }
@@ -1004,10 +1041,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
                                 if (cand != null) {
                                     if (tx == null && !cand.reentry())
-                                        cctx.mvcc().addExplicitLock(
-                                            threadId,
-                                            cand,
-                                            topVer);
+                                        
cctx.mvcc().addExplicitLock(threadId,cand,topVer);
 
                                     IgniteBiTuple<GridCacheVersion, 
CacheObject> val = entry.versionedValue();
 
@@ -1032,9 +1066,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                                     if (val != null) {
                                         dhtVer = val.get1();
 
-                                        valMap.put(
-                                            key,
-                                            val);
+                                        valMap.put(key, val);
                                     }
 
                                     if (!cand.reentry()) {
@@ -1083,9 +1115,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                                         distributedKeys.add(key);
 
                                         if (tx != null)
-                                            tx.addKeyMapping(
-                                                txKey,
-                                                mapping.node());
+                                            tx.addKeyMapping(txKey, 
mapping.node());
 
                                         req.addKeyBytes(
                                             key,
@@ -1098,14 +1128,16 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                                     if (cand.reentry())
                                         explicit = tx != null && 
!entry.hasLockCandidate(tx.xidVersion());
                                 }
-                                else
+                                else {
+                                    if (timedOut)
+                                        return;
+
                                     // Ignore reentries within transactions.
                                     explicit = tx != null && 
!entry.hasLockCandidate(tx.xidVersion());
+                                }
 
                                 if (explicit)
-                                    tx.addKeyMapping(
-                                        txKey,
-                                        mapping.node());
+                                    tx.addKeyMapping(txKey, mapping.node());
 
                                 break;
                             }
@@ -1476,12 +1508,12 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                             U.warn(log, "Failed to detect deadlock.", e);
                         }
 
-                        onComplete(false, true);
+                        onComplete(false, true, true);
                     }
                 });
             }
             else
-                onComplete(false, true);
+                onComplete(false, true, true);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 77c68bd..6d7a862 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -913,8 +913,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
                 return;
 
             if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
-                if (parent.cctx.tm().deadlockDetectionEnabled() &&
-                    (parent.tx.remainingTime() == -1 || res.error() instanceof 
IgniteTxTimeoutCheckedException)) {
+                if (parent.tx.remainingTime() == -1 || res.error() instanceof 
IgniteTxTimeoutCheckedException) {
                     parent.onTimeout();
 
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 9d0b186..a13da3d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -467,9 +467,6 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
             opCtx != null && opCtx.isKeepBinary(),
             opCtx != null && opCtx.recovery());
 
-        if (!ctx.mvcc().addFuture(fut))
-            throw new IllegalStateException("Duplicate future ID: " + fut);
-
         fut.map();
 
         return fut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
new file mode 100644
index 0000000..7222697
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
+import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+
+/**
+ *
+ */
+public class GridNearTxFastFinishFuture extends 
GridFutureAdapter<IgniteInternalTx> implements NearTxFinishFuture {
+    /** */
+    private final GridNearTxLocal tx;
+
+    /** */
+    private final boolean commit;
+
+    /**
+     * @param tx Transaction.
+     * @param commit Commit flag.
+     */
+    GridNearTxFastFinishFuture(GridNearTxLocal tx, boolean commit) {
+        this.tx = tx;
+        this.commit = commit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean commit() {
+        return commit;
+    }
+
+    /**
+     *
+     */
+    public void finish() {
+        try {
+            if (commit) {
+                tx.state(PREPARING);
+                tx.state(PREPARED);
+                tx.state(COMMITTING);
+
+                tx.context().tm().fastFinishTx(tx, true);
+
+                tx.state(COMMITTED);
+            }
+            else {
+                tx.state(PREPARING);
+                tx.state(PREPARED);
+                tx.state(ROLLING_BACK);
+
+                tx.context().tm().fastFinishTx(tx, false);
+
+                tx.state(ROLLED_BACK);
+            }
+        }
+        finally {
+            onDone(tx);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c45eb7b..b6a8855 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -67,7 +67,7 @@ import static 
org.apache.ignite.transactions.TransactionState.UNKNOWN;
  *
  */
 public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentityFuture<IgniteInternalTx>
-    implements GridCacheFuture<IgniteInternalTx> {
+    implements GridCacheFuture<IgniteInternalTx>, NearTxFinishFuture {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -136,6 +136,11 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
     }
 
     /** {@inheritDoc} */
+    @Override public boolean commit() {
+        return commit;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
     }
@@ -278,6 +283,13 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
         }
     }
 
+    /**
+     *
+     */
+    void forceFinish() {
+        super.onDone(tx, null, false);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
         if (isDone())
@@ -310,7 +322,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                         err = new TransactionRollbackException("Failed to 
commit transaction.", err);
 
                     try {
-                        tx.localFinish(err == null);
+                        tx.localFinish(err == null, true);
                     }
                     catch (IgniteCheckedException e) {
                         if (err != null)
@@ -327,7 +339,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                         finishOnePhase(commit);
 
                     try {
-                        tx.tmFinish(commit, nodeStop);
+                        tx.tmFinish(commit, nodeStop, true);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to finish tx: " + tx, e);
@@ -386,9 +398,10 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
      * Initializes future.
      *
      * @param commit Commit flag.
+     * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} 
from thread map.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    void finish(boolean commit) {
+    public void finish(boolean commit, boolean clearThreadMap) {
         if (tx.onNeedCheckBackup()) {
             assert tx.onePhaseCommit();
 
@@ -402,7 +415,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
         }
 
         try {
-            if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) {
+            if (tx.localFinish(commit, clearThreadMap) || (!commit && 
tx.state() == UNKNOWN)) {
                 if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || 
(!tx.onePhaseCommit() && mappings != null)) {
                     if (mappings.single()) {
                         GridDistributedTxMapping mapping = 
mappings.singleMapping();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 058a3ff..8b043d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -67,6 +67,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxy;
 import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import 
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -89,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -107,7 +109,7 @@ import static 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxE
 import static 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
-import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
@@ -117,7 +119,7 @@ import static 
org.apache.ignite.transactions.TransactionState.UNKNOWN;
  * Replicated user transaction.
  */
 @SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter implements 
AutoCloseable  {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements 
GridTimeoutObject, AutoCloseable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -126,12 +128,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements AutoClosea
         AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, 
IgniteInternalFuture.class, "prepFut");
 
     /** Prepare future updater. */
-    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, 
GridNearTxFinishFuture> COMMIT_FUT_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, 
GridNearTxFinishFuture.class, "commitFut");
-
-    /** Rollback future updater. */
-    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, 
GridNearTxFinishFuture> ROLLBACK_FUT_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, 
GridNearTxFinishFuture.class, "rollbackFut");
+    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, 
NearTxFinishFuture> FINISH_FUT_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, 
NearTxFinishFuture.class, "finishFut");
 
     /** DHT mappings. */
     private IgniteTxMappings mappings;
@@ -144,12 +142,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements AutoClosea
     /** Commit future. */
     @SuppressWarnings("UnusedDeclaration")
     @GridToStringExclude
-    private volatile GridNearTxFinishFuture commitFut;
-
-    /** Rollback future. */
-    @SuppressWarnings("UnusedDeclaration")
-    @GridToStringExclude
-    private volatile GridNearTxFinishFuture rollbackFut;
+    private volatile NearTxFinishFuture finishFut;
 
     /** True if transaction contains near cache entries mapped to local node. 
*/
     private boolean nearLocallyMapped;
@@ -170,6 +163,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements AutoClosea
     protected boolean transform;
 
     /** */
+    private boolean trackTimeout;
+
+    /** */
     @GridToStringExclude
     private TransactionProxyImpl proxy;
 
@@ -229,6 +225,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements AutoClosea
         mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new 
IgniteTxMappingsImpl();
 
         initResult();
+
+        if (timeout() > 0 && !implicit())
+            trackTimeout = cctx.time().addTimeoutObject(this);
     }
 
     /** {@inheritDoc} */
@@ -3044,7 +3043,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean localFinish(boolean commit) throws 
IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit, boolean 
clearThreadMap) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing near local tx [tx=" + this + ", commit=" + 
commit + "]");
 
@@ -3080,7 +3079,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
             if (commit && !isRollbackOnly())
                 userCommit();
             else
-                userRollback();
+                userRollback(clearThreadMap);
         }
         catch (IgniteCheckedException e) {
             err = e;
@@ -3146,6 +3145,9 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
             if (!PREP_FUT_UPD.compareAndSet(this, null, fut))
                 return prepFut;
 
+            if (trackTimeout)
+                removeTimeoutHandler();
+
             if (timeout == -1) {
                 fut.onDone(this, timeoutException());
 
@@ -3178,14 +3180,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
         if (awaitLastFuture)
             txState().awaitLastFuture(cctx);
 
-        prepareAsync().get();
-    }
-
-    /**
-     * @return Prepare future.
-     */
-    private IgniteInternalFuture<?> prepareAsync() {
-        return prepareNearTxLocal();
+        prepareNearTxLocal().get();
     }
 
     /**
@@ -3202,42 +3197,43 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
         if (log.isDebugEnabled())
             log.debug("Committing near local tx: " + this);
 
+        NearTxFinishFuture fut = finishFut;
+
+        if (fut != null)
+            return chainFinishFuture(fut, true);
+
         if (fastFinish()) {
-            state(PREPARING);
-            state(PREPARED);
-            state(COMMITTING);
+            GridNearTxFastFinishFuture fut0;
 
-            cctx.tm().fastFinishTx(this, true);
+            if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new 
GridNearTxFastFinishFuture(this, true)))
+                return chainFinishFuture(finishFut, true);
 
-            state(COMMITTED);
+            fut0.finish();
 
-            return new GridFinishedFuture<>((IgniteInternalTx)this);
+            return fut0;
         }
 
-        final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
+        final GridNearTxFinishFuture fut0;
 
-        GridNearTxFinishFuture fut = commitFut;
+        if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new 
GridNearTxFinishFuture<>(cctx, this, true)))
+            return chainFinishFuture(finishFut, true);
 
-        if (fut != null ||
-            !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new 
GridNearTxFinishFuture<>(cctx, this, true)))
-            return commitFut;
+        cctx.mvcc().addFuture(fut0, fut0.futureId());
 
-        cctx.mvcc().addFuture(fut, fut.futureId());
+        final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
 
         prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
             @Override public void apply(IgniteInternalFuture<?> f) {
-                GridNearTxFinishFuture fut0 = commitFut;
-
                 try {
                     // Make sure that here are no exceptions.
                     prepareFut.get();
 
-                    fut0.finish(true);
+                    fut0.finish(true, true);
                 }
                 catch (Error | RuntimeException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, 
e);
 
-                    fut0.finish(false);
+                    fut0.finish(false, true);
 
                     throw e;
                 }
@@ -3245,12 +3241,12 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, 
e);
 
                     if (!(e instanceof NodeStoppingException))
-                        fut0.finish(false);
+                        fut0.finish(false, true);
                 }
             }
         });
 
-        return fut;
+        return fut0;
     }
 
     /** {@inheritDoc} */
@@ -3269,30 +3265,42 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
      * @return Rollback future.
      */
     public IgniteInternalFuture<IgniteInternalTx> rollbackNearTxLocalAsync() {
+        return rollbackNearTxLocalAsync(false);
+    }
+
+    /**
+     * @param onTimeout {@code True} if rolled back asynchronously on timeout.
+     * @return Rollback future.
+     */
+    private IgniteInternalFuture<IgniteInternalTx> 
rollbackNearTxLocalAsync(final boolean onTimeout) {
         if (log.isDebugEnabled())
             log.debug("Rolling back near tx: " + this);
 
+        if (!onTimeout && trackTimeout)
+            removeTimeoutHandler();
+
+        NearTxFinishFuture fut = finishFut;
+
+        if (fut != null)
+            return chainFinishFuture(finishFut, false);
+
         if (fastFinish()) {
-            state(PREPARING);
-            state(PREPARED);
-            state(ROLLING_BACK);
+            GridNearTxFastFinishFuture fut0;
 
-            cctx.tm().fastFinishTx(this, false);
+            if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new 
GridNearTxFastFinishFuture(this, false)))
+                return chainFinishFuture(finishFut, false);
 
-            state(ROLLED_BACK);
+            fut0.finish();
 
-            return new GridFinishedFuture<>((IgniteInternalTx)this);
+            return fut0;
         }
 
-        GridNearTxFinishFuture fut = rollbackFut;
+        final GridNearTxFinishFuture fut0;
 
-        if (fut != null)
-            return fut;
+        if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new 
GridNearTxFinishFuture<>(cctx, this, false)))
+            return chainFinishFuture(finishFut, false);
 
-        if (!ROLLBACK_FUT_UPD.compareAndSet(this, null, fut = new 
GridNearTxFinishFuture<>(cctx, this, false)))
-            return rollbackFut;
-
-        cctx.mvcc().addFuture(fut, fut.futureId());
+        cctx.mvcc().addFuture(fut0, fut0.futureId());
 
         IgniteInternalFuture<?> prepFut = this.prepFut;
 
@@ -3307,7 +3315,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
                     log.debug("Got optimistic tx failure [tx=" + this + ", 
err=" + e + ']');
             }
 
-            fut.finish(false);
+            fut0.finish(false, !onTimeout);
         }
         else {
             prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -3321,14 +3329,12 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
                             log.debug("Got optimistic tx failure [tx=" + this 
+ ", err=" + e + ']');
                     }
 
-                    GridNearTxFinishFuture fut0 = rollbackFut;
-
-                    fut0.finish(false);
+                    fut0.finish(false, !onTimeout);
                 }
             });
         }
 
-        return fut;
+        return fut0;
     }
 
     /** {@inheritDoc} */
@@ -3337,6 +3343,64 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
     }
 
     /**
+     * @param fut Already started finish future.
+     * @param commit Commit flag.
+     * @return Finish future.
+     */
+    private IgniteInternalFuture<IgniteInternalTx> chainFinishFuture(final 
NearTxFinishFuture fut, final boolean commit) {
+        assert fut != null;
+
+        if (fut.commit() != commit) {
+            final GridNearTxLocal tx = this;
+
+            if (!commit) {
+                final GridNearTxFinishFuture rollbackFut = new 
GridNearTxFinishFuture<>(cctx, this, false);
+
+                fut.listen(new 
IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> fut0) {
+                        if (FINISH_FUT_UPD.compareAndSet(tx, fut, 
rollbackFut)) {
+                            if (tx.state() == COMMITTED) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to rollback, transaction 
is already committed: " + tx);
+
+                                rollbackFut.forceFinish();
+
+                                assert rollbackFut.isDone() : rollbackFut;
+                            }
+                            else {
+                                if (!cctx.mvcc().addFuture(rollbackFut, 
rollbackFut.futureId()))
+                                    return;
+
+                                rollbackFut.finish(false, true);
+                            }
+                        }
+                    }
+                });
+
+                return rollbackFut;
+            }
+            else {
+                final GridFutureAdapter<IgniteInternalTx> fut0 = new 
GridFutureAdapter<>();
+
+                fut.listen(new 
IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        if (timedOut())
+                            fut0.onDone(new 
IgniteTxTimeoutCheckedException("Failed to commit transaction, " +
+                                "transaction is concurrently rolled back on 
timeout: " + tx));
+                        else
+                            fut0.onDone(new IgniteCheckedException("Failed to 
commit transaction, " +
+                                "transaction is concurrently rolled back: " + 
tx));
+                    }
+                });
+
+                return fut0;
+            }
+        }
+
+        return fut;
+    }
+
+    /**
      * @return {@code True} if 'fast finish' path can be used for transaction 
completion.
      */
     private boolean fastFinish() {
@@ -3693,38 +3757,53 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
     @Override public void close() throws IgniteCheckedException {
         TransactionState state = state();
 
-        if (state != ROLLING_BACK && state != ROLLED_BACK && state != 
COMMITTING && state != COMMITTED)
-            rollback();
+        try {
+            if (state == COMMITTED || state == ROLLED_BACK)
+                return;
 
-        synchronized (this) {
-            try {
-                while (!done())
-                    wait();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+            if (trackTimeout)
+                removeTimeoutHandler();
 
-                if (!done())
-                    throw new IgniteCheckedException("Got interrupted while 
waiting for transaction to complete: " +
-                        this, e);
+            if (state != ROLLING_BACK && state != COMMITTING)
+                rollback();
+
+            synchronized (this) {
+                try {
+                    while (!done())
+                        wait();
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    if (!done())
+                        throw new IgniteCheckedException("Got interrupted 
while waiting for transaction to complete: " +
+                            this, e);
+                }
             }
         }
+        finally {
+            // It is possible tx was rolled back asynchronously on timeout and 
thread map is not cleared yet.
+            boolean cleanup = state == ROLLED_BACK && timedOut();
 
-        if (accessMap != null) {
-            assert optimistic();
+            if (cleanup)
+                cctx.tm().clearThreadMap(this);
 
-            for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : 
accessMap.entrySet()) {
-                if (e.getValue().entries() != null) {
-                    GridCacheContext cctx0 = 
cctx.cacheContext(e.getKey().cacheId());
+            if (accessMap != null) {
+                assert optimistic();
 
-                    if (cctx0.isNear())
-                        cctx0.near().dht().sendTtlUpdateRequest(e.getValue());
-                    else
-                        cctx0.dht().sendTtlUpdateRequest(e.getValue());
+                for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : 
accessMap.entrySet()) {
+                    if (e.getValue().entries() != null) {
+                        GridCacheContext cctx0 = 
cctx.cacheContext(e.getKey().cacheId());
+
+                        if (cctx0.isNear())
+                            
cctx0.near().dht().sendTtlUpdateRequest(e.getValue());
+                        else
+                            cctx0.dht().sendTtlUpdateRequest(e.getValue());
+                    }
                 }
-            }
 
-            accessMap = null;
+                accessMap = null;
+            }
         }
     }
 
@@ -4000,13 +4079,69 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements AutoClosea
 
     /**
      * @param threadId new owner of transaction.
-     * @throws IgniteCheckedException if method executed not in the middle of 
resume or suspend.
      */
     public void threadId(long threadId) {
         this.threadId = threadId;
     }
 
     /**
+     * @return {@code True} if need register callback which cancels tx on 
timeout.
+     */
+    public boolean trackTimeout() {
+        return trackTimeout;
+    }
+
+    /**
+     * Removes timeout handler.
+     *
+     * @return {@code True} if handler was removed.
+     */
+    public boolean removeTimeoutHandler() {
+        assert trackTimeout;
+
+        return cctx.time().removeTimeoutObject(this);
+    }
+
+    /**
+     * @return {@code True} if handler was added.
+     */
+    public boolean addTimeoutHandler() {
+        assert trackTimeout;
+
+        return cctx.time().addTimeoutObject(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid timeoutId() {
+        return xid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return startTime() + timeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
+            if (log.isDebugEnabled())
+                log.debug("Will rollback tx on timeout: " + this);
+
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    // Note: if rollback asynchonously on timeout should not 
clear thread map
+                    // since thread started tx still should be able to see 
this tx.
+                    rollbackNearTxLocalAsync(true);
+                }
+            });
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Skip rollback tx on timeout: " + this);
+        }
+    }
+
+    /**
      * Post-lock closure.
      *
      * @param <T> Return type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
new file mode 100644
index 0000000..132c754
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+
+/**
+ *
+ */
+public interface NearTxFinishFuture extends 
IgniteInternalFuture<IgniteInternalTx> {
+    /**
+     * @return Commit flag.
+     */
+    boolean commit();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7598003..9e06d9d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -634,4 +634,4 @@ public interface IgniteInternalTx {
      * @param e Commit error.
      */
     public void commitError(Throwable e);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index c447436..b5178b5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -370,6 +370,13 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
     }
 
+    /**
+     * @return Shared cache context.
+     */
+    public GridCacheSharedContext<?, ?> context() {
+        return cctx;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean localResult() {
         assert originatingNodeId() != null;
@@ -987,7 +994,7 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
      * @return {@code True} if state changed.
      */
     @SuppressWarnings({"TooBroadScope"})
-    protected boolean state(TransactionState state, boolean timedOut) {
+    protected final boolean state(TransactionState state, boolean timedOut) {
         boolean valid = false;
 
         TransactionState prev;
@@ -1068,7 +1075,9 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
 
             if (valid) {
                 this.state = state;
-                this.timedOut = timedOut;
+
+                if (timedOut)
+                    this.timedOut = true;
 
                 if (log.isDebugEnabled())
                     log.debug("Changed transaction state [prev=" + prev + ", 
new=" + this.state + ", tx=" + this + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b60bab5..38c877b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -957,28 +957,30 @@ public class IgniteTxHandler {
             }
         }
         catch (Throwable e) {
-            tx.commitError(e);
+            U.error(log, "Failed completing transaction [commit=" + 
req.commit() + ", tx=" + tx + ']', e);
 
-            tx.systemInvalidate(true);
+            if (tx != null) {
+                tx.commitError(e);
 
-            U.error(log, "Failed completing transaction [commit=" + 
req.commit() + ", tx=" + tx + ']', e);
+                tx.systemInvalidate(true);
 
-            IgniteInternalFuture<IgniteInternalTx> res = null;
+                try {
+                    IgniteInternalFuture<IgniteInternalTx> res = 
tx.rollbackDhtLocalAsync();
 
-            try {
-                res = tx.rollbackDhtLocalAsync();
+                    // Only for error logging.
+                    res.listen(CU.errorLogger(log));
 
-                // Only for error logging.
-                res.listen(CU.errorLogger(log));
-            }
-            catch (Throwable e1) {
-                e.addSuppressed(e1);
+                    return res;
+                }
+                catch (Throwable e1) {
+                    e.addSuppressed(e1);
+                }
             }
 
             if (e instanceof Error)
                 throw (Error)e;
 
-            return res;
+            return new GridFinishedFuture<>(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7ab921c..143e5cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -898,10 +899,11 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
      * Commits transaction to transaction manager. Used for one-phase commit 
transactions only.
      *
      * @param commit If {@code true} commits transaction, otherwise rollbacks.
+     * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} 
from thread map.
      * @param nodeStop If {@code true} tx is cancelled on node stop.
      * @throws IgniteCheckedException If failed.
      */
-    public void tmFinish(boolean commit, boolean nodeStop) throws 
IgniteCheckedException {
+    public void tmFinish(boolean commit, boolean nodeStop, boolean 
clearThreadMap) throws IgniteCheckedException {
         assert onePhaseCommit();
 
         if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
@@ -910,7 +912,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                 if (commit)
                     cctx.tm().commitTx(this);
                 else
-                    cctx.tm().rollbackTx(this);
+                    cctx.tm().rollbackTx(this, clearThreadMap);
             }
 
             state(commit ? COMMITTED : ROLLED_BACK);
@@ -957,7 +959,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
     }
 
     /** {@inheritDoc} */
-    @Override public void userRollback() throws IgniteCheckedException {
+    @Override public void userRollback(boolean clearThreadMap) throws 
IgniteCheckedException {
         TransactionState state = state();
 
         if (state != ROLLING_BACK && state != ROLLED_BACK) {
@@ -975,7 +977,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
         }
 
         if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
-            cctx.tm().rollbackTx(this);
+            cctx.tm().rollbackTx(this, clearThreadMap);
 
             if (!internal()) {
                 Collection<CacheStoreManager> stores = txState.stores(cctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 307c348..b61b1a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,16 +42,18 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
     public void userCommit() throws IgniteCheckedException;
 
     /**
+     * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} 
from thread map.
      * @throws IgniteCheckedException If rollback failed.
      */
-    public void userRollback() throws IgniteCheckedException;
+    public void userRollback(boolean clearThreadMap) throws 
IgniteCheckedException;
 
     /**
      * Finishes transaction (either commit or rollback).
      *
      * @param commit {@code True} if commit, {@code false} if rollback.
+     * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} 
from thread map.
      * @return {@code True} if state has been changed.
      * @throws IgniteCheckedException If finish failed.
      */
-    public boolean localFinish(boolean commit) throws IgniteCheckedException;
+    public boolean localFinish(boolean commit, boolean clearThreadMap) throws 
IgniteCheckedException;
 }

Reply via email to