IGNITE-1537 Fixed near optimistic TX future to avoid early completion.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de08cd55 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de08cd55 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de08cd55 Branch: refs/heads/ignite-1282 Commit: de08cd554e75db0df06a5438da5012ebf6c7ad09 Parents: 895760e Author: sboikov <sboi...@gridgain.com> Authored: Fri Nov 27 08:40:16 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Nov 27 08:40:16 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 92 ++++---------------- .../near/GridNearOptimisticTxPrepareFuture.java | 57 ++++++------ ...ridNearOptimisticTxPrepareFutureAdapter.java | 70 +++++++++++++++ .../cache/local/GridLocalCacheEntry.java | 6 ++ .../transactions/IgniteTxLocalAdapter.java | 2 +- .../util/future/GridCompoundFuture.java | 2 +- 7 files changed, 123 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index c55bead..1d418ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -417,7 +417,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter U.error(log, "Failed to get result value for cache entry: " + cached, e); } catch (GridCacheEntryRemovedException e) { - assert false : "Got entry removed exception while holding transactional lock on entry: " + e; + assert false : "Got entry removed exception while holding transactional lock on entry [e=" + e + ", cached=" + cached + ']'; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 144070c..916c7ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -39,12 +39,10 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -76,7 +74,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** */ @GridToStringExclude - private KeyLockFuture keyLockFut = new KeyLockFuture(); + private KeyLockFuture keyLockFut; /** */ @GridToStringExclude @@ -134,7 +132,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } } - keyLockFut.onKeyLocked(entry.txKey()); + if (keyLockFut != null) + keyLockFut.onKeyLocked(entry.txKey()); return true; } @@ -189,7 +188,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim err.compareAndSet(null, e); - keyLockFut.onDone(e); + if (keyLockFut != null) + keyLockFut.onDone(e); } /** {@inheritDoc} */ @@ -210,7 +210,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (err != null) { this.err.compareAndSet(null, err); - keyLockFut.onDone(err); + if (keyLockFut != null) + keyLockFut.onDone(err); } return onComplete(); @@ -335,10 +336,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim for (IgniteTxEntry read : reads) map(read, topVer, mappings, remap, topLocked); - keyLockFut.onAllKeysAdded(); - - if (!remap) - add(keyLockFut); + if (keyLockFut != null) + keyLockFut.onAllKeysAdded(); if (isDone()) { if (log.isDebugEnabled()) @@ -535,8 +534,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) { - if (entry.explicitVersion() == null) + if (entry.explicitVersion() == null) { + if (keyLockFut == null) { + keyLockFut = new KeyLockFuture(); + + add(keyLockFut); + } + keyLockFut.addLockKey(entry.txKey()); + } } IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear()); @@ -854,68 +860,4 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); } } - - /** - * Keys lock future. - */ - private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { - /** */ - @GridToStringInclude - private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); - - /** */ - private volatile boolean allKeysAdded; - - /** - * @param key Key to track for locking. - */ - private void addLockKey(IgniteTxKey key) { - assert !allKeysAdded; - - lockKeys.add(key); - } - - /** - * @param key Locked keys. - */ - private void onKeyLocked(IgniteTxKey key) { - lockKeys.remove(key); - - checkLocks(); - } - - /** - * Moves future to the ready state. - */ - private void onAllKeysAdded() { - allKeysAdded = true; - - checkLocks(); - } - - /** - * @return {@code True} if all locks are owned. - */ - private boolean checkLocks() { - boolean locked = lockKeys.isEmpty(); - - if (locked && allKeysAdded) { - if (log.isDebugEnabled()) - log.debug("All locks are acquired for near prepare future: " + this); - - onDone((GridNearTxPrepareResponse)null); - } - else { - if (log.isDebugEnabled()) - log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); - } - - return locked; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(KeyLockFuture.class, this, super.toString()); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index e70e574..ca1d36c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -40,15 +40,15 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -65,8 +65,8 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; */ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { /** */ - @GridToStringInclude - private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + @GridToStringExclude + private KeyLockFuture keyLockFut; /** * @param cctx Context. @@ -84,10 +84,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa log.debug("Transaction future received owner changed callback: " + entry); if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { - lockKeys.remove(entry.txKey()); - - // This will check for locks. - onDone(); + if (keyLockFut != null) + keyLockFut.onKeyLocked(entry.txKey()); return true; } @@ -151,24 +149,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - /** - * @return {@code True} if all locks are owned. - */ - private boolean checkLocks() { - boolean locked = lockKeys.isEmpty(); - - if (locked) { - if (log.isDebugEnabled()) - log.debug("All locks are acquired for near prepare future: " + this); - } - else { - if (log.isDebugEnabled()) - log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); - } - - return locked; - } - /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { @@ -215,8 +195,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** {@inheritDoc} */ @Override public boolean onDone(IgniteInternalTx t, Throwable err) { - // If locks were not acquired yet, delay completion. - if (isDone() || (err == null && !checkLocks())) + if (isDone()) return false; this.err.compareAndSet(null, err); @@ -320,6 +299,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return; } + if (keyLockFut != null) + keyLockFut.onAllKeysAdded(); + tx.addSingleEntryMapping(mapping, write); cctx.mvcc().recheckPendingLocks(); @@ -385,6 +367,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return; } + if (keyLockFut != null) + keyLockFut.onAllKeysAdded(); + tx.addEntryMapping(mappings); cctx.mvcc().recheckPendingLocks(); @@ -543,8 +528,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); if (cacheCtx.isNear() || cacheCtx.isLocal()) { - if (entry.explicitVersion() == null) - lockKeys.add(entry.txKey()); + if (entry.explicitVersion() == null) { + if (keyLockFut == null) { + keyLockFut = new KeyLockFuture(); + + add(keyLockFut); + } + + keyLockFut.addLockKey(entry.txKey()); + } } if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { @@ -594,10 +586,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa ", loc=" + ((MiniFuture)f).node().isLocal() + ", done=" + f.isDone() + "]"; } + }, new P1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public boolean apply(IgniteInternalFuture<GridNearTxPrepareResponse> fut) { + return isMini(fut); + } }); return S.toString(GridNearOptimisticTxPrepareFuture.class, this, "innerFuts", futs, + "keyLockFut", keyLockFut, "tx", tx, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index 6b7244a..5c7553f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -17,14 +17,20 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** @@ -157,4 +163,68 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ protected abstract void prepare0(boolean remap, boolean topLocked); + + /** + * Keys lock future. + */ + protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { + /** */ + @GridToStringInclude + private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + + /** */ + private volatile boolean allKeysAdded; + + /** + * @param key Key to track for locking. + */ + protected void addLockKey(IgniteTxKey key) { + assert !allKeysAdded; + + lockKeys.add(key); + } + + /** + * @param key Locked keys. + */ + protected void onKeyLocked(IgniteTxKey key) { + lockKeys.remove(key); + + checkLocks(); + } + + /** + * Moves future to the ready state. + */ + protected void onAllKeysAdded() { + allKeysAdded = true; + + checkLocks(); + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { + boolean locked = lockKeys.isEmpty(); + + if (locked && allKeysAdded) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + + onDone((GridNearTxPrepareResponse)null); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(KeyLockFuture.class, this, super.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index 0ceae20..76bfc46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED; @@ -434,4 +435,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { @Override protected void offHeapPointer(long valPtr) { this.valPtr = valPtr; } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return S.toString(GridLocalCacheEntry.class, this, super.toString()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b3ff3a6..f13cff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -598,7 +598,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param entry Cache entry to check. */ private void checkCommitLocks(GridCacheEntryEx entry) { - assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry + + assert ownsLockUnsafe(entry) : "Lock is not owned for commit [entry=" + entry + ", tx=" + this + ']'; } http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index cc296e6..4b2461e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -423,7 +423,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { /** {@inheritDoc} */ @Override public String toString() { - return "Compound future listener: " + GridCompoundFuture.this; + return "Compound future listener []"; } } }