IGNITE-9147 Fixed race between async tx rollback and lock mapping on near node 
- Fixes #4574.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3882ec04
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3882ec04
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3882ec04

Branch: refs/heads/ignite-9273
Commit: 3882ec0488b7be1fb6e74e14ecbeb9b396fd932f
Parents: b6182ab
Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Authored: Mon Aug 27 15:34:22 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Aug 27 15:34:22 2018 +0300

----------------------------------------------------------------------
 .../ignite/internal/TransactionsMXBeanImpl.java |  20 +-
 .../internal/commandline/CommandHandler.java    |  24 +-
 .../colocated/GridDhtColocatedLockFuture.java   | 358 +++++++++++--------
 .../GridNearPessimisticTxPrepareFuture.java     |   1 -
 .../near/GridNearTxFinishFuture.java            |  12 +-
 .../ignite/internal/visor/tx/VisorTxInfo.java   |  58 ++-
 .../ignite/internal/visor/tx/VisorTxTask.java   |  20 +-
 .../cache/transactions/TxRollbackAsyncTest.java | 145 +++++++-
 .../transactions/TxRollbackOnTimeoutTest.java   |  26 +-
 .../TxRollbackOnTopologyChangeTest.java         |  16 -
 .../junits/common/GridCommonAbstractTest.java   |  23 ++
 .../ignite/util/GridCommandHandlerTest.java     |  23 --
 12 files changed, 457 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
index 210a320..16738de 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
@@ -22,13 +22,10 @@ import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.internal.visor.tx.VisorTxInfo;
 import org.apache.ignite.internal.visor.tx.VisorTxOperation;
@@ -37,7 +34,6 @@ import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
 import org.apache.ignite.internal.visor.tx.VisorTxTask;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.mxbean.TransactionsMXBean;
 
 /**
@@ -103,21 +99,7 @@ public class TransactionsMXBeanImpl implements 
TransactionsMXBean {
                     w.println(key.toString());
 
                     for (VisorTxInfo info : entry.getValue().getInfos())
-                        w.println("    Tx: [xid=" + info.getXid() +
-                            ", label=" + info.getLabel() +
-                            ", state=" + info.getState() +
-                            ", startTime=" + info.getFormattedStartTime() +
-                            ", duration=" + info.getDuration() / 1000 +
-                            ", isolation=" + info.getIsolation() +
-                            ", concurrency=" + info.getConcurrency() +
-                            ", timeout=" + info.getTimeout() +
-                            ", size=" + info.getSize() +
-                            ", dhtNodes=" + 
F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() {
-                            @Override public String apply(UUID id) {
-                                return U.id8(id);
-                            }
-                        }) +
-                            ']');
+                        w.println(info.toUserString());
                 }
 
                 w.flush();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 7b5ce44..092efff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -1060,29 +1060,7 @@ public class CommandHandler {
                     "]");
 
                 for (VisorTxInfo info : entry.getValue().getInfos())
-                    log("    Tx: [xid=" + info.getXid() +
-                        ", label=" + info.getLabel() +
-                        ", state=" + info.getState() +
-                        ", startTime=" + info.getFormattedStartTime() +
-                        ", duration=" + info.getDuration() / 1000 +
-                        ", isolation=" + info.getIsolation() +
-                        ", concurrency=" + info.getConcurrency() +
-                        ", timeout=" + info.getTimeout() +
-                        ", size=" + info.getSize() +
-                        ", dhtNodes=" + (info.getPrimaryNodes() == null ? 
"N/A" :
-                        F.transform(info.getPrimaryNodes(), new 
IgniteClosure<UUID, String>() {
-                            @Override public String apply(UUID id) {
-                                return U.id8(id);
-                            }
-                        })) +
-                        ", nearXid=" + info.getNearXid() +
-                        ", parentNodeIds=" + (info.getMasterNodeIds() == null 
? "N/A" :
-                        F.transform(info.getMasterNodeIds(), new 
IgniteClosure<UUID, String>() {
-                            @Override public String apply(UUID id) {
-                                return U.id8(id);
-                            }
-                        })) +
-                        ']');
+                    log(info.toUserString());
             }
         }
         catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index a5ff2f3..27b3667 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -180,6 +179,9 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     /** */
     private int miniId;
 
