This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new f4654f98607 IGNITE-20217 GridCacheFutureAdapter and descendants initial cleanup (#10896) f4654f98607 is described below commit f4654f98607648252c5164c0fd1a81eff279ca26 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Thu Aug 17 15:16:23 2023 +0300 IGNITE-20217 GridCacheFutureAdapter and descendants initial cleanup (#10896) --- .../processors/cache/GridCacheFutureAdapter.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 24 ++-------- .../dht/GridDhtTxAbstractEnlistFuture.java | 48 +++++++------------ .../distributed/dht/GridDhtTxEnlistFuture.java | 14 ++---- .../dht/GridDhtTxQueryAbstractEnlistFuture.java | 3 -- .../dht/GridDhtTxQueryEnlistFuture.java | 4 -- .../dht/GridDhtTxQueryResultsEnlistFuture.java | 10 ++-- .../dht/GridPartitionedSingleGetFuture.java | 22 ++++----- .../atomic/GridDhtAtomicAbstractUpdateFuture.java | 56 ++++------------------ .../atomic/GridNearAtomicAbstractUpdateFuture.java | 18 ++++--- .../atomic/GridNearAtomicSingleUpdateFuture.java | 33 ++----------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 45 +++++------------ .../cache/distributed/near/GridNearLockFuture.java | 2 +- .../distributed/near/GridNearTxEnlistFuture.java | 1 - .../near/GridNearTxQueryEnlistFuture.java | 1 - .../near/GridNearTxQueryResultsEnlistFuture.java | 1 - 16 files changed, 75 insertions(+), 209 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java index 13030f971e1..c3e470e4cdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java @@ -34,7 +34,7 @@ public abstract class GridCacheFutureAdapter<R> extends GridFutureAdapter<R> imp /** * Default constructor. */ - public GridCacheFutureAdapter() { + protected GridCacheFutureAdapter() { // No-op. } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 25d16805b67..86cf0850114 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -709,7 +709,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach nodeId, req.version(), req.mvccSnapshot(), - req.threadId(), req.futureId(), req.miniId(), tx, @@ -768,14 +767,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (waitForExchangeFuture(nearNode, req)) return; - f = lockAllAsync(ctx, nearNode, req, null); + f = lockAllAsync(ctx, nearNode, req); if (f != null) break; } } else - f = lockAllAsync(ctx, nearNode, req, null); + f = lockAllAsync(ctx, nearNode, req); // Register listener just so we print out errors. // Exclude lock timeout and rollback exceptions since it's not a fatal exception. @@ -839,7 +838,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; - GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>versionedFuture(res.version(), res.futureId()); + GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId()); if (fut == null) { if (txLockMsgLog.isDebugEnabled()) @@ -988,18 +987,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param cacheCtx Cache context. * @param nearNode Near node. * @param req Request. - * @param filter0 Filter. * @return Future. */ public IgniteInternalFuture<GridNearLockResponse> lockAllAsync( final GridCacheContext<?, ?> cacheCtx, final ClusterNode nearNode, - final GridNearLockRequest req, - @Nullable final CacheEntryPredicate[] filter0) { + final GridNearLockRequest req) { final List<KeyCacheObject> keys = req.keys(); - CacheEntryPredicate[] filter = filter0; - // Set message into thread context. GridDhtTxLocal tx = null; @@ -1015,10 +1010,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach final List<GridCacheEntryEx> entries = new ArrayList<>(cnt); - // Unmarshal filter first. - if (filter == null) - filter = req.filter(); - GridDhtLockFuture fut = null; GridDhtPartitionTopology top = null; @@ -1704,10 +1695,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach Map<ClusterNode, List<KeyCacheObject>> map) { if (nodes != null) { for (ClusterNode n : nodes) { - List<KeyCacheObject> keys = map.get(n); - - if (keys == null) - map.put(n, keys = new LinkedList<>()); + List<KeyCacheObject> keys = map.computeIfAbsent(n, k -> new LinkedList<>()); keys.add(entry.key()); } @@ -1958,7 +1946,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach nodeId, req.version(), req.mvccSnapshot(), - req.threadId(), req.futureId(), req.miniId(), tx, @@ -2021,7 +2008,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach nodeId, req.version(), req.mvccSnapshot(), - req.threadId(), req.futureId(), req.miniId(), tx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index 073778b721b..432f2f31cd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -64,12 +64,9 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.NotNull; @@ -111,9 +108,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd @GridToStringExclude protected final IgniteLogger log; - /** Thread. */ - protected final long threadId; - /** Future ID. */ protected final IgniteUuid nearFutId; @@ -130,7 +124,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd protected final MvccSnapshot mvccSnapshot; /** New DHT nodes. */ - protected Set<UUID> newDhtNodes = new HashSet<>(); + protected final Set<UUID> newDhtNodes = new HashSet<>(); /** Near node ID. */ protected final UUID nearNodeId; @@ -196,7 +190,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. * @param mvccSnapshot Mvcc snapshot. - * @param threadId Thread ID. * @param nearFutId Near future id. * @param nearMiniId Near mini future id. * @param tx Transaction. @@ -208,7 +201,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd protected GridDhtTxAbstractEnlistFuture(UUID nearNodeId, GridCacheVersion nearLockVer, MvccSnapshot mvccSnapshot, - long threadId, IgniteUuid nearFutId, int nearMiniId, GridDhtTxLocalAdapter tx, @@ -221,7 +213,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd assert nearNodeId != null; assert nearLockVer != null; - this.threadId = threadId; this.cctx = cctx; this.nearNodeId = nearNodeId; this.nearLockVer = nearLockVer; @@ -232,7 +223,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd this.tx = tx; this.filter = filter; this.keepBinary = keepBinary; - this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); + deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); lockVer = tx.xidVersion(); @@ -301,11 +292,9 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd // Terminate this future if parent future is terminated by rollback. if (!fut.isDone()) { - fut.listen(new IgniteInClosure<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture fut) { - if (fut.error() != null) - onDone(fut.error()); - } + fut.listen(() -> { + if (fut.error() != null) + onDone(fut.error()); }); } else if (fut.error() != null) @@ -416,7 +405,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd assert !entry.detached(); CacheObject val = op.isDeleteOrLock() || op.isInvoke() - ? null : cctx.toCacheObject(((IgniteBiTuple)cur).getValue()); + ? null : cctx.toCacheObject(((Map.Entry<?, ?>)cur).getValue()); GridInvokeValue invokeVal = null; EntryProcessor entryProc = null; @@ -425,7 +414,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd if (op.isInvoke()) { assert needResult(); - invokeVal = (GridInvokeValue)((IgniteBiTuple)cur).getValue(); + invokeVal = (GridInvokeValue)((Map.Entry<?, ?>)cur).getValue(); entryProc = invokeVal.entryProcessor(); invokeArgs = invokeVal.invokeArgs(); @@ -513,18 +502,16 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd it.beforeDetach(); - updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() { - @Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) { - try { - tx.incrementLockCounter(); + updateFut.listen(() -> { + try { + tx.incrementLockCounter(); - processEntry(entry0, op, fut.get(), val0, backups0); + processEntry(entry0, op, updateFut.get(), val0, backups0); - continueLoop(true); - } - catch (Throwable e) { - onDone(e); - } + continueLoop(true); + } + catch (Throwable e) { + onDone(e); } }); @@ -605,7 +592,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd /** */ private KeyCacheObject toKey(EnlistOperation op, Object cur) { - KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey()); + KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((Map.Entry<?, ?>)cur).getKey()); if (key.partition() == -1) key.partition(cctx.affinity().partition(key)); @@ -725,7 +712,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd * @param key Key. * @param hist History rows. * @return History entries. - * @throws IgniteCheckedException, if failed. */ private CacheEntryInfoCollection fetchHistoryInfo(KeyCacheObject key, List<MvccLinkAwareSearchRow> hist) { List<GridCacheEntryInfo> res = new ArrayList<>(); @@ -928,7 +914,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd /** * Checks whether new coordinator was initialized after the snapshot is acquired. - * + * <p> * Need to fit invariant that all updates are finished before a new coordinator is initialized. * * @throws ClusterTopologyCheckedException If failed. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java index 5f7a3332dd7..72ad4d85767 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.Iterator; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeResult; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -41,16 +40,16 @@ import org.jetbrains.annotations.Nullable; */ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<GridCacheReturn> implements UpdateSourceIterator<Object> { /** Enlist operation. */ - private EnlistOperation op; + private final EnlistOperation op; /** Source iterator. */ - private Iterator<Object> it; + private final Iterator<Object> it; /** Future result. */ - private GridCacheReturn res; + private final GridCacheReturn res; /** Need result flag. If {@code True} previous value should be returned as well. */ - private boolean needRes; + private final boolean needRes; /** * Constructor. @@ -58,7 +57,6 @@ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<G * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. * @param mvccSnapshot Mvcc snapshot. - * @param threadId Thread ID. * @param nearFutId Near future id. * @param nearMiniId Near mini future id. * @param tx Transaction. @@ -73,7 +71,6 @@ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<G public GridDhtTxEnlistFuture(UUID nearNodeId, GridCacheVersion nearLockVer, MvccSnapshot mvccSnapshot, - long threadId, IgniteUuid nearFutId, int nearMiniId, GridDhtTxLocalAdapter tx, @@ -87,7 +84,6 @@ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<G super(nearNodeId, nearLockVer, mvccSnapshot, - threadId, nearFutId, nearMiniId, tx, @@ -107,7 +103,7 @@ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<G } /** {@inheritDoc} */ - @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException { + @Override protected UpdateSourceIterator<?> createIterator() { return this; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java index b9e42291f95..2236818fae0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java @@ -38,7 +38,6 @@ public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstra * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. * @param mvccSnapshot Mvcc snapshot. - * @param threadId Thread ID. * @param nearFutId Near future id. * @param nearMiniId Near mini future id. * @param tx Transaction. @@ -48,7 +47,6 @@ public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstra protected GridDhtTxQueryAbstractEnlistFuture(UUID nearNodeId, GridCacheVersion nearLockVer, MvccSnapshot mvccSnapshot, - long threadId, IgniteUuid nearFutId, int nearMiniId, GridDhtTxLocalAdapter tx, @@ -57,7 +55,6 @@ public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstra super(nearNodeId, nearLockVer, mvccSnapshot, - threadId, nearFutId, nearMiniId, tx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java index f475efa48f4..bf4cb5db3cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java @@ -61,7 +61,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. * @param mvccSnapshot Mvcc snapshot. - * @param threadId Thread ID. * @param nearFutId Near future id. * @param nearMiniId Near mini future id. * @param tx Transaction. @@ -79,7 +78,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli UUID nearNodeId, GridCacheVersion nearLockVer, MvccSnapshot mvccSnapshot, - long threadId, IgniteUuid nearFutId, int nearMiniId, GridDhtTxLocalAdapter tx, @@ -95,7 +93,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli super(nearNodeId, nearLockVer, mvccSnapshot, - threadId, nearFutId, nearMiniId, tx, @@ -105,7 +102,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli assert timeout >= 0; assert nearNodeId != null; assert nearLockVer != null; - assert threadId == tx.threadId(); this.cacheIds = cacheIds; this.schema = schema; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java index ec0147e3885..2164ab9ca2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.Iterator; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -35,16 +34,15 @@ import org.apache.ignite.lang.IgniteUuid; */ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements UpdateSourceIterator<Object> { /** Enlist operation. */ - private EnlistOperation op; + private final EnlistOperation op; /** Source iterator. */ - private Iterator<Object> it; + private final Iterator<Object> it; /** * @param nearNodeId Near node ID. * @param nearLockVer Near lock version. * @param mvccSnapshot Mvcc snapshot. - * @param threadId Thread ID. * @param nearFutId Near future id. * @param nearMiniId Near mini future id. * @param tx Transaction. @@ -56,7 +54,6 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstr public GridDhtTxQueryResultsEnlistFuture(UUID nearNodeId, GridCacheVersion nearLockVer, MvccSnapshot mvccSnapshot, - long threadId, IgniteUuid nearFutId, int nearMiniId, GridDhtTxLocalAdapter tx, @@ -67,7 +64,6 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstr super(nearNodeId, nearLockVer, mvccSnapshot, - threadId, nearFutId, nearMiniId, tx, @@ -82,7 +78,7 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstr } /** {@inheritDoc} */ - @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException { + @Override protected UpdateSourceIterator<?> createIterator() { return this; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 074ee4fe51c..1fb2636695f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -116,13 +116,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec private final String taskName; /** Whether to deserialize binary objects. */ - private boolean deserializeBinary; + private final boolean deserializeBinary; /** Skip values flag. */ - private boolean skipVals; + private final boolean skipVals; /** Expiry policy. */ - private IgniteCacheExpiryPolicy expiryPlc; + private final IgniteCacheExpiryPolicy expiryPlc; /** Flag indicating that get should be done on a locked topology version. */ private final boolean canRemap; @@ -134,7 +134,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec private final boolean keepCacheObjects; /** */ - private boolean recovery; + private final boolean recovery; /** */ @GridToStringInclude @@ -215,7 +215,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec this.recovery = recovery; this.topVer = topVer; this.mvccSnapshot = mvccSnapshot; - this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); + deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); this.txLbl = txLbl; @@ -703,9 +703,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec if (invalidParts) { addNodeAsInvalid(cctx.node(nodeId)); - if (canRemap) { + if (canRemap) awaitVersionAndRemap(rmtTopVer); - } else map(topVer); @@ -826,7 +825,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" + - (node0 != null ? U.toShortString(node0) : node0) + ", invalidNodes=" + invalidNodes + ']')); + (node0 != null ? U.toShortString(node0) : null) + ", invalidNodes=" + invalidNodes + ']')); return false; } @@ -890,12 +889,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec * @param topVer Topology version. */ private void awaitVersionAndRemap(AffinityTopologyVersion topVer) { - IgniteInternalFuture<AffinityTopologyVersion> awaitTopologyVersionFuture = - cctx.shared().exchange().affinityReadyFuture(topVer); + IgniteInternalFuture<AffinityTopologyVersion> awaitTopVerFut = cctx.shared().exchange().affinityReadyFuture(topVer); - awaitTopologyVersionFuture.listen(() -> { + awaitTopVerFut.listen(() -> { try { - remap(awaitTopologyVersionFuture.get()); + remap(awaitTopVerFut.get()); } catch (IgniteCheckedException e) { onDone(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 06d2febdb8a..b5d3da22f32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -45,11 +44,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -87,9 +84,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA @GridToStringExclude protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings; - /** Continuous query closures. */ - private Collection<CI1<Boolean>> cntQryClsrs; - /** Response count. */ private volatile int resCnt; @@ -134,28 +128,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA return null; } - /** - * @param clsr Continuous query closure. - * @param sync Synchronous continuous query flag. - */ - public final void addContinuousQueryClosure(CI1<Boolean> clsr, boolean sync) { - assert !isDone() : this; - - if (sync) - clsr.apply(true); - else { - if (cntQryClsrs == null) - cntQryClsrs = new ArrayList<>(10); - - cntQryClsrs.add(clsr); - } - } - /** * @param affAssignment Affinity assignment. * @param entry Entry to map. * @param val Value to write. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). @@ -170,7 +147,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA AffinityAssignment affAssignment, GridDhtCacheEntry entry, @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, + EntryProcessor<Object, Object, Object> entryProc, long ttl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer, @@ -222,7 +199,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA updateReq.addWriteValue(entry.key(), val, - entryProcessor, + entryProc, ttl, conflictExpireTime, conflictVer, @@ -251,7 +228,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA * @param readers Entry readers. * @param entry Entry. * @param val Value. - * @param entryProcessor Entry processor.. + * @param entryProc Entry processor.. * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). * @param readRepairRecovery Recovery on Read Repair. @@ -261,7 +238,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA GridDhtCacheEntry.ReaderId[] readers, GridDhtCacheEntry entry, @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, + EntryProcessor<Object, Object, Object> entryProc, long ttl, long expireTime, boolean readRepairRecovery) { @@ -314,7 +291,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA updateReq.addNearWriteValue(entry.key(), val, - entryProcessor, + entryProc, ttl, expireTime); } @@ -395,7 +372,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA GridNearAtomicUpdateResponse updateRes, GridDhtAtomicCache.UpdateReplyClosure completionCb) { if (F.isEmpty(mappings)) { - updateRes.mapping(Collections.<UUID>emptyList()); + updateRes.mapping(Collections.emptyList()); completionCb.apply(updateReq, updateRes); @@ -470,9 +447,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA req.hasResult(true); } - if (cntQryClsrs != null) - req.replyWithoutDelay(true); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (msgLog.isDebugEnabled()) { @@ -561,13 +535,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA if (super.onDone(res, err)) { cctx.mvcc().removeAtomicFuture(futId); - boolean suc = err == null; - - if (cntQryClsrs != null) { - for (CI1<Boolean> clsr : cntQryClsrs) - clsr.apply(suc); - } - return true; } @@ -588,14 +555,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA @Override public String toString() { synchronized (this) { Map<UUID, String> dhtRes = F.viewReadOnly(mappings, - new IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String>() { - @Override public String apply(GridDhtAtomicAbstractUpdateRequest req) { - return "[res=" + req.hasResponse() + - ", size=" + req.size() + - ", nearSize=" + req.nearSize() + ']'; - } - } - ); + req -> "[res=" + req.hasResponse() + ", size=" + req.size() + ", nearSize=" + req.nearSize() + ']'); return S.toString(GridDhtAtomicAbstractUpdateFuture.class, this, "dhtRes", dhtRes); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index dbf2516e87b..5a38e9f93f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -197,7 +197,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture this.skipStore = skipStore; this.keepBinary = keepBinary; this.recovery = recovery; - this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); + deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); nearEnabled = CU.isNearEnabled(cctx); @@ -298,13 +298,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(cctx.localNode(), req, - new GridDhtAtomicCache.UpdateReplyClosure() { - @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (syncMode != FULL_ASYNC) - onPrimaryResponse(res.nodeId(), res, false); - else if (res.remapTopologyVersion() != null) - ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req); - } + (ignored, res) -> { + if (syncMode != FULL_ASYNC) + onPrimaryResponse(res.nodeId(), res, false); + else if (res.remapTopologyVersion() != null) + ((GridDhtAtomicCache<?, ?>)cctx.cache()).remapToNewPrimary(req); }); } else { @@ -771,7 +769,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture nodeRes.rcvd = true; - rcvdCnt++; } else { if (!hasRes) // Do not finish future until primary response received and mapping is known. @@ -779,9 +776,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture mappedNodes.put(nodeId, new NodeResult(true)); - rcvdCnt++; } + rcvdCnt++; + return finished(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index e4a5f7154ef..3cb79a724e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -41,9 +41,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -60,10 +57,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD */ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture { /** Keys */ - private Object key; + private final Object key; /** Values. */ - private Object val; + private final Object val; /** */ private PrimaryRequestState reqState; @@ -306,8 +303,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion remapTopVer0 = null; if (remapTopVer == null) { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && + if (X.hasCause(err, CachePartialUpdateCheckedException.class) && X.hasCause(err, ClusterTopologyCheckedException.class) && storeFuture() && --remapCnt > 0) { @@ -363,18 +359,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer); - if (fut == null) - fut = new GridFinishedFuture<>(remapTopVer); - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); + fut.listen(() -> cctx.kernalContext().closure().runLocalSafe(this::mapOnTopology)); } /** @@ -425,15 +410,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else { assert !topLocked : this; - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); + fut.listen(() -> cctx.kernalContext().closure().runLocalSafe(this::mapOnTopology)); return; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index c81c751c05d..ecf83a47d57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -48,9 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -69,16 +67,16 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD */ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture { /** Keys */ - private Collection<?> keys; + private final Collection<?> keys; /** Values. */ - private Collection<?> vals; + private final Collection<?> vals; /** Conflict put values. */ - private Collection<GridCacheDrInfo> conflictPutVals; + private final Collection<GridCacheDrInfo> conflictPutVals; /** Conflict remove values. */ - private Collection<GridCacheVersion> conflictRmvVals; + private final Collection<GridCacheVersion> conflictRmvVals; /** Mappings if operations is mapped to more than one node. */ @GridToStringInclude @@ -494,15 +492,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (fut == null) fut = new GridFinishedFuture<>(remapTopVer); - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); + fut.listen(() -> cctx.kernalContext().closure().runLocalSafe(this::mapOnTopology)); } /** @@ -520,8 +510,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu remapTopVer0 = remapTopVer; } else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && + if (X.hasCause(err, CachePartialUpdateCheckedException.class) && X.hasCause(err, ClusterTopologyCheckedException.class) && storeFuture() && --remapCnt > 0) { @@ -648,15 +637,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu else { assert !topLocked : this; - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); + fut.listen(() -> cctx.kernalContext().closure().runLocalSafe(this::mapOnTopology)); return; } @@ -710,13 +691,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (locUpdate != null) { cache.updateAllAsyncInternal(cctx.localNode(), locUpdate, - new GridDhtAtomicCache.UpdateReplyClosure() { - @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (syncMode != FULL_ASYNC) - onPrimaryResponse(res.nodeId(), res, false); - else if (res.remapTopologyVersion() != null) - ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req); - } + (req, res) -> { + if (syncMode != FULL_ASYNC) + onPrimaryResponse(res.nodeId(), res, false); + else if (res.remapTopologyVersion() != null) + ((GridDhtAtomicCache<?, ?>)cctx.cache()).remapToNewPrimary(req); }); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 2483e42d9f1..a6fc36c1959 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1223,7 +1223,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (log.isDebugEnabled()) log.debug("Before locally locking near request: " + req); - IgniteInternalFuture<GridNearLockResponse> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter); + IgniteInternalFuture<GridNearLockResponse> fut = dht().lockAllAsync(cctx, cctx.localNode(), req); // Add new future. add(new GridEmbeddedFuture<>( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java index 4df7c947b05..697f194e48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java @@ -485,7 +485,6 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(nodeId, lockVer, mvccSnapshot, - threadId, futId, batchId, tx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java index b541ae1121b..8680065cdcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java @@ -151,7 +151,6 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu cctx.localNode().id(), lockVer, mvccSnapshot, - threadId, futId, -(++idx), // The common tx logic expects non-zero mini-future ids (negative local and positive non-local). tx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java index 9936211fa8b..db9c60873fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java @@ -448,7 +448,6 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE GridDhtTxQueryResultsEnlistFuture fut = new GridDhtTxQueryResultsEnlistFuture(nodeId, lockVer, mvccSnapshot, - threadId, futId, batchId, tx,