This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 1b3e1009e7b IGNITE-20177 GridCacheCompoundIdentityFuture's child-free descendants initial cleanup (#10883) 1b3e1009e7b is described below commit 1b3e1009e7b0b4c4f1a42bbf320c481a7d6bef1d Author: Anton Vinogradov <a...@apache.org> AuthorDate: Thu Aug 10 16:54:00 2023 +0300 IGNITE-20177 GridCacheCompoundIdentityFuture's child-free descendants initial cleanup (#10883) --- .../distributed/GridCacheTxRecoveryFuture.java | 69 +++---- .../cache/distributed/dht/GridDhtLockFuture.java | 223 +++++++-------------- .../dht/GridDhtTransactionalCacheAdapter.java | 2 - .../distributed/dht/GridDhtTxFinishFuture.java | 63 +++--- .../dht/colocated/GridDhtColocatedCache.java | 1 - .../dht/colocated/GridDhtColocatedLockFuture.java | 161 ++++++--------- .../cache/distributed/near/GridNearLockFuture.java | 114 +++++------ .../distributed/near/GridNearTxFinishFuture.java | 171 +++++++--------- 8 files changed, 301 insertions(+), 503 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 6e5906c1951..e75a1de09ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -33,10 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -93,7 +90,6 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B * @param failedNodeIds IDs of failed nodes started transaction. * @param txNodes Transaction mapping. */ - @SuppressWarnings("ConstantConditions") public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx, IgniteInternalTx tx, Set<UUID> failedNodeIds, @@ -143,14 +139,12 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B if (cctx.localNodeId().equals(nearNodeId)) { IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion()); - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - try { - onDone(fut.get()); - } - catch (IgniteCheckedException e) { - onDone(e); - } + fut.listen((IgniteInternalFuture<Boolean> fut0) -> { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + onDone(e); } }); } @@ -206,7 +200,7 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B boolean prepared; try { - prepared = fut == null ? true : fut.get(); + prepared = fut == null || fut.get(); } catch (IgniteCheckedException e) { U.error(log, "Check prepared transaction future failed: " + e, e); @@ -223,27 +217,25 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B } } else { - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - boolean prepared; + fut.listen((IgniteInternalFuture<Boolean> fut0) -> { + boolean prepared; - try { - prepared = fut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Check prepared transaction future failed: " + e, e); + try { + prepared = fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed: " + e, e); - prepared = false; - } + prepared = false; + } - if (!prepared) { - onDone(false); + if (!prepared) { + onDone(false); - markInitialized(); - } - else - proceedPrepare(); + markInitialized(); } + else + proceedPrepare(); }); return; @@ -464,13 +456,8 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B if (isMini(fut)) { final MiniFuture f = (MiniFuture)fut; - if (f.nodeId().equals(nodeId)) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - f.onNodeLeft(nodeId); - } - }); - } + if (f.nodeId().equals(nodeId)) + cctx.kernalContext().closure().runLocalSafe(() -> f.onNodeLeft(nodeId)); } } @@ -529,12 +516,8 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).nodeId + - ", done=" + f.isDone() + "]"; - } - }); + Collection<String> futs = F.viewReadOnly(futures(), + (IgniteInternalFuture<?> f) -> "[node=" + ((MiniFuture)f).nodeId + ", done=" + f.isDone() + "]"); return S.toString(GridCacheTxRecoveryFuture.class, this, "innerFuts", futs, @@ -549,7 +532,7 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node ID. */ - private UUID nodeId; + private final UUID nodeId; /** * @param nodeId Node ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index f7f76222a07..2859dc3e849 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheLockCandidates; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@ -45,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; @@ -60,7 +60,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; @@ -70,12 +69,10 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.NotNull; @@ -109,46 +106,45 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo /** Cache registry. */ @GridToStringExclude - private GridCacheContext<?, ?> cctx; + private final GridCacheContext<?, ?> cctx; /** Near node ID. */ - private UUID nearNodeId; + private final UUID nearNodeId; /** Near lock version. */ - private GridCacheVersion nearLockVer; + private final GridCacheVersion nearLockVer; /** Topology version. */ - private AffinityTopologyVersion topVer; + private final AffinityTopologyVersion topVer; /** Thread. */ - private long threadId; + private final long threadId; /** * Keys locked so far. - * + * <p> * Thread created this object iterates over entries and tries to lock each of them. * If it finds some entry already locked by another thread it registers callback which will be executed * by the thread owning the lock. - * + * <p> * Thus access to this collection must be synchronized except cases * when this object is yet local to the thread created it. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @GridToStringExclude - private List<GridDhtCacheEntry> entries; + private final List<GridDhtCacheEntry> entries; /** DHT mappings. */ - private Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap = + private final Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap = new ConcurrentHashMap<>(); /** Future ID. */ - private IgniteUuid futId; + private final IgniteUuid futId; /** Lock version. */ private GridCacheVersion lockVer; /** Read flag. */ - private boolean read; + private final boolean read; /** Error. */ private Throwable err; @@ -163,11 +159,8 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo /** Lock timeout. */ private final long timeout; - /** Filter. */ - private CacheEntryPredicate[] filter; - /** Transaction. */ - private GridDhtTxLocalAdapter tx; + private final GridDhtTxLocalAdapter tx; /** All replies flag. */ private boolean mapped; @@ -182,13 +175,13 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo private final Collection<KeyCacheObject> pendingLocks; /** TTL for create operation. */ - private long createTtl; + private final long createTtl; /** TTL for read operation. */ - private long accessTtl; + private final long accessTtl; /** Need return value flag. */ - private boolean needReturnVal; + private final boolean needReturnVal; /** Skip store flag. */ private final boolean skipStore; @@ -208,7 +201,6 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo * @param tx Transaction. * @param threadId Thread ID. * @param accessTtl TTL for read operation. - * @param filter Filter. * @param skipStore Skip store flag. */ public GridDhtLockFuture( @@ -224,7 +216,6 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo long threadId, long createTtl, long accessTtl, - CacheEntryPredicate[] filter, boolean skipStore, boolean keepBinary) { super(CU.boolReducer()); @@ -232,7 +223,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo assert nearNodeId != null; assert nearLockVer != null; assert topVer.topologyVersion() > 0; - assert (tx != null && timeout >= 0) || tx == null; + assert tx == null || timeout >= 0; this.cctx = cctx; this.nearNodeId = nearNodeId; @@ -241,7 +232,6 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo this.read = read; this.needReturnVal = needReturnVal; this.timeout = timeout; - this.filter = filter; this.tx = tx; this.createTtl = createTtl; this.accessTtl = accessTtl; @@ -276,7 +266,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo if (tx != null) { while (true) { - IgniteInternalFuture fut = tx.lockFut; + IgniteInternalFuture<?> fut = tx.lockFut; if (fut != null) { if (fut == GridDhtTxLocalAdapter.ROLLBACK_FUT) @@ -286,14 +276,12 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo assert fut instanceof GridDhtColocatedLockFuture : fut; // Terminate this future if parent(collocated) future is terminated by rollback. - fut.listen(new IgniteInClosure<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); - } - catch (IgniteCheckedException e) { - onError(e); - } + fut.listen((IgniteInternalFuture<?> fut0) -> { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + onError(e); } }); } @@ -309,7 +297,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo /** {@inheritDoc} */ @Override public Collection<Integer> invalidPartitions() { - return invalidParts == null ? Collections.<Integer>emptyList() : invalidParts; + return invalidParts == null ? Collections.emptyList() : invalidParts; } /** @@ -447,7 +435,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo if (log.isDebugEnabled()) log.debug("Failed to acquire lock with negative timeout: " + entry); - onFailed(false); + onFailed(); return null; } @@ -480,11 +468,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo if (dist && tx == null) { cctx.dhtTx().removeLocks(nearNodeId, lockVer, F.viewReadOnly(entriesCp, - new C1<GridDhtCacheEntry, KeyCacheObject>() { - @Override public KeyCacheObject apply(GridDhtCacheEntry e) { - return e.key(); - } - }), false); + (C1<GridDhtCacheEntry, KeyCacheObject>)GridCacheMapEntry::key), false); } else { if (tx != null) { @@ -523,10 +507,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo /** * - * @param dist {@code True} if need to distribute lock release. */ - private void onFailed(boolean dist) { - undoLocks(dist); + private void onFailed() { + undoLocks(false); onComplete(false, false, true); } @@ -629,7 +612,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo if (timeout < 0) { if (owners == null || !owners.hasCandidate(lockVer)) { // We did not send any requests yet. - onFailed(false); + onFailed(); return; } @@ -669,30 +652,6 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo onComplete(false, false, true); } - /** - * @param cached Entry to check. - * @return {@code True} if filter passed. - */ - private boolean filter(GridCacheEntryEx cached) { - try { - if (!cctx.isAll(cached, filter)) { - if (log.isDebugEnabled()) - log.debug("Filter didn't pass for entry (will fail lock): " + cached); - - onFailed(true); - - return false; - } - - return true; - } - catch (IgniteCheckedException e) { - onError(e); - - return false; - } - } - /** * Callback for whenever entry lock ownership changes. * @@ -890,7 +849,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo // Possible in case of lock cancellation. if (cand == null) { - onFailed(false); + onFailed(); // Will mark initialized in finally block. return; @@ -969,7 +928,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo for (ListIterator<GridDhtCacheEntry> it = dhtMapping.listIterator(); it.hasNext(); ) { GridDhtCacheEntry e = it.next(); - boolean needVal = false; + boolean needVal; try { // Must unswap entry so that isNewLocked returns correct value. @@ -1069,12 +1028,10 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - MiniFuture m = (MiniFuture)f; + Collection<String> futs = F.viewReadOnly(futures(), (IgniteInternalFuture<?> f) -> { + MiniFuture m = (MiniFuture)f; - return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; - } + return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; }); Collection<KeyCacheObject> locks; @@ -1120,46 +1077,44 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo cctx.store().loadAll( null, loadMap.keySet(), - new CI2<KeyCacheObject, Object>() { - @Override public void apply(KeyCacheObject key, Object val) { - // No value loaded from store. - if (val == null) - return; - - GridDhtCacheEntry entry0 = loadMap.get(key); + (KeyCacheObject key, Object val) -> { + // No value loaded from store. + if (val == null) + return; - try { - CacheObject val0 = cctx.toCacheObject(val); + GridDhtCacheEntry entry0 = loadMap.get(key); - long ttl = createTtl; - long expireTime; + try { + CacheObject val0 = cctx.toCacheObject(val); - if (ttl == CU.TTL_ZERO) - expireTime = CU.expireTimeInPast(); - else { - if (ttl == CU.TTL_NOT_CHANGED) - ttl = CU.TTL_ETERNAL; + long ttl = createTtl; + long expireTime; - expireTime = CU.toExpireTime(ttl); - } + if (ttl == CU.TTL_ZERO) + expireTime = CU.expireTimeInPast(); + else { + if (ttl == CU.TTL_NOT_CHANGED) + ttl = CU.TTL_ETERNAL; - entry0.initialValue(val0, - ver, - ttl, - expireTime, - false, - topVer, - GridDrType.DR_LOAD, - true, - false); - } - catch (GridCacheEntryRemovedException e) { - assert false : "Should not get removed exception while holding lock on entry " + - "[entry=" + entry0 + ", e=" + e + ']'; - } - catch (IgniteCheckedException e) { - onDone(e); + expireTime = CU.toExpireTime(ttl); } + + entry0.initialValue(val0, + ver, + ttl, + expireTime, + false, + topVer, + GridDrType.DR_LOAD, + true, + false); + } + catch (GridCacheEntryRemovedException e) { + assert false : "Should not get removed exception while holding lock on entry " + + "[entry=" + entry0 + ", e=" + e + ']'; + } + catch (IgniteCheckedException e) { + onDone(e); } }); } @@ -1283,11 +1238,11 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo /** Node. */ @GridToStringExclude - private ClusterNode node; + private final ClusterNode node; /** DHT mapping. */ @GridToStringInclude - private List<GridDhtCacheEntry> dhtMapping; + private final List<GridDhtCacheEntry> dhtMapping; /** * @param node Node. @@ -1384,7 +1339,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo cache0 = ((GridNearCacheAdapter)cache0).dht(); synchronized (GridDhtLockFuture.this) { // Prevents entry re-creation on concurrent rollback. - if (GridDhtLockFuture.this.checkDone()) + if (checkDone()) return; for (GridCacheEntryInfo info : res.preloadEntries()) { @@ -1434,44 +1389,6 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo } } - /** - * @param cacheCtx Context. - * @param keys Keys to evict readers for. - * @param nodeId Node ID. - * @param msgId Message ID. - * @param entries Entries to check. - */ - private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, - @Nullable List<GridDhtCacheEntry> entries) { - if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) - return; - - for (ListIterator<GridDhtCacheEntry> it = entries.listIterator(); it.hasNext(); ) { - GridDhtCacheEntry cached = it.next(); - - if (keys.contains(cached.txKey())) { - while (true) { - try { - cached.removeReader(nodeId, msgId); - - if (tx != null) - tx.removeNearMapping(nodeId, cached); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - GridDhtCacheEntry e = cacheCtx.dht().peekExx(cached.key()); - - if (e == null) - break; - - it.set(e); - } - } - } - } - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this, "nodeId", node.id(), "super", super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 2fb2a0f4d1a..25d16805b67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -934,7 +934,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach tx.threadId(), createTtl, accessTtl, - filter, skipStore, keepBinary); @@ -1125,7 +1124,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.threadId(), req.createTtl(), req.accessTtl(), - filter, req.skipStore(), req.keepBinary()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index a78653d9b7b..e53eacc2b74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture; @@ -45,7 +44,6 @@ import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; @@ -63,7 +61,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING; * */ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx> - implements GridCacheFuture<IgniteInternalTx>, IgniteDiagnosticAware { + implements IgniteDiagnosticAware { /** */ private static final long serialVersionUID = 0L; @@ -84,27 +82,27 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity private static IgniteLogger msgLog; /** Context. */ - private GridCacheSharedContext<K, V> cctx; + private final GridCacheSharedContext<K, V> cctx; /** Future ID. */ private final IgniteUuid futId; /** Transaction. */ @GridToStringExclude - private GridDhtTxLocalAdapter tx; + private final GridDhtTxLocalAdapter tx; /** Commit flag. */ - private boolean commit; + private final boolean commit; /** Error. */ @GridToStringExclude private volatile Throwable err; /** DHT mappings. */ - private Map<UUID, GridDistributedTxMapping> dhtMap; + private final Map<UUID, GridDistributedTxMapping> dhtMap; /** Near mappings. */ - private Map<UUID, GridDistributedTxMapping> nearMap; + private final Map<UUID, GridDistributedTxMapping> nearMap; /** * @param cctx Context. @@ -112,7 +110,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity * @param commit Commit flag. */ public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) { - super(F.<IgniteInternalTx>identityReducer(tx)); + super(F.identityReducer(tx)); this.cctx = cctx; this.tx = tx; @@ -316,7 +314,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity * * @param commit Commit flag. */ - @SuppressWarnings({"SimplifiableIfStatement"}) public void finish(boolean commit) { try (MTC.TraceSurroundings ignored = MTC.supportContinual(span = cctx.kernalContext().tracing().create(TX_DHT_FINISH, MTC.span()))) { @@ -409,7 +406,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft((ClusterTopologyCheckedException)e); + fut.onNodeLeft(); else { if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, failed to send request lock tx [txId=" + tx.nearXidVersion() + @@ -505,8 +502,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (isNull(cctx.discovery().getAlive(n.id()))) { log.error("Unable to send message (node left topology): " + n); - fut.onNodeLeft(new ClusterTopologyCheckedException("Node left grid while sending message to: " - + n.id())); + fut.onNodeLeft(); } else { cctx.tm().sendTransactionMessage(n, req, tx, tx.ioPolicy()); @@ -526,7 +522,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft((ClusterTopologyCheckedException)e); + fut.onNodeLeft(); else { if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() + @@ -596,7 +592,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft((ClusterTopologyCheckedException)e); + fut.onNodeLeft(); else { if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() + @@ -619,7 +615,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (!isDone()) { for (IgniteInternalFuture fut : futures()) { if (!fut.isDone()) { - if (MiniFuture.class.isInstance(fut)) { + if (fut instanceof GridDhtTxFinishFuture.MiniFuture) { MiniFuture f = (MiniFuture)fut; if (!f.node().isLocal()) { @@ -656,23 +652,21 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - if (f.getClass() == MiniFuture.class) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; - } - else if (f instanceof MvccFuture) { - MvccFuture crdFut = (MvccFuture)f; + Collection<String> futs = F.viewReadOnly(futures(), (IgniteInternalFuture<?> f) -> { + if (f.getClass() == MiniFuture.class) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + else if (f instanceof MvccFuture) { + MvccFuture crdFut = (MvccFuture)f; - return "[mvccCrdNode=" + crdFut.coordinatorNodeId() + - ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) + - ", done=" + f.isDone() + "]"; - } - else - return f.toString(); + return "[mvccCrdNode=" + crdFut.coordinatorNodeId() + + ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) + + ", done=" + f.isDone() + "]"; } + else + return f.toString(); }); return S.toString(GridDhtTxFinishFuture.class, this, @@ -748,13 +742,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity onDone(e); } - /** - * @param e Node failure. - */ - void onNodeLeft(ClusterTopologyCheckedException e) { - onNodeLeft(); - } - /** */ void onNodeLeft() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 4da88bef33a..cf301cdf1d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -1111,7 +1111,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte threadId, createTtl, accessTtl, - filter, skipStore, keepBinary); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 37d55334828..cd9f3b9714c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -73,14 +73,11 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; @@ -125,7 +122,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** Keys to lock. */ @GridToStringInclude - private Collection<KeyCacheObject> keys; + private final Collection<KeyCacheObject> keys; /** Future ID. */ private final IgniteUuid futId; @@ -357,8 +354,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF lockVer, true, txEntry.locked(), - inTx(), - inTx() && tx.implicitSingle(), + true, + tx.implicitSingle(), false, false, null, @@ -377,8 +374,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF lockVer, true, false, - inTx(), - inTx() && tx.implicitSingle(), + false, + false, false, false, null, @@ -419,13 +416,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF cctx.mvcc().recheckPendingLocks(); } - /** - * @param success Success flag. - */ - public void complete(boolean success) { - onComplete(success, true); - } - /** * @param nodeId Left node ID * @return {@code True} if node was in the list. @@ -728,21 +718,19 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - if (isMini(f)) { - MiniFuture m = (MiniFuture)f; - - synchronized (m) { - return "[node=" + m.node().id() + - ", rcvRes=" + m.rcvRes + - ", loc=" + m.node().isLocal() + - ", done=" + f.isDone() + "]"; - } + Collection<String> futs = F.viewReadOnly(futures(), (IgniteInternalFuture<?> f) -> { + if (isMini(f)) { + MiniFuture m = (MiniFuture)f; + + synchronized (m) { + return "[node=" + m.node().id() + + ", rcvRes=" + m.rcvRes + + ", loc=" + m.node().isLocal() + + ", done=" + f.isDone() + "]"; } - else - return "[loc=true, done=" + f.isDone() + "]"; } + else + return "[loc=true, done=" + f.isDone() + "]"; }); return S.toString(GridDhtColocatedLockFuture.class, this, @@ -1213,7 +1201,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF // Fail fast if the transaction is timed out. if (tx != null && tx.remainingTime() == -1) { - GridDhtColocatedLockFuture.this.onDone(false, tx.timeoutException()); + onDone(false, tx.timeoutException()); clear(); @@ -1290,50 +1278,47 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF // Add new future. add(new GridEmbeddedFuture<>( - new C2<Exception, Exception, Boolean>() { - @Override public Boolean apply(Exception resEx, Exception e) { - if (CU.isLockTimeoutOrCancelled(e) || - (resEx != null && CU.isLockTimeoutOrCancelled(resEx))) - return false; + (Exception resEx, Exception e) -> { + if (CU.isLockTimeoutOrCancelled(e) || (CU.isLockTimeoutOrCancelled(resEx))) + return false; - if (e != null) { - onError(e); + if (e != null) { + onError(e); - return false; - } - - if (resEx != null) { - onError(resEx); + return false; + } - return false; - } + if (resEx != null) { + onError(resEx); - if (log.isDebugEnabled()) - log.debug("Acquired lock for local DHT mapping [locId=" + cctx.nodeId() + - ", mappedKeys=" + keys + ", fut=" + GridDhtColocatedLockFuture.this + ']'); + return false; + } - if (inTx()) { - for (KeyCacheObject key : keys) - tx.entry(cctx.txKey(key)).markLocked(); - } - else { - for (KeyCacheObject key : keys) - cctx.mvcc().markExplicitOwner(cctx.txKey(key), threadId); - } + if (log.isDebugEnabled()) + log.debug("Acquired lock for local DHT mapping [locId=" + cctx.nodeId() + + ", mappedKeys=" + keys + ", fut=" + this + ']'); - try { - // Proceed and add new future (if any) before completing embedded future. - if (mappings != null) - proceedMapping(); - } - catch (IgniteCheckedException ex) { - onError(ex); + if (inTx()) { + for (KeyCacheObject key : keys) + tx.entry(cctx.txKey(key)).markLocked(); + } + else { + for (KeyCacheObject key : keys) + cctx.mvcc().markExplicitOwner(cctx.txKey(key), threadId); + } - return false; - } + try { + // Proceed and add new future (if any) before completing embedded future. + if (mappings != null) + proceedMapping(); + } + catch (IgniteCheckedException ex) { + onError(ex); - return true; + return false; } + + return true; }, fut)); } @@ -1534,25 +1519,23 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys); - fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() { - @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) { - try { - TxDeadlock deadlock = fut.get(); + fut.listen((IgniteInternalFuture<TxDeadlock> fut0) -> { + try { + TxDeadlock deadlock = fut.get(); - err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " + - "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']', - deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) : - null); - } - catch (IgniteCheckedException e) { - err = e; + err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " + + "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']', + deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) : + null); + } + catch (IgniteCheckedException e) { + err = e; - U.warn(log, "Failed to detect deadlock.", e); - } + U.warn(log, "Failed to detect deadlock.", e); + } - synchronized (LockTimeoutObject.this) { - onComplete(false, true); - } + synchronized (this) { + onComplete(false, true); } }); } @@ -1596,9 +1579,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** */ private boolean rcvRes; - /** Remap topology version for debug purpose. */ - private AffinityTopologyVersion remapTopVer; - /** * @param node Node. * @param keys Keys. @@ -1628,13 +1608,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF return node; } - /** - * @return Keys. - */ - public Collection<KeyCacheObject> keys() { - return keys; - } - /** * @param e Node left exception. */ @@ -1673,8 +1646,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF return; rcvRes = true; - - remapTopVer = res.clientRemapVersion(); } if (res.error() != null) { @@ -1815,11 +1786,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys) cctx.mvcc().removeExplicitLock(threadId, cctx.txKey(key), lockVer); - mapOnTopology(true, new Runnable() { - @Override public void run() { - onDone(true); - } - }); + mapOnTopology(true, () -> onDone(true)); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index f6c8916c78b..a75c062427d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -63,15 +63,12 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; @@ -104,7 +101,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo /** Lock owner thread. */ @GridToStringInclude - private long threadId; + private final long threadId; /** Keys to lock. */ @GridToStringInclude @@ -117,7 +114,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo private final GridCacheVersion lockVer; /** Read flag. */ - private boolean read; + private final boolean read; /** Flag to return value. */ private final boolean retval; @@ -153,13 +150,13 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo /** Keys locked so far. */ @GridToStringExclude - private List<GridDistributedCacheEntry> entries; + private final List<GridDistributedCacheEntry> entries; /** TTL for create operation. */ - private long createTtl; + private final long createTtl; /** TTL for read operation. */ - private long accessTtl; + private final long accessTtl; /** Skip store flag. */ private final boolean skipStore; @@ -208,7 +205,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo super(CU.boolReducer()); assert keys != null; - assert (tx != null && timeout >= 0) || tx == null; + assert tx == null || timeout >= 0; this.cctx = cctx; this.keys = keys; @@ -783,16 +780,14 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - if (isMini(f)) { - MiniFuture m = (MiniFuture)f; + Collection<String> futs = F.viewReadOnly(futures(), (IgniteInternalFuture<?> f) -> { + if (isMini(f)) { + MiniFuture m = (MiniFuture)f; - return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; - } - else - return "[loc=true, done=" + f.isDone() + "]"; + return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; } + else + return "[loc=true, done=" + f.isDone() + "]"; }); return S.toString(GridNearLockFuture.class, this, @@ -915,19 +910,17 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo markInitialized(); } else { - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - try { - fut.get(); + fut.listen((IgniteInternalFuture<AffinityTopologyVersion> fut0) -> { + try { + fut.get(); - mapOnTopology(remap); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.shared().txContextReset(); - } + mapOnTopology(remap); + } + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.shared().txContextReset(); } }); } @@ -1473,25 +1466,23 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys); - fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() { - @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) { - try { - TxDeadlock deadlock = fut.get(); + fut.listen((IgniteInternalFuture<TxDeadlock> fut0) -> { + try { + TxDeadlock deadlock = fut.get(); - err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " + - "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']', - deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) : - null); - } - catch (IgniteCheckedException e) { - err = e; + err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " + + "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']', + deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) : + null); + } + catch (IgniteCheckedException e) { + err = e; - U.warn(log, "Failed to detect deadlock.", e); - } + U.warn(log, "Failed to detect deadlock.", e); + } - synchronized (LockTimeoutObject.this) { - onComplete(false, true); - } + synchronized (LockTimeoutObject.this) { + onComplete(false, true); } }); } @@ -1526,11 +1517,11 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo /** Node ID. */ @GridToStringExclude - private ClusterNode node; + private final ClusterNode node; /** Keys. */ @GridToStringInclude(sensitive = true) - private Collection<KeyCacheObject> keys; + private final Collection<KeyCacheObject> keys; /** */ private boolean rcvRes; @@ -1564,13 +1555,6 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo return node; } - /** - * @return Keys. - */ - public Collection<KeyCacheObject> keys() { - return keys; - } - /** * @param e Node left exception. */ @@ -1651,19 +1635,17 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (!affFut.isDone()) { // TODO FIXME https://ggsystems.atlassian.net/browse/GG-23288 - affFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - try { - fut.get(); + affFut.listen((IgniteInternalFuture<?> fut) -> { + try { + fut.get(); - remap(); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.shared().txContextReset(); - } + remap(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.shared().txContextReset(); } }); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index c5dd5efdb66..79340a08b07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -49,13 +48,10 @@ import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionRollbackException; @@ -74,8 +70,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** * */ -public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx> - implements GridCacheFuture<IgniteInternalTx>, NearTxFinishFuture { +public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx> implements NearTxFinishFuture { /** */ private static final long serialVersionUID = 0L; @@ -93,23 +88,23 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit private static IgniteLogger log; /** Logger. */ - protected static IgniteLogger msgLog; + private static IgniteLogger msgLog; /** Context. */ - private GridCacheSharedContext<K, V> cctx; + private final GridCacheSharedContext<K, V> cctx; /** Future ID. */ private final IgniteUuid futId; /** Transaction. */ @GridToStringInclude - private GridNearTxLocal tx; + private final GridNearTxLocal tx; /** Commit flag. This flag used only for one-phase commit transaction. */ - private boolean commit; + private final boolean commit; /** Node mappings. */ - private IgniteTxMappings mappings; + private final IgniteTxMappings mappings; /** Trackable flag. */ private boolean trackable = true; @@ -123,7 +118,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit * @param commit Commit flag. */ public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal tx, boolean commit) { - super(F.<IgniteInternalTx>identityReducer(tx)); + super(F.identityReducer(tx)); this.cctx = cctx; this.tx = tx; @@ -149,13 +144,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit return commit; } - /** - * @return Cache context. - */ - GridCacheSharedContext<K, V> context() { - return cctx; - } - /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futId; @@ -281,7 +269,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut; if (f.futureId() == res.miniId()) - f.onDhtFinishResponse(nodeId, false); + f.onDhtFinishResponse(nodeId); } } @@ -306,7 +294,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit * */ void forceFinish() { - super.onDone(tx, null, false); + onDone(tx, null, false); } /** {@inheritDoc} */ @@ -443,16 +431,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit } } - curFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - try { - fut.get(); + curFut.listen((IgniteInternalFuture<?> fut) -> { + try { + fut.get(); - rollbackAsyncSafe(onTimeout); - } - catch (IgniteCheckedException e) { - doFinish(false, false); - } + rollbackAsyncSafe(onTimeout); + } + catch (IgniteCheckedException e) { + doFinish(false, false); } }); } @@ -477,13 +463,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit if (mapping != null) { assert !hasFutures() || isDone() : futures(); - finish(1, mapping, commit, !clearThreadMap); + finish(1, mapping, commit); } } else { assert !hasFutures() || isDone() : futures(); - finish(mappings.mappings(), commit, !clearThreadMap); + finish(mappings.mappings(), commit); } } @@ -539,13 +525,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit ClusterNode backup = cctx.discovery().node(backupId); // Nothing to do if backup has left the grid. - if (backup == null) { - // No-op. + if (backup != null) { + if (backup.isLocal()) + cctx.tm().removeTxReturn(tx.xidVersion()); + else + cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion()); } - else if (backup.isLocal()) - cctx.tm().removeTxReturn(tx.xidVersion()); - else - cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion()); } } } @@ -614,11 +599,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer); - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - mini.onDone(tx); - } - }); + fut.listen((IgniteInternalFuture<?> fut0) -> mini.onDone(tx)); return; } @@ -733,32 +714,30 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit mapping.dhtVersion(xidVer, xidVer); tx.readyNearLocks(mapping, - Collections.<GridCacheVersion>emptyList(), - Collections.<GridCacheVersion>emptyList(), - Collections.<GridCacheVersion>emptyList()); + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); } } /** * @param mappings Mappings. * @param commit Commit flag. - * @param useCompletedVer {@code True} if need to add completed version on finish. */ - private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit, boolean useCompletedVer) { + private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit) { int miniId = 0; // Create mini futures. for (GridDistributedTxMapping m : mappings) - finish(++miniId, m, commit, useCompletedVer); + finish(++miniId, m, commit); } /** * @param miniId Mini future ID. * @param m Mapping. * @param commit Commit flag. - * @param useCompletedVer {@code True} if need to add completed version on finish. */ - private void finish(int miniId, GridDistributedTxMapping m, boolean commit, boolean useCompletedVer) { + private void finish(int miniId, GridDistributedTxMapping m, boolean commit) { ClusterNode n = m.primary(); assert !m.empty() || m.queryUpdate() : m + " " + tx.state(); @@ -843,47 +822,45 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - if (f.getClass() == FinishMiniFuture.class) { - FinishMiniFuture fut = (FinishMiniFuture)f; + Collection<String> futs = F.viewReadOnly(futures(), (IgniteInternalFuture<?> f) -> { + if (f.getClass() == FinishMiniFuture.class) { + FinishMiniFuture fut = (FinishMiniFuture)f; - ClusterNode node = fut.primary(); + ClusterNode node = fut.primary(); - if (node != null) { - return "FinishFuture[node=" + node.id() + - ", loc=" + node.isLocal() + - ", done=" + fut.isDone() + ']'; - } - else - return "FinishFuture[node=null, done=" + fut.isDone() + ']'; + if (node != null) { + return "FinishFuture[node=" + node.id() + + ", loc=" + node.isLocal() + + ", done=" + fut.isDone() + ']'; } - else if (f.getClass() == CheckBackupMiniFuture.class) { - CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f; + else + return "FinishFuture[node=null, done=" + fut.isDone() + ']'; + } + else if (f.getClass() == CheckBackupMiniFuture.class) { + CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f; - ClusterNode node = fut.node(); + ClusterNode node = fut.node(); - if (node != null) { - return "CheckBackupFuture[node=" + node.id() + - ", loc=" + node.isLocal() + - ", done=" + f.isDone() + "]"; - } - else - return "CheckBackupFuture[node=null, done=" + f.isDone() + "]"; + if (node != null) { + return "CheckBackupFuture[node=" + node.id() + + ", loc=" + node.isLocal() + + ", done=" + f.isDone() + "]"; } - else if (f.getClass() == CheckRemoteTxMiniFuture.class) { - CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f; + else + return "CheckBackupFuture[node=null, done=" + f.isDone() + "]"; + } + else if (f.getClass() == CheckRemoteTxMiniFuture.class) { + CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f; - return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]"; - } - else if (f instanceof MvccFuture) { - MvccFuture fut = (MvccFuture)f; + return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]"; + } + else if (f instanceof MvccFuture) { + MvccFuture fut = (MvccFuture)f; - return "WaitPreviousTxsFut[mvccCrd=" + fut.coordinatorNodeId() + ", done=" + f.isDone() + "]"; - } - else - return "[loc=true, done=" + f.isDone() + "]"; + return "WaitPreviousTxsFut[mvccCrd=" + fut.coordinatorNodeId() + ", done=" + f.isDone() + "]"; } + else + return "[loc=true, done=" + f.isDone() + "]"; }); return S.toString(GridNearTxFinishFuture.class, this, @@ -932,7 +909,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit /** * */ - private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> { + private abstract static class MinFuture extends GridFutureAdapter<IgniteInternalTx> { /** */ private final int futId; @@ -964,7 +941,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit private class FinishMiniFuture extends MinFuture { /** Keys. */ @GridToStringInclude - private GridDistributedTxMapping m; + private final GridDistributedTxMapping m; /** * @param futId Future ID. @@ -983,13 +960,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit return m.primary(); } - /** - * @return Keys. - */ - public GridDistributedTxMapping mapping() { - return m; - } - /** {@inheritDoc} */ @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) { if (tx.state() == COMMITTING || tx.state() == COMMITTED) { @@ -1037,11 +1007,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit if (backup.isLocal()) { IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion()); - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - mini.onDhtFinishResponse(cctx.localNodeId(), true); - } - }); + fut.listen((IgniteInternalFuture<?> fut0) -> mini.onDhtFinishResponse(cctx.localNodeId())); } else { try { @@ -1056,7 +1022,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit } } else - mini.onDhtFinishResponse(backupId, true); + mini.onDhtFinishResponse(backupId); } } } @@ -1100,10 +1066,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit private class CheckBackupMiniFuture extends MinFuture { /** Keys. */ @GridToStringInclude - private GridDistributedTxMapping m; + private final GridDistributedTxMapping m; /** Backup node to check. */ - private ClusterNode backup; + private final ClusterNode backup; /** * @param futId Future ID. @@ -1167,7 +1133,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit */ private class CheckRemoteTxMiniFuture extends MinFuture { /** */ - private Set<UUID> nodes; + private final Set<UUID> nodes; /** * @param futId Future ID. @@ -1195,9 +1161,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit /** * @param nodeId Node ID. - * @param discoThread {@code True} if executed from discovery thread. */ - void onDhtFinishResponse(UUID nodeId, boolean discoThread) { + void onDhtFinishResponse(UUID nodeId) { onResponse(nodeId); }