IGNITE-9147 Fixed race between async tx rollback and lock mapping on near node - Fixes #4574.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3882ec04 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3882ec04 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3882ec04 Branch: refs/heads/ignite-9273 Commit: 3882ec0488b7be1fb6e74e14ecbeb9b396fd932f Parents: b6182ab Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Authored: Mon Aug 27 15:34:22 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Aug 27 15:34:22 2018 +0300 ---------------------------------------------------------------------- .../ignite/internal/TransactionsMXBeanImpl.java | 20 +- .../internal/commandline/CommandHandler.java | 24 +- .../colocated/GridDhtColocatedLockFuture.java | 358 +++++++++++-------- .../GridNearPessimisticTxPrepareFuture.java | 1 - .../near/GridNearTxFinishFuture.java | 12 +- .../ignite/internal/visor/tx/VisorTxInfo.java | 58 ++- .../ignite/internal/visor/tx/VisorTxTask.java | 20 +- .../cache/transactions/TxRollbackAsyncTest.java | 145 +++++++- .../transactions/TxRollbackOnTimeoutTest.java | 26 +- .../TxRollbackOnTopologyChangeTest.java | 16 - .../junits/common/GridCommonAbstractTest.java | 23 ++ .../ignite/util/GridCommandHandlerTest.java | 23 -- 12 files changed, 457 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java index 210a320..16738de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java @@ -22,13 +22,10 @@ import java.io.StringWriter; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.stream.Collectors; import org.apache.ignite.IgniteCompute; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorTaskArgument; import org.apache.ignite.internal.visor.tx.VisorTxInfo; import org.apache.ignite.internal.visor.tx.VisorTxOperation; @@ -37,7 +34,6 @@ import org.apache.ignite.internal.visor.tx.VisorTxSortOrder; import org.apache.ignite.internal.visor.tx.VisorTxTask; import org.apache.ignite.internal.visor.tx.VisorTxTaskArg; import org.apache.ignite.internal.visor.tx.VisorTxTaskResult; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.mxbean.TransactionsMXBean; /** @@ -103,21 +99,7 @@ public class TransactionsMXBeanImpl implements TransactionsMXBean { w.println(key.toString()); for (VisorTxInfo info : entry.getValue().getInfos()) - w.println(" Tx: [xid=" + info.getXid() + - ", label=" + info.getLabel() + - ", state=" + info.getState() + - ", startTime=" + info.getFormattedStartTime() + - ", duration=" + info.getDuration() / 1000 + - ", isolation=" + info.getIsolation() + - ", concurrency=" + info.getConcurrency() + - ", timeout=" + info.getTimeout() + - ", size=" + info.getSize() + - ", dhtNodes=" + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() { - @Override public String apply(UUID id) { - return U.id8(id); - } - }) + - ']'); + w.println(info.toUserString()); } w.flush(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java index 7b5ce44..092efff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java @@ -1060,29 +1060,7 @@ public class CommandHandler { "]"); for (VisorTxInfo info : entry.getValue().getInfos()) - log(" Tx: [xid=" + info.getXid() + - ", label=" + info.getLabel() + - ", state=" + info.getState() + - ", startTime=" + info.getFormattedStartTime() + - ", duration=" + info.getDuration() / 1000 + - ", isolation=" + info.getIsolation() + - ", concurrency=" + info.getConcurrency() + - ", timeout=" + info.getTimeout() + - ", size=" + info.getSize() + - ", dhtNodes=" + (info.getPrimaryNodes() == null ? "N/A" : - F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() { - @Override public String apply(UUID id) { - return U.id8(id); - } - })) + - ", nearXid=" + info.getNearXid() + - ", parentNodeIds=" + (info.getMasterNodeIds() == null ? "N/A" : - F.transform(info.getMasterNodeIds(), new IgniteClosure<UUID, String>() { - @Override public String apply(UUID id) { - return U.id8(id); - } - })) + - ']'); + log(info.toUserString()); } } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index a5ff2f3..27b3667 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Deque; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -180,6 +179,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** */ private int miniId; + /** {@code True} when mappings are ready for processing. */ + private boolean mappingsReady; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -574,9 +576,24 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * Cancellation has special meaning for lock futures. It's called then lock must be released on rollback. */ @Override public boolean cancel() { - if (inTx()) + if (inTx()) { onError(tx.rollbackException()); + /** Should wait until {@link mappings} are ready before continuing with async rollback + * or some primary nodes might not receive tx finish messages because of race. + * If prepare phase has not started waiting is not necessary. + */ + synchronized (this) { + while (!mappingsReady) + try { + wait(); + } + catch (InterruptedException e) { + // Ignore interrupts. + } + } + } + return onComplete(false, true); } @@ -637,6 +654,17 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (timeoutObj != null) cctx.time().removeTimeoutObject(timeoutObj); + /** Ensures what waiters for ready {@link mappings} will be unblocked if error has occurred while mapping. */ + if (tx != null) { + synchronized (this) { + if (!mappingsReady) { + mappingsReady = true; + + notifyAll(); + } + } + } + return true; } @@ -890,224 +918,234 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF boolean remap, boolean topLocked ) throws IgniteCheckedException { - AffinityTopologyVersion topVer = this.topVer; + try { + AffinityTopologyVersion topVer = this.topVer; - assert topVer != null; + assert topVer != null; - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0; - if (CU.affinityNodes(cctx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid): " + cctx.name())); + if (CU.affinityNodes(cctx, topVer).isEmpty()) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid): " + cctx.name())); - return; - } + return; + } - boolean clientNode = cctx.kernalContext().clientNode(); + boolean clientNode = cctx.kernalContext().clientNode(); - assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); - // First assume this node is primary for all keys passed in. - if (!clientNode && mapAsPrimary(keys, topVer)) - return; + // First assume this node is primary for all keys passed in. + if (!clientNode && mapAsPrimary(keys, topVer)) + return; - mappings = new ArrayDeque<>(); + mappings = new ArrayDeque<>(); - // Assign keys to primary nodes. - GridNearLockMapping map = null; + // Assign keys to primary nodes. + GridNearLockMapping map = null; - for (KeyCacheObject key : keys) { - GridNearLockMapping updated = map(key, map, topVer); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map(key, map, topVer); - // If new mapping was created, add to collection. - if (updated != map) { - mappings.add(updated); + // If new mapping was created, add to collection. + if (updated != map) { + mappings.add(updated); - if (tx != null && updated.node().isLocal()) - tx.colocatedLocallyMapped(true); - } + if (tx != null && updated.node().isLocal()) + tx.colocatedLocallyMapped(true); + } - map = updated; - } + map = updated; + } - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done: " + this); + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); - return; - } + return; + } - if (log.isDebugEnabled()) - log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + if (log.isDebugEnabled()) + log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - boolean hasRmtNodes = false; + boolean hasRmtNodes = false; - boolean first = true; + boolean first = true; - // Create mini futures. - for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping mapping = iter.next(); + // Create mini futures. + for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); - ClusterNode node = mapping.node(); - Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys(); + ClusterNode node = mapping.node(); + Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys(); - boolean loc = node.equals(cctx.localNode()); + boolean loc = node.equals(cctx.localNode()); - assert !mappedKeys.isEmpty(); + assert !mappedKeys.isEmpty(); - GridNearLockRequest req = null; + GridNearLockRequest req = null; - Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size()); + Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size()); - for (KeyCacheObject key : mappedKeys) { - IgniteTxKey txKey = cctx.txKey(key); + for (KeyCacheObject key : mappedKeys) { + IgniteTxKey txKey = cctx.txKey(key); - GridDistributedCacheEntry entry = null; + GridDistributedCacheEntry entry = null; - if (tx != null) { - IgniteTxEntry txEntry = tx.entry(txKey); + if (tx != null) { + IgniteTxEntry txEntry = tx.entry(txKey); - if (txEntry != null) { - entry = (GridDistributedCacheEntry)txEntry.cached(); + if (txEntry != null) { + entry = (GridDistributedCacheEntry)txEntry.cached(); - if (entry != null && loc == entry.detached()) { - entry = cctx.colocated().entryExx(key, topVer, true); + if (entry != null && loc == entry.detached()) { + entry = cctx.colocated().entryExx(key, topVer, true); - txEntry.cached(entry); + txEntry.cached(entry); + } } } - } - boolean explicit; + boolean explicit; - while (true) { - try { - if (entry == null) - entry = cctx.colocated().entryExx(key, topVer, true); + while (true) { + try { + if (entry == null) + entry = cctx.colocated().entryExx(key, topVer, true); - if (!cctx.isAll(entry, filter)) { - if (log.isDebugEnabled()) - log.debug("Entry being locked did not pass filter (will not lock): " + entry); + if (!cctx.isAll(entry, filter)) { + if (log.isDebugEnabled()) + log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false); - return; - } + return; + } - assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']'; + assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']'; - GridCacheMvccCandidate cand = addEntry(entry); + GridCacheMvccCandidate cand = addEntry(entry); - // Will either return value from dht cache or null if this is a miss. - IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null : - ((GridDhtCacheEntry)entry).versionedValue(topVer); + // Will either return value from dht cache or null if this is a miss. + IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null : + ((GridDhtCacheEntry)entry).versionedValue(topVer); - GridCacheVersion dhtVer = null; + GridCacheVersion dhtVer = null; - if (val != null) { - dhtVer = val.get1(); + if (val != null) { + dhtVer = val.get1(); - valMap.put(key, val); - } + valMap.put(key, val); + } - if (cand != null && !cand.reentry()) { - if (req == null) { - boolean clientFirst = false; + if (cand != null && !cand.reentry()) { + if (req == null) { + boolean clientFirst = false; + + if (first) { + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); + + first = false; + } + + assert !implicitTx() && !implicitSingleTx() : tx; + + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + read, + retval, + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncMode() == FULL_SYNC, + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? createTtl : -1L, + read ? accessTtl : -1L, + skipStore, + keepBinary, + clientFirst, + false, + cctx.deploymentEnabled()); + + mapping.request(req); + } - if (first) { - clientFirst = clientNode && - !topLocked && - (tx == null || !tx.hasRemoteLocks()); + distributedKeys.add(key); - first = false; - } + if (tx != null) + tx.addKeyMapping(txKey, mapping.node()); - assert !implicitTx() && !implicitSingleTx() : tx; - - req = new GridNearLockRequest( - cctx.cacheId(), - topVer, - cctx.nodeId(), - threadId, - futId, - lockVer, - inTx(), - read, + req.addKeyBytes( + key, retval, - isolation(), - isInvalidate(), - timeout, - mappedKeys.size(), - inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncMode() == FULL_SYNC, - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? createTtl : -1L, - read ? accessTtl : -1L, - skipStore, - keepBinary, - clientFirst, - false, - cctx.deploymentEnabled()); - - mapping.request(req); + dhtVer, // Include DHT version to match remote DHT entry. + cctx); } - distributedKeys.add(key); + explicit = inTx() && cand == null; - if (tx != null) + if (explicit) tx.addKeyMapping(txKey, mapping.node()); - req.addKeyBytes( - key, - retval, - dhtVer, // Include DHT version to match remote DHT entry. - cctx); + break; } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); - explicit = inTx() && cand == null; - - if (explicit) - tx.addKeyMapping(txKey, mapping.node()); - - break; + entry = null; + } } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); - entry = null; + // Mark mapping explicit lock flag. + if (explicit) { + boolean marked = tx != null && tx.markExplicit(node.id()); + + assert tx == null || marked; } } - // Mark mapping explicit lock flag. - if (explicit) { - boolean marked = tx != null && tx.markExplicit(node.id()); + if (!distributedKeys.isEmpty()) { + mapping.distributedKeys(distributedKeys); - assert tx == null || marked; + hasRmtNodes |= !mapping.node().isLocal(); } - } - - if (!distributedKeys.isEmpty()) { - mapping.distributedKeys(distributedKeys); + else { + assert mapping.request() == null; - hasRmtNodes |= !mapping.node().isLocal(); + iter.remove(); + } } - else { - assert mapping.request() == null; - iter.remove(); + if (hasRmtNodes) { + trackable = true; + + if (!remap && !cctx.mvcc().addFuture(this)) + throw new IllegalStateException("Duplicate future ID: " + this); } + else + trackable = false; } + finally { + /** Notify ready {@link mappings} waiters. See {@link #cancel()} */ + if (tx != null) { + mappingsReady = true; - if (hasRmtNodes) { - trackable = true; - - if (!remap && !cctx.mvcc().addFuture(this)) - throw new IllegalStateException("Duplicate future ID: " + this); + notifyAll(); + } } - else - trackable = false; proceedMapping(); } @@ -1137,11 +1175,27 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF throws IgniteCheckedException { GridNearLockMapping map; + // Fail fast if future is completed (in case of async rollback) + if (isDone()) { + clear(); + + return; + } + + // Fail fast if the transaction is timed out. + if (tx != null && tx.remainingTime() == -1) { + GridDhtColocatedLockFuture.this.onDone(false, tx.timeoutException()); + + clear(); + + return; + } + synchronized (this) { map = mappings.poll(); } - // If there are no more mappings to process, complete the future. + // If there are no more mappings to process or prepare has timed out, complete the future. if (map == null) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index c16a934..ffde0f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.C1; http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index ede8a4e..2974563 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -412,19 +412,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit } if (!commit && !clearThreadMap) - tryRollbackAsync(onTimeout); // Asynchronous rollback. + rollbackAsyncSafe(onTimeout); else doFinish(commit, clearThreadMap); } /** - * Does async rollback when it's safe. - * If current future is not lock future (enlist future) waits until completion and tries again. - * Else terminates or waits for lock future depending on rollback mode. + * Rollback tx when it's safe. + * If current future is not lock future (enlist future) wait until completion and tries again. + * Else cancel lock future (onTimeout=false) or wait for completion due to deadlock detection (onTimeout=true). * * @param onTimeout If {@code true} called from timeout handler. */ - private void tryRollbackAsync(boolean onTimeout) { + private void rollbackAsyncSafe(boolean onTimeout) { IgniteInternalFuture<?> curFut = tx.tryRollbackAsync(); if (curFut == null) { // Safe to rollback. @@ -447,7 +447,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit try { fut.get(); - tryRollbackAsync(onTimeout); + rollbackAsyncSafe(onTimeout); } catch (IgniteCheckedException e) { doFinish(false, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java index 03de5b0..be60a5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java @@ -26,9 +26,12 @@ import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.TimeZone; import java.util.UUID; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorDataTransferObject; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -82,6 +85,9 @@ public class VisorTxInfo extends VisorDataTransferObject { /** */ private Collection<UUID> masterNodeIds; + /** */ + private AffinityTopologyVersion topVer; + /** * Default constructor. */ @@ -103,7 +109,7 @@ public class VisorTxInfo extends VisorDataTransferObject { */ public VisorTxInfo(IgniteUuid xid, long startTime, long duration, TransactionIsolation isolation, TransactionConcurrency concurrency, long timeout, String lb, Collection<UUID> primaryNodes, - TransactionState state, int size, IgniteUuid nearXid, Collection<UUID> masterNodeIds) { + TransactionState state, int size, IgniteUuid nearXid, Collection<UUID> masterNodeIds, AffinityTopologyVersion topVer) { this.xid = xid; this.startTime = startTime; this.duration = duration; @@ -116,11 +122,12 @@ public class VisorTxInfo extends VisorDataTransferObject { this.size = size; this.nearXid = nearXid; this.masterNodeIds = masterNodeIds; + this.topVer = topVer; } /** {@inheritDoc} */ @Override public byte getProtocolVersion() { - return V2; + return V3; } /** */ @@ -154,6 +161,11 @@ public class VisorTxInfo extends VisorDataTransferObject { } /** */ + public AffinityTopologyVersion getTopologyVersion() { + return topVer; + } + + /** */ public long getTimeout() { return timeout; } @@ -202,6 +214,8 @@ public class VisorTxInfo extends VisorDataTransferObject { U.writeGridUuid(out, nearXid); U.writeCollection(out, masterNodeIds); out.writeLong(startTime); + out.writeLong(topVer == null ? -1 : topVer.topologyVersion()); + out.writeInt(topVer == null ? -1 : topVer.minorTopologyVersion()); } /** {@inheritDoc} */ @@ -217,12 +231,48 @@ public class VisorTxInfo extends VisorDataTransferObject { size = in.readInt(); if (protoVer >= V2) { nearXid = U.readGridUuid(in); - masterNodeIds = U.readCollection(in); - startTime = in.readLong(); } + if (protoVer >= V3) { + long topVer = in.readLong(); + int minorTopVer = in.readInt(); + + if (topVer != -1) + this.topVer = new AffinityTopologyVersion(topVer, minorTopVer); + } + } + /** + * Get tx info as user string. + * + * @return User string. + */ + public String toUserString() { + return " Tx: [xid=" + getXid() + + ", label=" + getLabel() + + ", state=" + getState() + + ", startTime=" + getFormattedStartTime() + + ", duration=" + getDuration() / 1000 + + ", isolation=" + getIsolation() + + ", concurrency=" + getConcurrency() + + ", topVer=" + (getTopologyVersion() == null ? "N/A" : getTopologyVersion()) + + ", timeout=" + getTimeout() + + ", size=" + getSize() + + ", dhtNodes=" + (getPrimaryNodes() == null ? "N/A" : + F.transform(getPrimaryNodes(), new IgniteClosure<UUID, String>() { + @Override public String apply(UUID id) { + return U.id8(id); + } + })) + + ", nearXid=" + getNearXid() + + ", parentNodeIds=" + (getMasterNodeIds() == null ? "N/A" : + F.transform(getMasterNodeIds(), new IgniteClosure<UUID, String>() { + @Override public String apply(UUID id) { + return U.id8(id); + } + })) + + ']'; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java index 9919b7d..23d1663 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java @@ -183,7 +183,7 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN private static final TxKillClosure NEAR_KILL_CLOSURE = new NearKillClosure(); /** */ - private static final TxKillClosure LOCAL_KILL_CLOSURE = NEAR_KILL_CLOSURE; + private static final TxKillClosure LOCAL_KILL_CLOSURE = new LocalKillClosure(); /** */ private static final TxKillClosure REMOTE_KILL_CLOSURE = new RemoteKillClosure(); @@ -312,7 +312,7 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN infos.add(new VisorTxInfo(locTx.xid(), locTx.startTime(), duration, locTx.isolation(), locTx.concurrency(), locTx.timeout(), lb, mappings, locTx.state(), - size, locTx.nearXidVersion().asGridUuid(), locTx.masterNodeIds())); + size, locTx.nearXidVersion().asGridUuid(), locTx.masterNodeIds(), locTx.topologyVersionSnapshot())); if (arg.getOperation() == VisorTxOperation.KILL) killClo.apply(locTx, tm); @@ -394,7 +394,7 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN IgniteBiClosure<IgniteInternalTx, IgniteTxManager, IgniteInternalFuture<IgniteInternalTx>> { } - /** Kills near or local tx. */ + /** Kills near tx. */ private static class NearKillClosure implements TxKillClosure { /** */ private static final long serialVersionUID = 0L; @@ -402,7 +402,19 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> apply(IgniteInternalTx tx, IgniteTxManager tm) { return tx.isRollbackOnly() || tx.state() == COMMITTING || tx.state() == COMMITTED ? - new GridFinishedFuture<>() : tx.rollbackAsync(); + new GridFinishedFuture<>() : ((GridNearTxLocal)tx).rollbackNearTxLocalAsync(false, false); + } + } + + /** Kills local tx. */ + private static class LocalKillClosure implements TxKillClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<IgniteInternalTx> apply(IgniteInternalTx tx, IgniteTxManager tm) { + return tx.isRollbackOnly() || tx.state() == COMMITTING || tx.state() == COMMITTED ? + new GridFinishedFuture<>() : ((GridDhtTxLocal)tx).rollbackDhtLocalAsync(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java index 4626dcd..646c3f0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java @@ -42,24 +42,34 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.tx.VisorTxInfo; +import org.apache.ignite.internal.visor.tx.VisorTxOperation; +import org.apache.ignite.internal.visor.tx.VisorTxTask; +import org.apache.ignite.internal.visor.tx.VisorTxTaskArg; +import org.apache.ignite.internal.visor.tx.VisorTxTaskResult; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -75,6 +85,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.configuration.WALMode.LOG_ONLY; import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -864,6 +875,122 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { } /** + * + */ + public void testRollbackOnTopologyLockPessimistic() throws Exception { + final Ignite client = startClient(); + + Ignite crd = grid(0); + + List<Integer> keys = primaryKeys(grid(1).cache(CACHE_NAME), 1); + + assertTrue(crd.cluster().localNode().order() == 1); + + CountDownLatch txLatch = new CountDownLatch(1); + CountDownLatch tx2Latch = new CountDownLatch(1); + CountDownLatch commitLatch = new CountDownLatch(1); + + // Start tx holding topology. + IgniteInternalFuture txFut = runAsync(new Runnable() { + @Override public void run() { + List<Integer> keys = primaryKeys(grid(0).cache(CACHE_NAME), 1); + + try (Transaction tx = client.transactions().txStart()) { + client.cache(CACHE_NAME).put(keys.get(0), 0); + + txLatch.countDown(); + + U.awaitQuiet(commitLatch); + + tx.commit(); + + fail(); + } + catch (Exception e) { + // Expected. + } + } + }); + + U.awaitQuiet(txLatch); + + crd.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + runAsync(new Runnable() { + @Override public void run() { + try(Transaction tx = crd.transactions().withLabel("testLbl").txStart()) { + // Wait for node start. + waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return crd.cluster().topologyVersion() != GRID_CNT + + /** client node */ 1 + /** stop server node */ 1 + /** start server node */ 1; + } + }, 10_000); + + tx2Latch.countDown(); + + crd.cache(CACHE_NAME).put(keys.get(0), 0); + + tx.commit(); + + fail(); + } + catch (Exception e) { + // Expected. + } + } + }); + + return false; + } + }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); + + IgniteInternalFuture restartFut = runAsync(new Runnable() { + @Override public void run() { + stopGrid(2); + + try { + startGrid(2); + } + catch (Exception e) { + fail(); + } + } + }); + + U.awaitQuiet(tx2Latch); + + // Rollback tx using kill task. + VisorTxTaskArg arg = + new VisorTxTaskArg(VisorTxOperation.KILL, null, null, null, null, null, null, null, null, null); + + Map<ClusterNode, VisorTxTaskResult> res = client.compute(client.cluster().forPredicate(F.alwaysTrue())). + execute(new VisorTxTask(), new VisorTaskArgument<>(client.cluster().localNode().id(), arg, false)); + + int expCnt = 0; + + for (Map.Entry<ClusterNode, VisorTxTaskResult> entry : res.entrySet()) { + if (entry.getValue().getInfos().isEmpty()) + continue; + + for (VisorTxInfo info : entry.getValue().getInfos()) { + log.info(info.toUserString()); + + expCnt++; + } + } + + assertEquals("Expecting 2 transactions", 2, expCnt); + + commitLatch.countDown(); + + txFut.get(); + restartFut.get(); + + checkFutures(); + } + + /** * Locks entry in tx and delays commit until signalled. * * @param node Near node. @@ -896,22 +1023,6 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { } /** - * Checks if all tx futures are finished. - */ - private void checkFutures() { - for (Ignite ignite : G.allGrids()) { - IgniteEx ig = (IgniteEx)ignite; - - final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures(); - - for (GridCacheFuture<?> fut : futs) - log.info("Waiting for future: " + fut); - - assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty()); - } - } - - /** * @param tx Tx to rollback. */ private IgniteInternalFuture<?> rollbackAsync(final Transaction tx) throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index 9c37cfa..d6f3ae5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +37,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -42,12 +46,23 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.tx.VisorTxInfo; +import org.apache.ignite.internal.visor.tx.VisorTxOperation; +import org.apache.ignite.internal.visor.tx.VisorTxTask; +import org.apache.ignite.internal.visor.tx.VisorTxTaskArg; +import org.apache.ignite.internal.visor.tx.VisorTxTaskResult; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -62,6 +77,7 @@ import org.apache.ignite.transactions.TransactionTimeoutException; import static java.lang.Thread.sleep; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -88,6 +104,8 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + cfg.setConsistentId(igniteInstanceName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); @@ -298,7 +316,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { final CountDownLatch l = new CountDownLatch(2); - IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Runnable() { + IgniteInternalFuture<?> fut1 = runAsync(new Runnable() { @Override public void run() { try { try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 5000, 2)) { @@ -322,7 +340,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } }, "First"); - IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() { + IgniteInternalFuture<?> fut2 = runAsync(new Runnable() { @Override public void run() { try (Transaction tx = node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) { node2.cache(CACHE_NAME).put(2, 2); @@ -812,7 +830,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { final int recordsCnt = 5; - IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Runnable() { + IgniteInternalFuture<?> fut1 = runAsync(new Runnable() { @Override public void run() { try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) { try { @@ -860,7 +878,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } }, "First"); - IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() { + IgniteInternalFuture<?> fut2 = runAsync(new Runnable() { @Override public void run() { U.awaitQuiet(blocked); http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java index 2542cdb..13c5e41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java @@ -209,20 +209,4 @@ public class TxRollbackOnTopologyChangeTest extends GridCommonAbstractTest { checkFutures(); } - - /** - * Checks if all tx futures are finished. - */ - private void checkFutures() { - for (Ignite ignite : G.allGrids()) { - IgniteEx ig = (IgniteEx)ignite; - - final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures(); - - for (GridCacheFuture<?> fut : futs) - log.info("Waiting for future: " + fut); - - assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty()); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 8591f0c..55086f3 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -1995,4 +1995,27 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { new VisorTaskArgument<>(node.id(), taskArg, false) ); } + + /** + * Checks if all txs and mvcc futures are finished. + */ + protected void checkFutures() { + for (Ignite ignite : G.allGrids()) { + IgniteEx ig = (IgniteEx)ignite; + + final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures(); + + for (GridCacheFuture<?> fut : futs) + log.info("Waiting for future: " + fut); + + assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty()); + + Collection<IgniteInternalTx> txs = ig.context().cache().context().tm().activeTransactions(); + + for (IgniteInternalTx tx : txs) + log.info("Waiting for tx: " + tx); + + assertTrue("Expecting no active transactions: node=" + ig.localNode().id(), txs.isEmpty()); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3882ec04/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index f612a96..8af7a10 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -1415,29 +1415,6 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { }, 4, "tx-thread"); } - /** - * Checks if all tx futures are finished. - */ - private void checkFutures() { - for (Ignite ignite : G.allGrids()) { - IgniteEx ig = (IgniteEx)ignite; - - final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures(); - - for (GridCacheFuture<?> fut : futs) - log.info("Waiting for future: " + fut); - - assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty()); - - Collection<IgniteInternalTx> txs = ig.context().cache().context().tm().activeTransactions(); - - for (IgniteInternalTx tx : txs) - log.info("Waiting for tx: " + tx); - - assertTrue("Expecting no active transactions: node=" + ig.localNode().id(), txs.isEmpty()); - } - } - /** */ private static class IncrementClosure implements EntryProcessor<Long, Long, Void> { /** {@inheritDoc} */