+    /** {@code True} when mappings are ready for processing. */
+    private boolean mappingsReady;
+
     /**
      * @param cctx Registry.
      * @param keys Keys to lock.
@@ -574,9 +576,24 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
      * Cancellation has special meaning for lock futures. It's called then 
lock must be released on rollback.
      */
     @Override public boolean cancel() {
-        if (inTx())
+        if (inTx()) {
             onError(tx.rollbackException());
 
+            /** Should wait until {@link #mappings} are ready before continuing 
with async rollback,
+             * or some primary nodes might not receive tx finish messages 
because of the race.
+             * If the prepare phase has not started, waiting is not necessary.
+             */
+             * If prepare phase has not started waiting is not necessary.
+             */
+            synchronized (this) {
+                while (!mappingsReady)
+                    try {
+                        wait();
+                    }
+                    catch (InterruptedException e) {
+                        // Ignore interrupts.
+                    }
+            }
+        }
+
         return onComplete(false, true);
     }
 
@@ -637,6 +654,17 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             if (timeoutObj != null)
                 cctx.time().removeTimeoutObject(timeoutObj);
 
+            /** Ensures that waiters for ready {@link #mappings} will be 
unblocked if an error has occurred while mapping. */
+            if (tx != null) {
+                synchronized (this) {
+                    if (!mappingsReady) {
+                        mappingsReady = true;
+
+                        notifyAll();
+                    }
+                }
+            }
+
             return true;
         }
 
@@ -890,224 +918,234 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         boolean remap,
         boolean topLocked
     ) throws IgniteCheckedException {
-        AffinityTopologyVersion topVer = this.topVer;
+        try {
+            AffinityTopologyVersion topVer = this.topVer;
 
-        assert topVer != null;
+            assert topVer != null;
 
-        assert topVer.topologyVersion() > 0;
+            assert topVer.topologyVersion() > 0;
 
-        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache " +
-                "(all partition nodes left the grid): " + cctx.name()));
+            if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+                onDone(new ClusterTopologyServerNotFoundException("Failed to 
map keys for cache " +
+                    "(all partition nodes left the grid): " + cctx.name()));
 
-            return;
-        }
+                return;
+            }
 
-        boolean clientNode = cctx.kernalContext().clientNode();
+            boolean clientNode = cctx.kernalContext().clientNode();
 
-        assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+            assert !remap || (clientNode && (tx == null || 
!tx.hasRemoteLocks()));
 
-        // First assume this node is primary for all keys passed in.
-        if (!clientNode && mapAsPrimary(keys, topVer))
-            return;
+            // First assume this node is primary for all keys passed in.
+            if (!clientNode && mapAsPrimary(keys, topVer))
+                return;
 
-        mappings = new ArrayDeque<>();
+            mappings = new ArrayDeque<>();
 
-        // Assign keys to primary nodes.
-        GridNearLockMapping map = null;
+            // Assign keys to primary nodes.
+            GridNearLockMapping map = null;
 
-        for (KeyCacheObject key : keys) {
-            GridNearLockMapping updated = map(key, map, topVer);
+            for (KeyCacheObject key : keys) {
+                GridNearLockMapping updated = map(key, map, topVer);
 
-            // If new mapping was created, add to collection.
-            if (updated != map) {
-                mappings.add(updated);
+                // If new mapping was created, add to collection.
+                if (updated != map) {
+                    mappings.add(updated);
 
-                if (tx != null && updated.node().isLocal())
-                    tx.colocatedLocallyMapped(true);
-            }
+                    if (tx != null && updated.node().isLocal())
+                        tx.colocatedLocallyMapped(true);
+                }
 
-            map = updated;
-        }
+                map = updated;
+            }
 
-        if (isDone()) {
-            if (log.isDebugEnabled())
-                log.debug("Abandoning (re)map because future is done: " + 
this);
+            if (isDone()) {
+                if (log.isDebugEnabled())
+                    log.debug("Abandoning (re)map because future is done: " + 
this);
 
-            return;
-        }
+                return;
+            }
 
-        if (log.isDebugEnabled())
-            log.debug("Starting (re)map for mappings [mappings=" + mappings + 
", fut=" + this + ']');
+            if (log.isDebugEnabled())
+                log.debug("Starting (re)map for mappings [mappings=" + 
mappings + ", fut=" + this + ']');
 
-        boolean hasRmtNodes = false;
+            boolean hasRmtNodes = false;
 
-        boolean first = true;
+            boolean first = true;
 
