ignite-6181-1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2468e009 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2468e009 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2468e009 Branch: refs/heads/ignite-6181-1 Commit: 2468e009032d36404f372d1f3bd616d9320efae4 Parents: f950cb1 Author: sboikov <sboi...@gridgain.com> Authored: Tue Sep 19 16:45:29 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Sep 19 17:37:31 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/CacheMetrics.java | 2 + .../processors/cache/CacheMetricsImpl.java | 4 +- .../processors/cache/GridCacheAdapter.java | 18 +- .../processors/cache/GridCacheMapEntry.java | 9 +- .../cache/GridCacheSharedContext.java | 11 +- .../dht/colocated/GridDhtColocatedCache.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 90 ++++--- .../distributed/near/GridNearLockFuture.java | 58 +++-- ...ridNearOptimisticTxPrepareFutureAdapter.java | 2 +- .../near/GridNearTransactionalCache.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 71 ++++-- .../cache/transactions/IgniteTxAdapter.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 4 +- .../cache/transactions/IgniteTxManager.java | 230 ++++++------------ .../timeout/GridTimeoutProcessor.java | 17 +- .../IgniteCacheThreadLocalTxTest.java | 179 ++++++++++++++ .../IgniteOptimisticTxSuspendResumeTest.java | 1 + ...ollbackOnTimeoutNoDeadlockDetectionTest.java | 4 +- .../transactions/TxRollbackOnTimeoutTest.java | 238 ++++++++----------- .../ignite/testsuites/IgniteCacheTestSuite.java | 4 +- 20 files changed, 550 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 20ea692..470645b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -269,6 +269,7 @@ public interface CacheMetrics { * * @return Thread map size. */ + @Deprecated public int getTxThreadMapSize(); /** @@ -318,6 +319,7 @@ public interface CacheMetrics { * * @return DHT thread map size. */ + @Deprecated public int getTxDhtThreadMapSize(); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index d03a6f8..d608435 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -298,7 +298,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxThreadMapSize() { - return cctx.tm().threadMapSize(); + return 0; } /** {@inheritDoc} */ @@ -328,7 +328,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxDhtThreadMapSize() { - return cctx.isNear() && dhtCtx != null ? 
dhtCtx.tm().threadMapSize() : -1; + return 0; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index fed716c..ebd2ab8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1857,7 +1857,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>(e); } - tx = ctx.tm().threadLocalTx(ctx.systemTx() ? ctx : null); + tx = ctx.tm().currentThreadTx(ctx); } if (tx == null || tx.implicit()) { @@ -3236,7 +3236,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Nullable @Override public GridNearTxLocal tx() { - return ctx.tm().threadLocalTx(ctx); + return ctx.tm().currentThreadTx(ctx); } /** {@inheritDoc} */ @@ -3965,6 +3965,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @SuppressWarnings("unchecked") IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) { + ctx.tm().clearThreadLocalTx(tx); + FutureHolder holder = lastFut.get(); holder.lock(); @@ -3976,7 +3978,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut, new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() { @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) { - return tx.commitNearTxLocalAsync(); + return tx.commitNearTxLocalAsync(false); } }); 
@@ -3985,7 +3987,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return f; } - IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync(); + IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync(false); saveFuture(holder, f, /*retry*/false); @@ -4030,7 +4032,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V awaitLastFut(); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().currentThreadTx(ctx); if (tx == null || tx.implicit()) { TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); @@ -4126,7 +4128,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (log.isDebugEnabled()) log.debug("Performing async op: " + op); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().currentThreadTx(ctx); CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -4208,7 +4210,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } catch (IgniteCheckedException e1) { try { - tx0.rollbackNearTxLocalAsync(); + tx0.rollbackNearTxLocalAsync(false); } catch (Throwable e2) { e1.addSuppressed(e2); @@ -4239,7 +4241,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } catch (IgniteCheckedException e1) { try { - tx0.rollbackNearTxLocalAsync(); + tx0.rollbackNearTxLocalAsync(false); } catch (Throwable e2) { e1.addSuppressed(e2); http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d991c86..0f54762 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2451,7 +2451,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @Nullable @Override public CacheObject peek(@Nullable IgniteCacheExpiryPolicy plc) throws GridCacheEntryRemovedException, IgniteCheckedException { - IgniteInternalTx tx = cctx.tm().localTxx(); + IgniteInternalTx tx = cctx.tm().localTx(); AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion(); @@ -3098,10 +3098,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @return Current transaction. */ private IgniteTxLocalAdapter currentTx() { - if (cctx.isDht()) - return cctx.dht().near().context().tm().localTx(); - else - return cctx.tm().localTx(); + return cctx.tm().localTx(); } /** {@inheritDoc} */ @@ -3479,7 +3476,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - IgniteInternalTx tx = cctx.tm().localTxx(); + IgniteInternalTx tx = cctx.tm().localTx(); if (tx != null) { IgniteTxEntry e = tx.entry(txKey()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 3fb63dc..604cfe3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -886,12 +886,13 @@ public class 
GridCacheSharedContext<K, V> { * @return Not null topology version if current thread holds lock preventing topology change. */ @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) { - long threadId = Thread.currentThread().getId(); + AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(ignore); - AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(threadId, ignore); + if (topVer == null) { + long threadId = Thread.currentThread().getId(); - if (topVer == null) topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId); + } return topVer; } @@ -926,7 +927,7 @@ public class GridCacheSharedContext<K, V> { if (ctx == null) { tx.txState().awaitLastFut(this); - return tx.commitNearTxLocalAsync(); + return tx.commitNearTxLocalAsync(true); } else return ctx.cache().commitTxAsync(tx); @@ -940,7 +941,7 @@ public class GridCacheSharedContext<K, V> { public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException { tx.txState().awaitLastFut(this); - return tx.rollbackNearTxLocalAsync(); + return tx.rollbackNearTxLocalAsync(true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index c72f53e..de4a177 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -186,7 +186,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKey(key); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().currentThreadTx(ctx); final CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -273,7 +273,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKeys(keys); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().currentThreadTx(ctx); final CacheOperationContext opCtx = ctx.operationContextPerCall(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 82b0e6e..e5f94a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import 
org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; @@ -147,7 +148,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; /** Trackable flag (here may be non-volatile). */ - private boolean trackable = true; + private boolean trackable; /** TTL for create operation. */ private final long createTtl; @@ -226,12 +227,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF log = U.logger(cctx.kernalContext(), logRef, GridDhtColocatedLockFuture.class); } - if (timeout > 0) { - timeoutObj = new LockTimeoutObject(); - - cctx.time().addTimeoutObject(timeoutObj); - } - valMap = new ConcurrentHashMap8<>(); } @@ -399,7 +394,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * @param success Success flag. */ public void complete(boolean success) { - onComplete(success, true); + onComplete(success, true, true); } /** @@ -533,7 +528,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** {@inheritDoc} */ @Override public boolean cancel() { if (onCancelled()) - onComplete(false, true); + onComplete(false, true, true); return isCancelled(); } @@ -556,7 +551,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (err != null) success = false; - return onComplete(success, true); + return onComplete(success, true, true); } /** @@ -564,9 +559,10 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * * @param success {@code True} if lock was acquired. * @param distribute {@code True} if need to distribute lock removal in case of failure. + * @param restoreTimeout {@code True} if need restore tx timeout callback. * @return {@code True} if complete by this operation. 
*/ - private boolean onComplete(boolean success, boolean distribute) { + private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) { if (log.isDebugEnabled()) log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute + ", fut=" + this + ']'); @@ -577,6 +573,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (tx != null) cctx.tm().txContext(tx); + if (restoreTimeout && tx != null && tx.trackTimeout()) { + // Need restore timeout before onDone is called, but onComplete can be called concurrently, + // thus need ignore duplicated timeout objects. + tx.addTimeoutHandler(true); + } + if (super.onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -631,13 +633,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } } - /** - * @return Timeout. - */ - public long timeout() { - return timeout; - } - /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @@ -682,12 +677,36 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back. 
*/ void map() { + if (tx != null && tx.trackTimeout()) { + if (!tx.removeTimeoutHandler()) { + tx.finishFuture().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " + + "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() + + ", tx=" + tx + ']'); + + onError(err); + + onComplete(false, false, false); + } + }); + + return; + } + } + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } + // Obtain the topology version to use. AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. if (topVer == null && tx != null && tx.system()) - topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx); + topVer = cctx.tm().lockedTopologyVersion(tx); if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -856,12 +875,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); // First assume this node is primary for all keys passed in. - if (!clientNode && mapAsPrimary(keys, topVer)) { - if (!cctx.mvcc().addFuture(this)) - throw new IllegalStateException("Duplicate future ID: " + this); - + if (!clientNode && mapAsPrimary(keys, topVer)) return; - } mappings = new ArrayDeque<>(); @@ -892,6 +907,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (log.isDebugEnabled()) log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + boolean hasRmtNodes = false; + boolean first = true; // Create mini futures. 
@@ -939,7 +956,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false, true); return; } @@ -1038,8 +1055,11 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } } - if (!distributedKeys.isEmpty()) + if (!distributedKeys.isEmpty()) { mapping.distributedKeys(distributedKeys); + + hasRmtNodes |= !mapping.node().isLocal(); + } else { assert mapping.request() == null; @@ -1047,8 +1067,14 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } } - if (!remap && !cctx.mvcc().addFuture(this)) - throw new IllegalStateException("Duplicate future ID: " + this); + if (hasRmtNodes) { + trackable = true; + + if (!remap && !cctx.mvcc().addFuture(this)) + throw new IllegalStateException("Duplicate future ID: " + this); + } + else + trackable = false; proceedMapping(); } @@ -1264,6 +1290,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF return true; } + trackable = false; + if (tx != null) { if (explicit) tx.markExplicit(cctx.localNodeId()); @@ -1305,7 +1333,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false, true); return false; } @@ -1417,12 +1445,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF U.warn(log, "Failed to detect deadlock.", e); } - onComplete(false, true); + onComplete(false, true, true); } }); } else - onComplete(false, true); + onComplete(false, true, true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index bb71337..00f6ef5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; @@ -230,12 +231,6 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridNearLockFuture.class); - if (timeout > 0) { - timeoutObj = new LockTimeoutObject(); - - cctx.time().addTimeoutObject(timeoutObj); - } - valMap = new ConcurrentHashMap8<>(); } @@ -434,7 +429,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo * @param success Success flag. 
*/ public void complete(boolean success) { - onComplete(success, true); + onComplete(success, true, true); } /** @@ -655,7 +650,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]"); } - onComplete(true, true); + onComplete(true, true, true); return true; } @@ -666,7 +661,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo /** {@inheritDoc} */ @Override public boolean cancel() { if (onCancelled()) - onComplete(false, true); + onComplete(false, true, true); return isCancelled(); } @@ -690,7 +685,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (err != null) success = false; - return onComplete(success, true); + return onComplete(success, true, true); } /** @@ -698,9 +693,10 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo * * @param success {@code True} if lock was acquired. * @param distribute {@code True} if need to distribute lock removal in case of failure. + * @param restoreTimeout {@code True} if need restore tx timeout callback. * @return {@code True} if complete by this operation. */ - private boolean onComplete(boolean success, boolean distribute) { + private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) { if (log.isDebugEnabled()) log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute + ", fut=" + this + ']'); @@ -711,6 +707,12 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (tx != null) cctx.tm().txContext(tx); + if (restoreTimeout && tx != null && tx.trackTimeout()) { + // Need restore timeout before onDone is called, but onComplete can be called concurrently, + // thus need ignore duplicated timeout objects. 
+ tx.addTimeoutHandler(true); + } + if (super.onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -770,6 +772,30 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo * part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back. */ void map() { + if (tx != null && tx.trackTimeout()) { + if (!tx.removeTimeoutHandler()) { + tx.finishFuture().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " + + "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() + + ", tx=" + tx + ']'); + + onError(err); + + onComplete(false, false, false); + } + }); + + return; + } + } + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } + // Obtain the topology version to use. long threadId = Thread.currentThread().getId(); @@ -777,7 +803,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo // If there is another system transaction in progress, use it's topology version to prevent deadlock. 
if (topVer == null && tx != null && tx.system()) - topVer = cctx.tm().lockedTopologyVersion(threadId, tx); + topVer = cctx.tm().lockedTopologyVersion(tx); if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -981,9 +1007,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete( - false, - false); + onComplete(false, false, true); return; } @@ -1476,12 +1500,12 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo U.warn(log, "Failed to detect deadlock.", e); } - onComplete(false, true); + onComplete(false, true, true); } }); } else - onComplete(false, true); + onComplete(false, true, true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index f09b6c8..9e5be1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -55,7 +55,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT // If there is another system transaction in progress, use it's topology version to prevent deadlock. 
if (topVer == null && tx.system()) { - topVer = cctx.tm().lockedTopologyVersion(threadId, tx); + topVer = cctx.tm().lockedTopologyVersion(tx); if (topVer == null) topVer = tx.topologyVersionSnapshot(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index a691cbc..19def50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -133,7 +133,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (keyCheck) validateCacheKeys(keys); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().currentThreadTx(ctx); CacheOperationContext opCtx = ctx.operationContextPerCall(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ad3b464..a080bfa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -233,7 +233,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou initResult(); if (trackTimeout()) - cctx.time().addTimeoutObject(this); + addTimeoutHandler(false); } /** {@inheritDoc} */ @@ -1616,7 +1616,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou // with prepare response, if required. assert loadFut.isDone(); - return nonInterruptable(commitNearTxLocalAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + return nonInterruptable(commitNearTxLocalAsync(false).chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { try { @@ -1626,7 +1626,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou implicitRes.value(), implicitRes.success()); } catch (IgniteCheckedException | RuntimeException e) { - rollbackNearTxLocalAsync(); + rollbackNearTxLocalAsync(false); throw e; } @@ -2384,7 +2384,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return new GridFinishedFuture<>(e); } - return nonInterruptable(commitNearTxLocalAsync().chain( + return nonInterruptable(commitNearTxLocalAsync(false).chain( new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { @@ -2403,7 +2403,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } catch (IgniteCheckedException | RuntimeException e) { if (!(e instanceof NodeStoppingException)) - rollbackNearTxLocalAsync(); + rollbackNearTxLocalAsync(false); throw e; } @@ -3171,30 +3171,26 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * 
@throws IgniteCheckedException If failed. */ public final void prepare() throws IgniteCheckedException { - prepareAsync().get(); - } - - /** - * @return Prepare future. - */ - private IgniteInternalFuture<?> prepareAsync() { - return prepareNearTxLocal(); + prepareNearTxLocal().get(); } /** * @throws IgniteCheckedException If failed. */ public void commit() throws IgniteCheckedException { - commitNearTxLocalAsync().get(); + commitNearTxLocalAsync(true).get(); } /** * @return Finish future. */ - public IgniteInternalFuture<IgniteInternalTx> commitNearTxLocalAsync() { + public IgniteInternalFuture<IgniteInternalTx> commitNearTxLocalAsync(boolean clearThreadLocal) { if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); + if (clearThreadLocal) + cctx.tm().clearThreadLocalTx(this); + if (fastFinish()) { state(PREPARING); state(PREPARED); @@ -3248,23 +3244,26 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { - return commitNearTxLocalAsync(); + return commitNearTxLocalAsync(true); } /** * @throws IgniteCheckedException If failed. */ public void rollback() throws IgniteCheckedException { - rollbackNearTxLocalAsync().get(); + rollbackNearTxLocalAsync(true).get(); } /** * @return Rollback future. 
*/ - public IgniteInternalFuture<IgniteInternalTx> rollbackNearTxLocalAsync() { + public IgniteInternalFuture<IgniteInternalTx> rollbackNearTxLocalAsync(boolean clearThreadLocal) { if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); + if (clearThreadLocal) + cctx.tm().clearThreadLocalTx(this); + removeTimeoutHandler(); if (fastFinish()) { @@ -3328,7 +3327,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - return rollbackNearTxLocalAsync(); + return rollbackNearTxLocalAsync(true); } /** @@ -3698,8 +3697,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); - else + else { + cctx.tm().clearThreadLocalTx(this); + removeTimeoutHandler(); + } synchronized (this) { try { @@ -4013,16 +4015,26 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * @return {@code True} if need register callback which cancels tx on timeout. */ - private boolean trackTimeout() { + public boolean trackTimeout() { return timeout() > 0 && !implicit(); } /** * Removes timeout handler. + * + * @return {@code True} if handler was removed. */ - private void removeTimeoutHandler() { - if (trackTimeout()) - cctx.time().removeTimeoutObject(this); + public boolean removeTimeoutHandler() { + return trackTimeout() && cctx.time().removeTimeoutObject(this); + } + + /** + * @param ignoreDuplicated {@code True} if there is no need to check for a duplicated timeout object.
+ */ + public void addTimeoutHandler(boolean ignoreDuplicated) { + assert trackTimeout(); + + cctx.time().addTimeoutObject(this, ignoreDuplicated); } /** {@inheritDoc} */ @@ -4037,13 +4049,20 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public void onTimeout() { - if (state(MARKED_ROLLBACK, true)) { + if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) { + if (log.isDebugEnabled()) + log.debug("Will rollback tx on timeout: " + this); + cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - rollbackNearTxLocalAsync(); + rollbackNearTxLocalAsync(false); } }); } + else { + if (log.isDebugEnabled()) + log.debug("Skip rollback tx on timeout: " + this); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 7d4464a..2e4e181 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -469,7 +469,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (res.equals(AffinityTopologyVersion.NONE)) { if (system()) { - AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this); + AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(this); if (topVer != null) return topVer; @@ -966,7 +966,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement * @return {@code 
True} if state changed. */ @SuppressWarnings({"TooBroadScope"}) - protected boolean state(TransactionState state, boolean timedOut) { + protected final boolean state(TransactionState state, boolean timedOut) { boolean valid = false; TransactionState prev; http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index de0e415..f87b786 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -261,7 +261,7 @@ public class IgniteTxHandler { req.last()); if (locTx.isRollbackOnly()) - locTx.rollbackNearTxLocalAsync(); + locTx.rollbackNearTxLocalAsync(false); return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() { @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) { @@ -1021,7 +1021,7 @@ public class IgniteTxHandler { if (tx != null) try { - return tx.rollbackNearTxLocalAsync(); + return tx.rollbackNearTxLocalAsync(false); } catch (Throwable e1) { e.addSuppressed(e1); http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 6aa04ac..6144b27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -148,11 +147,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Topology version should be used when mapping internal tx. */ private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); - /** Per-thread transaction map. */ - private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); + /** Current thread's tx. */ + private final ThreadLocal<GridNearTxLocal> threadTx = new ThreadLocal<>(); - /** Per-thread system transaction map. */ - private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap(); + /** Current thread's system tx. */ + private final ThreadLocal<GridNearTxLocal> threadSysTx = new ThreadLocal<>(); /** Per-ID map. 
*/ private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> idMap = newMap(); @@ -284,7 +283,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { public void rollbackTransactionsForCache(int cacheId) { rollbackTransactionsForCache(cacheId, nearIdMap); - rollbackTransactionsForCache(cacheId, threadMap); + rollbackTransactionsForCache(cacheId, idMap); } /** @@ -309,8 +308,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); - for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) - rollbackTx(e.getValue()); + for (IgniteInternalTx tx : idMap.values()) + rollbackTx(tx); + for (IgniteInternalTx tx : nearIdMap.values()) + rollbackTx(tx); IgniteClientDisconnectedException err = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); @@ -369,20 +370,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> threadMapSize: " + threadMap.size()); X.println(">>> idMap [size=" + idMap.size() + ']'); + X.println(">>> nearIdMap [size=" + nearIdMap.size() + ']'); X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } /** - * @return Thread map size. - */ - public int threadMapSize() { - return threadMap.size(); - } - - /** * @return ID map size. 
*/ public int idMapSize() { @@ -452,7 +446,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { taskNameHash); if (tx.system()) { - AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx); + AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(tx); // If there is another system transaction in progress, use it's topology version to prevent deadlock. if (topVer != null) @@ -483,14 +477,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { IgniteInternalTx t; if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) { - // Add both, explicit and implicit transactions. - // Do not add remote and dht local transactions as remote node may have the same thread ID - // and overwrite local transaction. if (tx.local() && !tx.dht()) { - if (cacheCtx == null || !cacheCtx.systemTx()) - threadMap.put(tx.threadId(), tx); - else - sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); + assert tx instanceof GridNearTxLocal : tx; + + if (!tx.implicit()) { + if (cacheCtx == null || !cacheCtx.systemTx()) + threadTx.set((GridNearTxLocal)tx); + else + threadSysTx.set((GridNearTxLocal)tx); + } } // Handle mapped versions. @@ -626,27 +621,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @return Local transaction. */ - @SuppressWarnings({"unchecked"}) - @Nullable public <T> T localTx() { - IgniteInternalTx tx = tx(); + @Nullable public IgniteTxLocalAdapter localTx() { + IgniteTxLocalAdapter tx = tx(); - return tx != null && tx.local() ? (T)tx : null; - } - - /** - * @param cctx Cache context. - * @return Transaction for current thread. 
- */ - public GridNearTxLocal threadLocalTx(GridCacheContext cctx) { - IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId()); - - if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) { - assert tx instanceof GridNearTxLocal : tx; - - return (GridNearTxLocal)tx; - } - - return null; + return tx != null && tx.local() ? tx : null; } /** @@ -656,16 +634,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { public <T> T tx() { IgniteInternalTx tx = txContext(); - return tx != null ? (T)tx : (T)tx(null, Thread.currentThread().getId()); + return tx != null ? (T)tx : (T)currentThreadTx(null); } /** - * @param threadId Thread ID. * @param ignore Transaction to ignore. * @return Not null topology version if current thread holds lock preventing topology change. */ - @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { - IgniteInternalTx tx = threadMap.get(threadId); + @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) { + IgniteInternalTx tx = threadTx.get(); + + if (tx == null) { + tx = threadSysTx.get(); + + if (tx == ignore) + tx = null; + } if (tx != null) { AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); @@ -674,22 +658,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return topVer; } - if (!sysThreadMap.isEmpty()) { - for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { - if (!cacheCtx.systemTx()) - continue; - - tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); - - if (tx != null && tx != ignore) { - AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); - - if (topVer != null) - return topVer; - } - } - } - return txTop.get(); } @@ -712,15 +680,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @return Local transaction. 
- */ - @Nullable public IgniteInternalTx localTxx() { - IgniteInternalTx tx = tx(); - - return tx != null && tx.local() ? tx : null; - } - - /** * @return User transaction for current thread. */ @Nullable public GridNearTxLocal userTx() { @@ -729,7 +688,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (activeUserTx(tx)) return (GridNearTxLocal)tx; - tx = tx(null, Thread.currentThread().getId()); + tx = currentThreadTx(null); if (activeUserTx(tx)) return (GridNearTxLocal)tx; @@ -742,7 +701,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return User transaction for current thread. */ @Nullable GridNearTxLocal userTx(GridCacheContext cctx) { - IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId()); + IgniteInternalTx tx = currentThreadTx(cctx); if (activeUserTx(tx)) return (GridNearTxLocal)tx; @@ -766,17 +725,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param cctx Cache context. - * @param threadId Id of thread for transaction. * @return Transaction for thread with given ID. */ @SuppressWarnings({"unchecked"}) - private <T> T tx(GridCacheContext cctx, long threadId) { + public GridNearTxLocal currentThreadTx(GridCacheContext cctx) { if (cctx == null || !cctx.systemTx()) - return (T)threadMap.get(threadId); + return threadTx.get(); - TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); - - return (T)sysThreadMap.get(key); + return threadSysTx.get(); } /** @@ -1208,32 +1164,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { collectPendingVersions(dhtTxLoc); } - // 4. Unlock write resources. + // 3. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 5. Unlock read resources if required. + // 4. Unlock read resources if required. if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); - // 6. Notify evictions. + // 5. Notify evictions. notifyEvictions(tx); - // 7. Remove obsolete entries from cache. + // 6. 
Remove obsolete entries from cache. removeObsolete(tx); - // 8. Assign transaction number at the end of transaction. + // 7. Assign transaction number at the end of transaction. tx.endVersion(cctx.versions().next(tx.topologyVersion())); - // 9. Remove from per-thread storage. - clearThreadMap(tx); - - // 10. Unregister explicit locks. + // 8. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); } - // 11. Remove Near-2-DHT mappings. + // 9. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) { GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); @@ -1241,10 +1194,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { mappedVers.remove(mapped); } - // 12. Clear context. + // 10. Clear context. resetContext(); - // 14. Update metrics. + // 11. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxCommit(); @@ -1294,22 +1247,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 5. Remove obsolete entries. removeObsolete(tx); - // 6. Remove from per-thread storage. - clearThreadMap(tx); - - // 7. Unregister explicit locks. + // 6. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); - // 8. Remove Near-2-DHT mappings. + // 7. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - // 9. Clear context. + // 8. Clear context. resetContext(); - // 10. Update metrics. + // 9. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxRollback(); @@ -1350,13 +1300,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 3. Remove obsolete entries. removeObsolete(tx); - // 4. Remove from per-thread storage. - clearThreadMap(tx); - - // 5. Clear context. + // 4. 
Clear context. resetContext(); - // 6. Update metrics. + // 5. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) { if (commit) @@ -1394,20 +1341,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 3. Notify evictions. notifyEvictions(tx); - // 4. Remove from per-thread storage. - clearThreadMap(tx); - - // 5. Unregister explicit locks. + // 4. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); } - // 6. Remove Near-2-DHT mappings. + // 5. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - // 7. Clear context. + // 6. Clear context. resetContext(); if (log.isDebugEnabled()) @@ -1420,28 +1364,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param tx Transaction to clear. */ - private void clearThreadMap(IgniteInternalTx tx) { - if (tx.local() && !tx.dht()) { - if (!tx.system()) - threadMap.remove(tx.threadId(), tx); - else { - Integer cacheId = tx.txState().firstCacheId(); - - if (cacheId != null) - sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); - else { - for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) { - IgniteInternalTx txx = it.next(); - - if (tx == txx) { - it.remove(); + public void clearThreadLocalTx(GridNearTxLocal tx) { + if (tx.implicit()) + return; - break; - } - } - } - } - } + if (tx.system()) + threadSysTx.set(null); + else + threadTx.set(null); } /** @@ -2250,6 +2180,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @see #resumeTx(GridNearTxLocal) * @see GridNearTxLocal#suspend() * @see GridNearTxLocal#resume() + * @throws IgniteCheckedException If failed to suspend transaction. 
*/ public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException { assert tx != null && !tx.system() : tx; @@ -2259,7 +2190,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { + "[expected=" + ACTIVE + ", actual=" + tx.state() + ']'); } - clearThreadMap(tx); + clearThreadLocalTx(tx); transactionMap(tx).remove(tx.xidVersion(), tx); } @@ -2273,45 +2204,30 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @see #suspendTx(GridNearTxLocal) * @see GridNearTxLocal#suspend() * @see GridNearTxLocal#resume() + * @throws IgniteCheckedException If failed to resume tx. */ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { assert tx != null && !tx.system() : tx; - assert !threadMap.containsValue(tx) : tx; assert !transactionMap(tx).containsValue(tx) : tx; - assert !haveSystemTxForThread(Thread.currentThread().getId()); + assert threadSysTx.get() == null; if (!tx.state(ACTIVE)) { throw new IgniteCheckedException("Trying to resume transaction with incorrect state " - + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); + + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); } - long threadId = Thread.currentThread().getId(); - - if (threadMap.putIfAbsent(threadId, tx) != null) - throw new IgniteCheckedException("Thread already has started a transaction."); - - if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) + if (threadTx.get() != null) throw new IgniteCheckedException("Thread already has started a transaction."); - tx.threadId(threadId); - } + threadTx.set(tx); - /** - * @param threadId Thread id. - * @return True if thread have system transaction. False otherwise. 
- */ - private boolean haveSystemTxForThread(long threadId) { - if (!sysThreadMap.isEmpty()) { - for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { - if (!cacheCtx.systemTx()) - continue; + if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) { + threadTx.set(null); - if (sysThreadMap.containsKey(new TxThreadKey(threadId, cacheCtx.cacheId()))) - return true; - } + throw new IgniteCheckedException("Thread already has started a transaction."); } - return false; + tx.threadId(Thread.currentThread().getId()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index 8c71f76..e69306f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -85,15 +85,23 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { /** * @param timeoutObj Timeout object. */ - @SuppressWarnings({"NakedNotify", "CallToNotifyInsteadOfNotifyAll"}) public void addTimeoutObject(GridTimeoutObject timeoutObj) { + addTimeoutObject(timeoutObj, false); + } + + /** + * @param timeoutObj Timeout object. + * @param ignoreDuplicated {@code True} if there is no need to check for a duplicated timeout object. + */ + @SuppressWarnings({"NakedNotify", "CallToNotifyInsteadOfNotifyAll"}) + public void addTimeoutObject(GridTimeoutObject timeoutObj, boolean ignoreDuplicated) { if (timeoutObj.endTime() <= 0 || timeoutObj.endTime() == Long.MAX_VALUE) // Timeout will never happen.
return; boolean added = timeoutObjs.add(timeoutObj); - assert added : "Duplicate timeout object found: " + timeoutObj; + assert ignoreDuplicated || added : "Duplicate timeout object found: " + timeoutObj; if (timeoutObjs.firstx() == timeoutObj) { synchronized (mux) { @@ -124,9 +132,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { /** * @param timeoutObj Timeout object. + * @return {@code True} if timeout object was removed. */ - public void removeTimeoutObject(GridTimeoutObject timeoutObj) { - timeoutObjs.remove(timeoutObj); + public boolean removeTimeoutObject(GridTimeoutObject timeoutObj) { + return timeoutObjs.remove(timeoutObj); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java new file mode 100644 index 0000000..d62bae2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class IgniteCacheThreadLocalTxTest extends GridCommonAbstractTest { + /** + * + */ + public IgniteCacheThreadLocalTxTest() { + super(true); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("unchecked") + public void testThreadLocalTx() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(TRANSACTIONAL); + + Ignite node = grid(); + + IgniteCache<Object, Object> cache = node.createCache(ccfg); + IgniteTransactions txs = node.transactions(); + + checkNoTx(node); + + boolean[] reads = {true, false}; + boolean[] writes = {true, false}; + int endOps = 5; + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (boolean read : reads) { + for (boolean write : writes) { + for (int i = 0; i < endOps; i++) + checkTx(concurrency, isolation, node, cache, read, write, i); + } + } + } + } + + assertNull(txs.tx()); + } + + /** + * @param concurrency Tx concurrency. + * @param isolation Tx isolation. + * @param node Node. + * @param cache Cache. + * @param read {@code True} if read in tx. + * @param write {@code True} if write in tx. + * @param endOp Operation to test. + */ + private void checkTx(TransactionConcurrency concurrency, + TransactionIsolation isolation, + Ignite node, + IgniteCache<Object, Object> cache, + boolean read, + boolean write, + int endOp) { + IgniteTransactions txs = node.transactions(); + + checkNoTx(node); + + Transaction tx = txs.txStart(concurrency, isolation); + + assertEquals(tx, txs.tx()); + + try { + txs.txStart(concurrency, isolation); + + fail(); + } + catch (IllegalStateException expected) { + // No-op. + } + + if (read) + cache.get(ThreadLocalRandom.current().nextInt(100_000)); + + if (write) + cache.put(ThreadLocalRandom.current().nextInt(100_000), 1); + + + try { + txs.txStart(concurrency, isolation); + + fail(); + } + catch (IllegalStateException expected) { + // No-op. 
+ } + + assertEquals(tx, txs.tx()); + + IgniteFuture fut = null; + + switch (endOp) { + case 0: + tx.commit(); + + break; + + case 1: + fut = tx.commitAsync(); + + break; + + case 2: + tx.rollback(); + + break; + + case 3: + fut = tx.rollbackAsync(); + + break; + + case 4: + tx.close(); + + break; + + default: + fail(); + } + + checkNoTx(node); + + if (fut != null) + fut.get(); + + checkNoTx(node); + } + + /** + * @param node Node. + */ + private void checkNoTx(Ignite node) { + IgniteTransactions txs = node.transactions(); + + assertNull(txs.tx()); + assertNull(((IgniteKernal)node).context().cache().context().tm().tx()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java index ae691f5..51e39a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java @@ -58,6 +58,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest /** Future timeout */ private static final int FUT_TIMEOUT = 5000; + /** */ private boolean client = false; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2468e009/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java index 77c383e..5123329 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java @@ -40,8 +40,8 @@ public class TxRollbackOnTimeoutNoDeadlockDetectionTest extends TxRollbackOnTime } /** */ - @Override protected void validateException(Exception e) { - assertEquals("Deadlock report is expected", + @Override protected void validateDeadlockException(Exception e) { + assertEquals("TimeoutException is expected", TransactionTimeoutException.class, e.getCause().getClass()); } } \ No newline at end of file