-        // Create mini futures.
-        for (Iterator<GridNearLockMapping> iter = mappings.iterator(); 
iter.hasNext(); ) {
-            GridNearLockMapping mapping = iter.next();
+            // Create mini futures.
+            for (Iterator<GridNearLockMapping> iter = mappings.iterator(); 
iter.hasNext(); ) {
+                GridNearLockMapping mapping = iter.next();
 
-            ClusterNode node = mapping.node();
-            Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+                ClusterNode node = mapping.node();
+                Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
 
-            boolean loc = node.equals(cctx.localNode());
+                boolean loc = node.equals(cctx.localNode());
 
-            assert !mappedKeys.isEmpty();
+                assert !mappedKeys.isEmpty();
 
-            GridNearLockRequest req = null;
+                GridNearLockRequest req = null;
 
-            Collection<KeyCacheObject> distributedKeys = new 
ArrayList<>(mappedKeys.size());
+                Collection<KeyCacheObject> distributedKeys = new 
ArrayList<>(mappedKeys.size());
 
-            for (KeyCacheObject key : mappedKeys) {
-                IgniteTxKey txKey = cctx.txKey(key);
+                for (KeyCacheObject key : mappedKeys) {
+                    IgniteTxKey txKey = cctx.txKey(key);
 
-                GridDistributedCacheEntry entry = null;
+                    GridDistributedCacheEntry entry = null;
 
-                if (tx != null) {
-                    IgniteTxEntry txEntry = tx.entry(txKey);
+                    if (tx != null) {
+                        IgniteTxEntry txEntry = tx.entry(txKey);
 
-                    if (txEntry != null) {
-                        entry = (GridDistributedCacheEntry)txEntry.cached();
+                        if (txEntry != null) {
+                            entry = 
(GridDistributedCacheEntry)txEntry.cached();
 
-                        if (entry != null && loc == entry.detached()) {
-                            entry = cctx.colocated().entryExx(key, topVer, 
true);
+                            if (entry != null && loc == entry.detached()) {
+                                entry = cctx.colocated().entryExx(key, topVer, 
true);
 
-                            txEntry.cached(entry);
+                                txEntry.cached(entry);
+                            }
                         }
                     }
-                }
 
-                boolean explicit;
+                    boolean explicit;
 
-                while (true) {
-                    try {
-                        if (entry == null)
-                            entry = cctx.colocated().entryExx(key, topVer, 
true);
+                    while (true) {
+                        try {
+                            if (entry == null)
+                                entry = cctx.colocated().entryExx(key, topVer, 
true);
 
-                        if (!cctx.isAll(entry, filter)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Entry being locked did not pass 
filter (will not lock): " + entry);
+                            if (!cctx.isAll(entry, filter)) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Entry being locked did not pass 
filter (will not lock): " + entry);
 
-                            onComplete(false, false);
+                                onComplete(false, false);
 
-                            return;
-                        }
+                                return;
+                            }
 
-                        assert loc ^ entry.detached() : "Invalid entry [loc=" 
+ loc + ", entry=" + entry + ']';
+                            assert loc ^ entry.detached() : "Invalid entry 
[loc=" + loc + ", entry=" + entry + ']';
 
-                        GridCacheMvccCandidate cand = addEntry(entry);
+                            GridCacheMvccCandidate cand = addEntry(entry);
 
-                        // Will either return value from dht cache or null if 
this is a miss.
-                        IgniteBiTuple<GridCacheVersion, CacheObject> val = 
entry.detached() ? null :
-                            ((GridDhtCacheEntry)entry).versionedValue(topVer);
+                            // Will either return value from dht cache or null 
if this is a miss.
+                            IgniteBiTuple<GridCacheVersion, CacheObject> val = 
entry.detached() ? null :
+                                
((GridDhtCacheEntry)entry).versionedValue(topVer);
 
-                        GridCacheVersion dhtVer = null;
+                            GridCacheVersion dhtVer = null;
 
-                        if (val != null) {
-                            dhtVer = val.get1();
+                            if (val != null) {
+                                dhtVer = val.get1();
 
-                            valMap.put(key, val);
-                        }
+                                valMap.put(key, val);
+                            }
 
-                        if (cand != null && !cand.reentry()) {
-                            if (req == null) {
-                                boolean clientFirst = false;
+                            if (cand != null && !cand.reentry()) {
+                                if (req == null) {
+                                    boolean clientFirst = false;
+
+                                    if (first) {
+                                        clientFirst = clientNode &&
+                                            !topLocked &&
+                                            (tx == null || 
!tx.hasRemoteLocks());
+
+                                        first = false;
+                                    }
+
+                                    assert !implicitTx() && 
!implicitSingleTx() : tx;
+
+                                    req = new GridNearLockRequest(
+                                        cctx.cacheId(),
+                                        topVer,
+                                        cctx.nodeId(),
+                                        threadId,
+                                        futId,
+                                        lockVer,
+                                        inTx(),
+                                        read,
+                                        retval,
+                                        isolation(),
+                                        isInvalidate(),
+                                        timeout,
+                                        mappedKeys.size(),
+                                        inTx() ? tx.size() : mappedKeys.size(),
+                                        inTx() && tx.syncMode() == FULL_SYNC,
+                                        inTx() ? tx.subjectId() : null,
+                                        inTx() ? tx.taskNameHash() : 0,
+                                        read ? createTtl : -1L,
+                                        read ? accessTtl : -1L,
+                                        skipStore,
+                                        keepBinary,
+                                        clientFirst,
+                                        false,
+                                        cctx.deploymentEnabled());
+
+                                    mapping.request(req);
+                                }
 
-                                if (first) {
-                                    clientFirst = clientNode &&
-                                        !topLocked &&
-                                        (tx == null || !tx.hasRemoteLocks());
+                                distributedKeys.add(key);
 
-                                    first = false;
-                                }
+                                if (tx != null)
+                                    tx.addKeyMapping(txKey, mapping.node());
 
-                                assert !implicitTx() && !implicitSingleTx() : 
tx;
-
-                                req = new GridNearLockRequest(
-                                    cctx.cacheId(),
-                                    topVer,
-                                    cctx.nodeId(),
-                                    threadId,
-                                    futId,
-                                    lockVer,
-                                    inTx(),
-                                    read,
+                                req.addKeyBytes(
+                                    key,
                                     retval,
-                                    isolation(),
-                                    isInvalidate(),
-                                    timeout,
-                                    mappedKeys.size(),
-                                    inTx() ? tx.size() : mappedKeys.size(),
-                                    inTx() && tx.syncMode() == FULL_SYNC,
-                                    inTx() ? tx.subjectId() : null,
-                                    inTx() ? tx.taskNameHash() : 0,
-                                    read ? createTtl : -1L,
-                                    read ? accessTtl : -1L,
-                                    skipStore,
-                                    keepBinary,
-                                    clientFirst,
-                                    false,
-                                    cctx.deploymentEnabled());
-
-                                mapping.request(req);
+                                    dhtVer, // Include DHT version to match 
remote DHT entry.
+                                    cctx);
                             }
 
-                            distributedKeys.add(key);
+                            explicit = inTx() && cand == null;
 
-                            if (tx != null)
+                            if (explicit)
                                 tx.addKeyMapping(txKey, mapping.node());
 
-                            req.addKeyBytes(
-                                key,
-                                retval,
-                                dhtVer, // Include DHT version to match remote 
DHT entry.
-                                cctx);
+                            break;
                         }
+                        catch (GridCacheEntryRemovedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got removed entry in lockAsync(..) 
method (will retry): " + entry);
 
-                        explicit = inTx() && cand == null;
-
-                        if (explicit)
-                            tx.addKeyMapping(txKey, mapping.node());
-
-                        break;
+                            entry = null;
+                        }
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry in lockAsync(..) 
method (will retry): " + entry);
 
-                        entry = null;
+                    // Mark mapping explicit lock flag.
+                    if (explicit) {
+                        boolean marked = tx != null && 
tx.markExplicit(node.id());
+
+                        assert tx == null || marked;
                     }
                 }
 
-                // Mark mapping explicit lock flag.
-                if (explicit) {
-                    boolean marked = tx != null && tx.markExplicit(node.id());
+                if (!distributedKeys.isEmpty()) {
+                    mapping.distributedKeys(distributedKeys);
 
-                    assert tx == null || marked;
+                    hasRmtNodes |= !mapping.node().isLocal();
                 }
-            }
-
-            if (!distributedKeys.isEmpty()) {
-                mapping.distributedKeys(distributedKeys);
+                else {
+                    assert mapping.request() == null;
 
-                hasRmtNodes |= !mapping.node().isLocal();
+                    iter.remove();
+                }
             }
-            else {
-                assert mapping.request() == null;
 
-                iter.remove();
+            if (hasRmtNodes) {
+                trackable = true;
+
+                if (!remap && !cctx.mvcc().addFuture(this))
+                    throw new IllegalStateException("Duplicate future ID: " + 
this);
             }
+            else
+                trackable = false;
         }
+        finally {
+            /** Notify ready {@link mappings} waiters. See {@link #cancel()} */
+            if (tx != null) {
+                mappingsReady = true;
 
-        if (hasRmtNodes) {
-            trackable = true;
-
-            if (!remap && !cctx.mvcc().addFuture(this))
-                throw new IllegalStateException("Duplicate future ID: " + 
this);
+                notifyAll();
+            }
         }
-        else
-            trackable = false;
 
         proceedMapping();
     }
@@ -1137,11 +1175,27 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         throws IgniteCheckedException {
         GridNearLockMapping map;
 
+        // Fail fast if future is completed (in case of async rollback)
+        if (isDone()) {
+            clear();
+
+            return;
+        }
+
+        // Fail fast if the transaction is timed out.
+        if (tx != null && tx.remainingTime() == -1) {
+            GridDhtColocatedLockFuture.this.onDone(false, 
tx.timeoutException());
+
+            clear();
+
+            return;
+        }
+
         synchronized (this) {
             map = mappings.poll();
         }
 
-        // If there are no more mappings to process, complete the future.
+        // If there are no more mappings to process or prepare has timed out, 
complete the future.
         if (map == null)
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index c16a934..ffde0f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -38,7 +38,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.C1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index ede8a4e..2974563 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -412,19 +412,19 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
         }
 
         if (!commit && !clearThreadMap)
-            tryRollbackAsync(onTimeout); // Asynchronous rollback.
+            rollbackAsyncSafe(onTimeout);
         else
             doFinish(commit, clearThreadMap);
     }
 
     /**
-     * Does async rollback when it's safe.
-     * If current future is not lock future (enlist future) waits until 
completion and tries again.
-     * Else terminates or waits for lock future depending on rollback mode.
+     * Rolls back tx when it's safe.
+     * If current future is not lock future (enlist future), waits until 
completion and tries again.
+     * Else cancels lock future (onTimeout=false) or waits for completion due to 
deadlock detection (onTimeout=true).
      *
      * @param onTimeout If {@code true} called from timeout handler.
      */
-    private void tryRollbackAsync(boolean onTimeout) {
+    private void rollbackAsyncSafe(boolean onTimeout) {
         IgniteInternalFuture<?> curFut = tx.tryRollbackAsync();
 
         if (curFut == null) { // Safe to rollback.
@@ -447,7 +447,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                 try {
                     fut.get();
 
-                    tryRollbackAsync(onTimeout);
+                    rollbackAsyncSafe(onTimeout);
                 }
                 catch (IgniteCheckedException e) {
                     doFinish(false, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
index 03de5b0..be60a5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
@@ -26,9 +26,12 @@ import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.TimeZone;
 import java.util.UUID;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -82,6 +85,9 @@ public class VisorTxInfo extends VisorDataTransferObject {
     /** */
     private Collection<UUID> masterNodeIds;
 
+    /** */
+    private AffinityTopologyVersion topVer;
+
     /**
      * Default constructor.
      */
@@ -103,7 +109,7 @@ public class VisorTxInfo extends VisorDataTransferObject {
      */
     public VisorTxInfo(IgniteUuid xid, long startTime, long duration, TransactionIsolation isolation,
         TransactionConcurrency concurrency, long timeout, String lb, Collection<UUID> primaryNodes,
-        TransactionState state, int size, IgniteUuid nearXid, Collection<UUID> 
masterNodeIds) {
+        TransactionState state, int size, IgniteUuid nearXid, Collection<UUID> 
masterNodeIds, AffinityTopologyVersion topVer) {
         this.xid = xid;
         this.startTime = startTime;
         this.duration = duration;
@@ -116,11 +122,12 @@ public class VisorTxInfo extends VisorDataTransferObject {
         this.size = size;
         this.nearXid = nearXid;
         this.masterNodeIds = masterNodeIds;
+        this.topVer = topVer;
     }
 
     /** {@inheritDoc} */
     @Override public byte getProtocolVersion() {
-        return V2;
+        return V3;
     }
 
     /** */
@@ -154,6 +161,11 @@ public class VisorTxInfo extends VisorDataTransferObject {
     }
 
     /** */
+    public AffinityTopologyVersion getTopologyVersion() {
+        return topVer;
+    }
+
+    /** */
     public long getTimeout() {
         return timeout;
     }
@@ -202,6 +214,8 @@ public class VisorTxInfo extends VisorDataTransferObject {
         U.writeGridUuid(out, nearXid);
         U.writeCollection(out, masterNodeIds);
         out.writeLong(startTime);
+        out.writeLong(topVer == null ? -1 : topVer.topologyVersion());
+        out.writeInt(topVer == null ? -1 : topVer.minorTopologyVersion());
     }
 
     /** {@inheritDoc} */
@@ -217,12 +231,48 @@ public class VisorTxInfo extends VisorDataTransferObject {
         size = in.readInt();
         if (protoVer >= V2) {
             nearXid = U.readGridUuid(in);
-
             masterNodeIds = U.readCollection(in);
-
             startTime = in.readLong();
         }
+        if (protoVer >= V3) {
+            long topVer = in.readLong();
+            int minorTopVer = in.readInt();
+
+            if (topVer != -1)
+                this.topVer = new AffinityTopologyVersion(topVer, minorTopVer);
+        }
+    }
 
+    /**
+     * Get tx info as user string.
+     *
+     * @return User string.
+     */
+    public String toUserString() {
+        return "    Tx: [xid=" + getXid() +
+            ", label=" + getLabel() +
+            ", state=" + getState() +
+            ", startTime=" + getFormattedStartTime() +
+            ", duration=" + getDuration() / 1000 +
+            ", isolation=" + getIsolation() +
+            ", concurrency=" + getConcurrency() +
+            ", topVer=" + (getTopologyVersion() == null ? "N/A" : getTopologyVersion()) +
+            ", timeout=" + getTimeout() +
+            ", size=" + getSize() +
+            ", dhtNodes=" + (getPrimaryNodes() == null ? "N/A" :
+            F.transform(getPrimaryNodes(), new IgniteClosure<UUID, String>() {
+                @Override public String apply(UUID id) {
+                    return U.id8(id);
+                }
+            })) +
+            ", nearXid=" + getNearXid() +
+            ", parentNodeIds=" + (getMasterNodeIds() == null ? "N/A" :
+            F.transform(getMasterNodeIds(), new IgniteClosure<UUID, String>() {
+                @Override public String apply(UUID id) {
+                    return U.id8(id);
+                }
+            })) +
+            ']';
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
index 9919b7d..23d1663 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
@@ -183,7 +183,7 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
         private static final TxKillClosure NEAR_KILL_CLOSURE = new 
NearKillClosure();
 
         /** */
-        private static final TxKillClosure LOCAL_KILL_CLOSURE = 
NEAR_KILL_CLOSURE;
+        private static final TxKillClosure LOCAL_KILL_CLOSURE = new 
LocalKillClosure();
 
         /** */
         private static final TxKillClosure REMOTE_KILL_CLOSURE = new 
RemoteKillClosure();
@@ -312,7 +312,7 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
 
                 infos.add(new VisorTxInfo(locTx.xid(), locTx.startTime(), 
duration, locTx.isolation(), locTx.concurrency(),
                     locTx.timeout(), lb, mappings, locTx.state(),
-                    size, locTx.nearXidVersion().asGridUuid(), 
locTx.masterNodeIds()));
+                    size, locTx.nearXidVersion().asGridUuid(), 
locTx.masterNodeIds(), locTx.topologyVersionSnapshot()));
 
                 if (arg.getOperation() == VisorTxOperation.KILL)
                     killClo.apply(locTx, tm);
@@ -394,7 +394,7 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
         IgniteBiClosure<IgniteInternalTx, IgniteTxManager, 
IgniteInternalFuture<IgniteInternalTx>> {
     }
 
-    /** Kills near or local tx. */
+    /** Kills near tx. */
     private static class NearKillClosure implements TxKillClosure {
         /** */
         private static final long serialVersionUID = 0L;
@@ -402,7 +402,19 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
         /** {@inheritDoc} */
         @Override public IgniteInternalFuture<IgniteInternalTx> 
apply(IgniteInternalTx tx, IgniteTxManager tm) {
             return tx.isRollbackOnly() || tx.state() == COMMITTING || 
tx.state() == COMMITTED ?
-                new GridFinishedFuture<>() : tx.rollbackAsync();
+                new GridFinishedFuture<>() : 
((GridNearTxLocal)tx).rollbackNearTxLocalAsync(false, false);
+        }
+    }
+
+    /** Kills local tx. */
+    private static class LocalKillClosure implements TxKillClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture<IgniteInternalTx> 
apply(IgniteInternalTx tx, IgniteTxManager tm) {
+            return tx.isRollbackOnly() || tx.state() == COMMITTING || 
tx.state() == COMMITTED ?
+                new GridFinishedFuture<>() : 
((GridDhtTxLocal)tx).rollbackDhtLocalAsync();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
index 4626dcd..646c3f0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
@@ -42,24 +42,34 @@ import 
org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.tx.VisorTxInfo;
+import org.apache.ignite.internal.visor.tx.VisorTxOperation;
+import org.apache.ignite.internal.visor.tx.VisorTxTask;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -75,6 +85,7 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
@@ -864,6 +875,122 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    public void testRollbackOnTopologyLockPessimistic() throws Exception {
+        final Ignite client = startClient();
+
+        Ignite crd = grid(0);
+
+        List<Integer> keys = primaryKeys(grid(1).cache(CACHE_NAME), 1);
+
+        assertTrue(crd.cluster().localNode().order() == 1);
+
+        CountDownLatch txLatch = new CountDownLatch(1);
+        CountDownLatch tx2Latch = new CountDownLatch(1);
+        CountDownLatch commitLatch = new CountDownLatch(1);
+
+        // Start tx holding topology.
+        IgniteInternalFuture txFut = runAsync(new Runnable() {
+            @Override public void run() {
+                List<Integer> keys = primaryKeys(grid(0).cache(CACHE_NAME), 1);
+
+                try (Transaction tx = client.transactions().txStart()) {
+                    client.cache(CACHE_NAME).put(keys.get(0), 0);
+
+                    txLatch.countDown();
+
+                    U.awaitQuiet(commitLatch);
+
+                    tx.commit();
+
+                    fail();
+                }
+                catch (Exception e) {
+                    // Expected.
+                }
+            }
+        });
+
+        U.awaitQuiet(txLatch);
+
+        crd.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                runAsync(new Runnable() {
+                    @Override public void run() {
+                        try(Transaction tx = crd.transactions().withLabel("testLbl").txStart()) {
+                            // Wait for node start.
+                            waitForCondition(new GridAbsPredicate() {
+                                @Override public boolean apply() {
+                                    return crd.cluster().topologyVersion() != GRID_CNT +
+                                        /** client node */ 1  + /** stop server node */ 1 + /** start server node */ 1;
+                                }
+                            }, 10_000);
+
+                            tx2Latch.countDown();
+
+                            crd.cache(CACHE_NAME).put(keys.get(0), 0);
+
+                            tx.commit();
+
+                            fail();
+                        }
+                        catch (Exception e) {
+                            // Expected.
+                        }
+                    }
+                });
+
+                return false;
+            }
+        }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+        IgniteInternalFuture restartFut = runAsync(new Runnable() {
+            @Override public void run() {
+                stopGrid(2);
+
+                try {
+                    startGrid(2);
+                }
+                catch (Exception e) {
+                    fail();
+                }
+            }
+        });
+
+        U.awaitQuiet(tx2Latch);
+
+        // Rollback tx using kill task.
+        VisorTxTaskArg arg =
+            new VisorTxTaskArg(VisorTxOperation.KILL, null, null, null, null, 
null, null, null, null, null);
+
+        Map<ClusterNode, VisorTxTaskResult> res = 
client.compute(client.cluster().forPredicate(F.alwaysTrue())).
+            execute(new VisorTxTask(), new 
VisorTaskArgument<>(client.cluster().localNode().id(), arg, false));
+
+        int expCnt = 0;
+
+        for (Map.Entry<ClusterNode, VisorTxTaskResult> entry : res.entrySet()) {
+            if (entry.getValue().getInfos().isEmpty())
+                continue;
+
+            for (VisorTxInfo info : entry.getValue().getInfos()) {
+                log.info(info.toUserString());
+
+                expCnt++;
+            }
+        }
+
+        assertEquals("Expecting 2 transactions", 2, expCnt);
+
+        commitLatch.countDown();
+
+        txFut.get();
+        restartFut.get();
+
+        checkFutures();
+    }
+
+    /**
      * Locks entry in tx and delays commit until signalled.
      *
      * @param node Near node.
@@ -896,22 +1023,6 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Checks if all tx futures are finished.
-     */
-    private void checkFutures() {
-        for (Ignite ignite : G.allGrids()) {
-            IgniteEx ig = (IgniteEx)ignite;
-
-            final Collection<GridCacheFuture<?>> futs = 
ig.context().cache().context().mvcc().activeFutures();
-
-            for (GridCacheFuture<?> fut : futs)
-                log.info("Waiting for future: " + fut);
-
-            assertTrue("Expecting no active futures: node=" + 
ig.localNode().id(), futs.isEmpty());
-        }
-    }
-
-    /**
      * @param tx Tx to rollback.
      */
     private IgniteInternalFuture<?> rollbackAsync(final Transaction tx) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index 9c37cfa..d6f3ae5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -19,8 +19,10 @@ package 
org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,6 +37,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -42,12 +46,23 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.tx.VisorTxInfo;
+import org.apache.ignite.internal.visor.tx.VisorTxOperation;
+import org.apache.ignite.internal.visor.tx.VisorTxTask;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -62,6 +77,7 @@ import 
org.apache.ignite.transactions.TransactionTimeoutException;
 import static java.lang.Thread.sleep;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -88,6 +104,8 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setConsistentId(igniteInstanceName);
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
@@ -298,7 +316,7 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
 
         final CountDownLatch l = new CountDownLatch(2);
 
-        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Runnable() {
+        IgniteInternalFuture<?> fut1 = runAsync(new Runnable() {
             @Override public void run() {
                 try {
                     try (Transaction tx = 
node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 5000, 2)) {
@@ -322,7 +340,7 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
             }
         }, "First");
 
-        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() {
+        IgniteInternalFuture<?> fut2 = runAsync(new Runnable() {
             @Override public void run() {
                 try (Transaction tx = 
node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) {
                     node2.cache(CACHE_NAME).put(2, 2);
@@ -812,7 +830,7 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
 
         final int recordsCnt = 5;
 
-        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Runnable() {
+        IgniteInternalFuture<?> fut1 = runAsync(new Runnable() {
             @Override public void run() {
                 try (Transaction tx = near.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ, timeout, 0)) {
                     try {
@@ -860,7 +878,7 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
             }
         }, "First");
 
-        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() {
+        IgniteInternalFuture<?> fut2 = runAsync(new Runnable() {
             @Override public void run() {
                 U.awaitQuiet(blocked);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
index 2542cdb..13c5e41 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
@@ -209,20 +209,4 @@ public class TxRollbackOnTopologyChangeTest extends 
GridCommonAbstractTest {
 
         checkFutures();
     }
-
-    /**
-     * Checks if all tx futures are finished.
-     */
-    private void checkFutures() {
-        for (Ignite ignite : G.allGrids()) {
-            IgniteEx ig = (IgniteEx)ignite;
-
-            final Collection<GridCacheFuture<?>> futs = 
ig.context().cache().context().mvcc().activeFutures();
-
-            for (GridCacheFuture<?> fut : futs)
-                log.info("Waiting for future: " + fut);
-
-            assertTrue("Expecting no active futures: node=" + 
ig.localNode().id(), futs.isEmpty());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 8591f0c..55086f3 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1995,4 +1995,27 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
             new VisorTaskArgument<>(node.id(), taskArg, false)
         );
     }
+
+    /**
+     * Checks if all txs and mvcc futures are finished.
+     */
+    protected void checkFutures() {
+        for (Ignite ignite : G.allGrids()) {
+            IgniteEx ig = (IgniteEx)ignite;
+
+            final Collection<GridCacheFuture<?>> futs = 
ig.context().cache().context().mvcc().activeFutures();
+
+            for (GridCacheFuture<?> fut : futs)
+                log.info("Waiting for future: " + fut);
+
+            assertTrue("Expecting no active futures: node=" + 
ig.localNode().id(), futs.isEmpty());
+
+            Collection<IgniteInternalTx> txs = 
ig.context().cache().context().tm().activeTransactions();
+
+            for (IgniteInternalTx tx : txs)
+                log.info("Waiting for tx: " + tx);
+
+            assertTrue("Expecting no active transactions: node=" + 
ig.localNode().id(), txs.isEmpty());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java 
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index f612a96..8af7a10 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -1415,29 +1415,6 @@ public class GridCommandHandlerTest extends 
GridCommonAbstractTest {
         }, 4, "tx-thread");
     }
 
-    /**
-     * Checks if all tx futures are finished.
-     */
-    private void checkFutures() {
-        for (Ignite ignite : G.allGrids()) {
-            IgniteEx ig = (IgniteEx)ignite;
-
-            final Collection<GridCacheFuture<?>> futs = 
ig.context().cache().context().mvcc().activeFutures();
-
-            for (GridCacheFuture<?> fut : futs)
-                log.info("Waiting for future: " + fut);
-
-            assertTrue("Expecting no active futures: node=" + 
ig.localNode().id(), futs.isEmpty());
-
-            Collection<IgniteInternalTx> txs = 
ig.context().cache().context().tm().activeTransactions();
-
-            for (IgniteInternalTx tx : txs)
-                log.info("Waiting for tx: " + tx);
-
-            assertTrue("Expecting no active transactions: node=" + 
ig.localNode().id(), txs.isEmpty());
-        }
-    }
-
     /** */
     private static class IncrementClosure implements EntryProcessor<Long, 
Long, Void> {
         /** {@inheritDoc} */

Reply via email to