This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b3e1009e7b IGNITE-20177 GridCacheCompoundIdentityFuture's child-free 
descendants initial cleanup (#10883)
1b3e1009e7b is described below

commit 1b3e1009e7b0b4c4f1a42bbf320c481a7d6bef1d
Author: Anton Vinogradov <a...@apache.org>
AuthorDate: Thu Aug 10 16:54:00 2023 +0300

    IGNITE-20177 GridCacheCompoundIdentityFuture's child-free descendants 
initial cleanup (#10883)
---
 .../distributed/GridCacheTxRecoveryFuture.java     |  69 +++----
 .../cache/distributed/dht/GridDhtLockFuture.java   | 223 +++++++--------------
 .../dht/GridDhtTransactionalCacheAdapter.java      |   2 -
 .../distributed/dht/GridDhtTxFinishFuture.java     |  63 +++---
 .../dht/colocated/GridDhtColocatedCache.java       |   1 -
 .../dht/colocated/GridDhtColocatedLockFuture.java  | 161 ++++++---------
 .../cache/distributed/near/GridNearLockFuture.java | 114 +++++------
 .../distributed/near/GridNearTxFinishFuture.java   | 171 +++++++---------
 8 files changed, 301 insertions(+), 503 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 6e5906c1951..e75a1de09ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -33,10 +33,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -93,7 +90,6 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
      * @param failedNodeIds IDs of failed nodes started transaction.
      * @param txNodes Transaction mapping.
      */
-    @SuppressWarnings("ConstantConditions")
     public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
         IgniteInternalTx tx,
         Set<UUID> failedNodeIds,
@@ -143,14 +139,12 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
             if (cctx.localNodeId().equals(nearNodeId)) {
                 IgniteInternalFuture<Boolean> fut = 
cctx.tm().txCommitted(tx.nearXidVersion());
 
-                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> 
fut) {
-                        try {
-                            onDone(fut.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
-                        }
+                fut.listen((IgniteInternalFuture<Boolean> fut0) -> {
+                    try {
+                        onDone(fut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
                     }
                 });
             }
@@ -206,7 +200,7 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
                 boolean prepared;
 
                 try {
-                    prepared = fut == null ? true : fut.get();
+                    prepared = fut == null || fut.get();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Check prepared transaction future failed: " 
+ e, e);
@@ -223,27 +217,25 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
                 }
             }
             else {
-                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> 
fut) {
-                        boolean prepared;
+                fut.listen((IgniteInternalFuture<Boolean> fut0) -> {
+                    boolean prepared;
 
-                        try {
-                            prepared = fut.get();
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Check prepared transaction future 
failed: " + e, e);
+                    try {
+                        prepared = fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Check prepared transaction future 
failed: " + e, e);
 
-                            prepared = false;
-                        }
+                        prepared = false;
+                    }
 
-                        if (!prepared) {
-                            onDone(false);
+                    if (!prepared) {
+                        onDone(false);
 
-                            markInitialized();
-                        }
-                        else
-                            proceedPrepare();
+                        markInitialized();
                     }
+                    else
+                        proceedPrepare();
                 });
 
                 return;
@@ -464,13 +456,8 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
             if (isMini(fut)) {
                 final MiniFuture f = (MiniFuture)fut;
 
-                if (f.nodeId().equals(nodeId)) {
-                    cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
-                        @Override public void run() {
-                            f.onNodeLeft(nodeId);
-                        }
-                    });
-                }
+                if (f.nodeId().equals(nodeId))
+                    cctx.kernalContext().closure().runLocalSafe(() -> 
f.onNodeLeft(nodeId));
             }
         }
 
@@ -529,12 +516,8 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).nodeId +
-                    ", done=" + f.isDone() + "]";
-            }
-        });
+        Collection<String> futs = F.viewReadOnly(futures(),
+            (IgniteInternalFuture<?> f) -> "[node=" + ((MiniFuture)f).nodeId + 
", done=" + f.isDone() + "]");
 
         return S.toString(GridCacheTxRecoveryFuture.class, this,
             "innerFuts", futs,
@@ -549,7 +532,7 @@ public class GridCacheTxRecoveryFuture extends 
GridCacheCompoundIdentityFuture<B
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
         /** Node ID. */
-        private UUID nodeId;
+        private final UUID nodeId;
 
         /**
          * @param nodeId Node ID.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f7f76222a07..2859dc3e849 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -45,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
@@ -60,7 +60,6 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
@@ -70,12 +69,10 @@ import 
org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.NotNull;
@@ -109,46 +106,45 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<?, ?> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** Near node ID. */
-    private UUID nearNodeId;
+    private final UUID nearNodeId;
 
     /** Near lock version. */
-    private GridCacheVersion nearLockVer;
+    private final GridCacheVersion nearLockVer;
 
     /** Topology version. */
-    private AffinityTopologyVersion topVer;
+    private final AffinityTopologyVersion topVer;
 
     /** Thread. */
-    private long threadId;
+    private final long threadId;
 
     /**
      * Keys locked so far.
-     *
+     * <p>
      * Thread created this object iterates over entries and tries to lock each 
of them.
      * If it finds some entry already locked by another thread it registers 
callback which will be executed
      * by the thread owning the lock.
-     *
+     * <p>
      * Thus access to this collection must be synchronized except cases
      * when this object is yet local to the thread created it.
      */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     @GridToStringExclude
-    private List<GridDhtCacheEntry> entries;
+    private final List<GridDhtCacheEntry> entries;
 
     /** DHT mappings. */
-    private Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap =
+    private final Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap =
         new ConcurrentHashMap<>();
 
     /** Future ID. */
-    private IgniteUuid futId;
+    private final IgniteUuid futId;
 
     /** Lock version. */
     private GridCacheVersion lockVer;
 
     /** Read flag. */
-    private boolean read;
+    private final boolean read;
 
     /** Error. */
     private Throwable err;
@@ -163,11 +159,8 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
     /** Lock timeout. */
     private final long timeout;
 
-    /** Filter. */
-    private CacheEntryPredicate[] filter;
-
     /** Transaction. */
-    private GridDhtTxLocalAdapter tx;
+    private final GridDhtTxLocalAdapter tx;
 
     /** All replies flag. */
     private boolean mapped;
@@ -182,13 +175,13 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
     private final Collection<KeyCacheObject> pendingLocks;
 
     /** TTL for create operation. */
-    private long createTtl;
+    private final long createTtl;
 
     /** TTL for read operation. */
-    private long accessTtl;
+    private final long accessTtl;
 
     /** Need return value flag. */
-    private boolean needReturnVal;
+    private final boolean needReturnVal;
 
     /** Skip store flag. */
     private final boolean skipStore;
@@ -208,7 +201,6 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
      * @param tx Transaction.
      * @param threadId Thread ID.
      * @param accessTtl TTL for read operation.
-     * @param filter Filter.
      * @param skipStore Skip store flag.
      */
     public GridDhtLockFuture(
@@ -224,7 +216,6 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
         long threadId,
         long createTtl,
         long accessTtl,
-        CacheEntryPredicate[] filter,
         boolean skipStore,
         boolean keepBinary) {
         super(CU.boolReducer());
@@ -232,7 +223,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
         assert nearNodeId != null;
         assert nearLockVer != null;
         assert topVer.topologyVersion() > 0;
-        assert (tx != null && timeout >= 0) || tx == null;
+        assert tx == null || timeout >= 0;
 
         this.cctx = cctx;
         this.nearNodeId = nearNodeId;
@@ -241,7 +232,6 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
         this.read = read;
         this.needReturnVal = needReturnVal;
         this.timeout = timeout;
-        this.filter = filter;
         this.tx = tx;
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
@@ -276,7 +266,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
         if (tx != null) {
             while (true) {
-                IgniteInternalFuture fut = tx.lockFut;
+                IgniteInternalFuture<?> fut = tx.lockFut;
 
                 if (fut != null) {
                     if (fut == GridDhtTxLocalAdapter.ROLLBACK_FUT)
@@ -286,14 +276,12 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
                         assert fut instanceof GridDhtColocatedLockFuture : fut;
 
                         // Terminate this future if parent(collocated) future 
is terminated by rollback.
-                        fut.listen(new IgniteInClosure<IgniteInternalFuture>() 
{
-                            @Override public void apply(IgniteInternalFuture 
fut) {
-                                try {
-                                    fut.get();
-                                }
-                                catch (IgniteCheckedException e) {
-                                    onError(e);
-                                }
+                        fut.listen((IgniteInternalFuture<?> fut0) -> {
+                            try {
+                                fut.get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                onError(e);
                             }
                         });
                     }
@@ -309,7 +297,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return invalidParts == null ? Collections.<Integer>emptyList() : 
invalidParts;
+        return invalidParts == null ? Collections.emptyList() : invalidParts;
     }
 
     /**
@@ -447,7 +435,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
             if (log.isDebugEnabled())
                 log.debug("Failed to acquire lock with negative timeout: " + 
entry);
 
-            onFailed(false);
+            onFailed();
 
             return null;
         }
@@ -480,11 +468,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
         if (dist && tx == null) {
             cctx.dhtTx().removeLocks(nearNodeId, lockVer, 
F.viewReadOnly(entriesCp,
-                new C1<GridDhtCacheEntry, KeyCacheObject>() {
-                    @Override public KeyCacheObject apply(GridDhtCacheEntry e) 
{
-                        return e.key();
-                    }
-                }), false);
+                (C1<GridDhtCacheEntry, 
KeyCacheObject>)GridCacheMapEntry::key), false);
         }
         else {
             if (tx != null) {
@@ -523,10 +507,9 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
     /**
      *
-     * @param dist {@code True} if need to distribute lock release.
      */
-    private void onFailed(boolean dist) {
-        undoLocks(dist);
+    private void onFailed() {
+        undoLocks(false);
 
         onComplete(false, false, true);
     }
@@ -629,7 +612,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
                     if (timeout < 0) {
                         if (owners == null || !owners.hasCandidate(lockVer)) {
                             // We did not send any requests yet.
-                            onFailed(false);
+                            onFailed();
 
                             return;
                         }
@@ -669,30 +652,6 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
         onComplete(false, false, true);
     }
 
-    /**
-     * @param cached Entry to check.
-     * @return {@code True} if filter passed.
-     */
-    private boolean filter(GridCacheEntryEx cached) {
-        try {
-            if (!cctx.isAll(cached, filter)) {
-                if (log.isDebugEnabled())
-                    log.debug("Filter didn't pass for entry (will fail lock): 
" + cached);
-
-                onFailed(true);
-
-                return false;
-            }
-
-            return true;
-        }
-        catch (IgniteCheckedException e) {
-            onError(e);
-
-            return false;
-        }
-    }
-
     /**
      * Callback for whenever entry lock ownership changes.
      *
@@ -890,7 +849,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
                             // Possible in case of lock cancellation.
                             if (cand == null) {
-                                onFailed(false);
+                                onFailed();
 
                                 // Will mark initialized in finally block.
                                 return;
@@ -969,7 +928,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
                             for (ListIterator<GridDhtCacheEntry> it = 
dhtMapping.listIterator(); it.hasNext(); ) {
                                 GridDhtCacheEntry e = it.next();
 
-                                boolean needVal = false;
+                                boolean needVal;
 
                                 try {
                                     // Must unswap entry so that isNewLocked 
returns correct value.
@@ -1069,12 +1028,10 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                MiniFuture m = (MiniFuture)f;
+        Collection<String> futs = F.viewReadOnly(futures(), 
(IgniteInternalFuture<?> f) -> {
+            MiniFuture m = (MiniFuture)f;
 
-                return "[node=" + m.node().id() + ", loc=" + 
m.node().isLocal() + ", done=" + f.isDone() + "]";
-            }
+            return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + 
", done=" + f.isDone() + "]";
         });
 
         Collection<KeyCacheObject> locks;
@@ -1120,46 +1077,44 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
                 cctx.store().loadAll(
                     null,
                     loadMap.keySet(),
-                    new CI2<KeyCacheObject, Object>() {
-                        @Override public void apply(KeyCacheObject key, Object 
val) {
-                            // No value loaded from store.
-                            if (val == null)
-                                return;
-
-                            GridDhtCacheEntry entry0 = loadMap.get(key);
+                    (KeyCacheObject key, Object val) -> {
+                        // No value loaded from store.
+                        if (val == null)
+                            return;
 
-                            try {
-                                CacheObject val0 = cctx.toCacheObject(val);
+                        GridDhtCacheEntry entry0 = loadMap.get(key);
 
-                                long ttl = createTtl;
-                                long expireTime;
+                        try {
+                            CacheObject val0 = cctx.toCacheObject(val);
 
-                                if (ttl == CU.TTL_ZERO)
-                                    expireTime = CU.expireTimeInPast();
-                                else {
-                                    if (ttl == CU.TTL_NOT_CHANGED)
-                                        ttl = CU.TTL_ETERNAL;
+                            long ttl = createTtl;
+                            long expireTime;
 
-                                    expireTime = CU.toExpireTime(ttl);
-                                }
+                            if (ttl == CU.TTL_ZERO)
+                                expireTime = CU.expireTimeInPast();
+                            else {
+                                if (ttl == CU.TTL_NOT_CHANGED)
+                                    ttl = CU.TTL_ETERNAL;
 
-                                entry0.initialValue(val0,
-                                    ver,
-                                    ttl,
-                                    expireTime,
-                                    false,
-                                    topVer,
-                                    GridDrType.DR_LOAD,
-                                    true,
-                                    false);
-                            }
-                            catch (GridCacheEntryRemovedException e) {
-                                assert false : "Should not get removed 
exception while holding lock on entry " +
-                                    "[entry=" + entry0 + ", e=" + e + ']';
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
+                                expireTime = CU.toExpireTime(ttl);
                             }
+
+                            entry0.initialValue(val0,
+                                ver,
+                                ttl,
+                                expireTime,
+                                false,
+                                topVer,
+                                GridDrType.DR_LOAD,
+                                true,
+                                false);
+                        }
+                        catch (GridCacheEntryRemovedException e) {
+                            assert false : "Should not get removed exception 
while holding lock on entry " +
+                                "[entry=" + entry0 + ", e=" + e + ']';
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
                         }
                     });
             }
@@ -1283,11 +1238,11 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
         /** Node. */
         @GridToStringExclude
-        private ClusterNode node;
+        private final ClusterNode node;
 
         /** DHT mapping. */
         @GridToStringInclude
-        private List<GridDhtCacheEntry> dhtMapping;
+        private final List<GridDhtCacheEntry> dhtMapping;
 
         /**
          * @param node Node.
@@ -1384,7 +1339,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
                     cache0 = ((GridNearCacheAdapter)cache0).dht();
 
                 synchronized (GridDhtLockFuture.this) { // Prevents entry 
re-creation on concurrent rollback.
-                    if (GridDhtLockFuture.this.checkDone())
+                    if (checkDone())
                         return;
 
                     for (GridCacheEntryInfo info : res.preloadEntries()) {
@@ -1434,44 +1389,6 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
             }
         }
 
-        /**
-         * @param cacheCtx Context.
-         * @param keys Keys to evict readers for.
-         * @param nodeId Node ID.
-         * @param msgId Message ID.
-         * @param entries Entries to check.
-         */
-        private void evictReaders(GridCacheContext<?, ?> cacheCtx, 
Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
-            @Nullable List<GridDhtCacheEntry> entries) {
-            if (entries == null || keys == null || entries.isEmpty() || 
keys.isEmpty())
-                return;
-
-            for (ListIterator<GridDhtCacheEntry> it = entries.listIterator(); 
it.hasNext(); ) {
-                GridDhtCacheEntry cached = it.next();
-
-                if (keys.contains(cached.txKey())) {
-                    while (true) {
-                        try {
-                            cached.removeReader(nodeId, msgId);
-
-                            if (tx != null)
-                                tx.removeNearMapping(nodeId, cached);
-
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            GridDhtCacheEntry e = 
cacheCtx.dht().peekExx(cached.key());
-
-                            if (e == null)
-                                break;
-
-                            it.set(e);
-                        }
-                    }
-                }
-            }
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MiniFuture.class, this, "nodeId", node.id(), 
"super", super.toString());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 2fb2a0f4d1a..25d16805b67 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -934,7 +934,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
             tx.threadId(),
             createTtl,
             accessTtl,
-            filter,
             skipStore,
             keepBinary);
 
@@ -1125,7 +1124,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
                         req.threadId(),
                         req.createTtl(),
                         req.accessTtl(),
-                        filter,
                         req.skipStore(),
                         req.keepBinary());
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index a78653d9b7b..e53eacc2b74 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
@@ -45,7 +44,6 @@ import org.apache.ignite.internal.processors.tracing.Span;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -63,7 +61,7 @@ import static 
org.apache.ignite.transactions.TransactionState.COMMITTING;
  *
  */
 public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentityFuture<IgniteInternalTx>
-    implements GridCacheFuture<IgniteInternalTx>, IgniteDiagnosticAware {
+    implements IgniteDiagnosticAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -84,27 +82,27 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
     private static IgniteLogger msgLog;
 
     /** Context. */
-    private GridCacheSharedContext<K, V> cctx;
+    private final GridCacheSharedContext<K, V> cctx;
 
     /** Future ID. */
     private final IgniteUuid futId;
 
     /** Transaction. */
     @GridToStringExclude
-    private GridDhtTxLocalAdapter tx;
+    private final GridDhtTxLocalAdapter tx;
 
     /** Commit flag. */
-    private boolean commit;
+    private final boolean commit;
 
     /** Error. */
     @GridToStringExclude
     private volatile Throwable err;
 
     /** DHT mappings. */
-    private Map<UUID, GridDistributedTxMapping> dhtMap;
+    private final Map<UUID, GridDistributedTxMapping> dhtMap;
 
     /** Near mappings. */
-    private Map<UUID, GridDistributedTxMapping> nearMap;
+    private final Map<UUID, GridDistributedTxMapping> nearMap;
 
     /**
      * @param cctx Context.
@@ -112,7 +110,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
      * @param commit Commit flag.
      */
     public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, 
GridDhtTxLocalAdapter tx, boolean commit) {
-        super(F.<IgniteInternalTx>identityReducer(tx));
+        super(F.identityReducer(tx));
 
         this.cctx = cctx;
         this.tx = tx;
@@ -316,7 +314,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
      *
      * @param commit Commit flag.
      */
-    @SuppressWarnings({"SimplifiableIfStatement"})
     public void finish(boolean commit) {
         try (MTC.TraceSurroundings ignored =
                  MTC.supportContinual(span = 
cctx.kernalContext().tracing().create(TX_DHT_FINISH, MTC.span()))) {
@@ -409,7 +406,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
                 if (e instanceof ClusterTopologyCheckedException)
-                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                    fut.onNodeLeft();
                 else {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("DHT finish fut, failed to send request 
lock tx [txId=" + tx.nearXidVersion() +
@@ -505,8 +502,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                 if (isNull(cctx.discovery().getAlive(n.id()))) {
                     log.error("Unable to send message (node left topology): " 
+ n);
 
-                    fut.onNodeLeft(new ClusterTopologyCheckedException("Node 
left grid while sending message to: "
-                        + n.id()));
+                    fut.onNodeLeft();
                 }
                 else {
                     cctx.tm().sendTransactionMessage(n, req, tx, 
tx.ioPolicy());
@@ -526,7 +522,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
                 if (e instanceof ClusterTopologyCheckedException)
-                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                    fut.onNodeLeft();
                 else {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("DHT finish fut, failed to send request 
dht [txId=" + tx.nearXidVersion() +
@@ -596,7 +592,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.
                     if (e instanceof ClusterTopologyCheckedException)
-                        fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                        fut.onNodeLeft();
                     else {
                         if (msgLog.isDebugEnabled()) {
                             msgLog.debug("DHT finish fut, failed to send 
request near [txId=" + tx.nearXidVersion() +
@@ -619,7 +615,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
         if (!isDone()) {
             for (IgniteInternalFuture fut : futures()) {
                 if (!fut.isDone()) {
-                    if (MiniFuture.class.isInstance(fut)) {
+                    if (fut instanceof GridDhtTxFinishFuture.MiniFuture) {
                         MiniFuture f = (MiniFuture)fut;
 
                         if (!f.node().isLocal()) {
@@ -656,23 +652,21 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                if (f.getClass() == MiniFuture.class) {
-                    return "[node=" + ((MiniFuture)f).node().id() +
-                        ", loc=" + ((MiniFuture)f).node().isLocal() +
-                        ", done=" + f.isDone() + "]";
-                }
-                else if (f instanceof MvccFuture) {
-                    MvccFuture crdFut = (MvccFuture)f;
+        Collection<String> futs = F.viewReadOnly(futures(), 
(IgniteInternalFuture<?> f) -> {
+            if (f.getClass() == MiniFuture.class) {
+                return "[node=" + ((MiniFuture)f).node().id() +
+                    ", loc=" + ((MiniFuture)f).node().isLocal() +
+                    ", done=" + f.isDone() + "]";
+            }
+            else if (f instanceof MvccFuture) {
+                MvccFuture crdFut = (MvccFuture)f;
 
-                    return "[mvccCrdNode=" + crdFut.coordinatorNodeId() +
-                        ", loc=" + 
crdFut.coordinatorNodeId().equals(cctx.localNodeId()) +
-                        ", done=" + f.isDone() + "]";
-                }
-                else
-                    return f.toString();
+                return "[mvccCrdNode=" + crdFut.coordinatorNodeId() +
+                    ", loc=" + 
crdFut.coordinatorNodeId().equals(cctx.localNodeId()) +
+                    ", done=" + f.isDone() + "]";
             }
+            else
+                return f.toString();
         });
 
         return S.toString(GridDhtTxFinishFuture.class, this,
@@ -748,13 +742,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
             onDone(e);
         }
 
-        /**
-         * @param e Node failure.
-         */
-        void onNodeLeft(ClusterTopologyCheckedException e) {
-            onNodeLeft();
-        }
-
         /**
          */
         void onNodeLeft() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 4da88bef33a..cf301cdf1d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -1111,7 +1111,6 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
                 threadId,
                 createTtl,
                 accessTtl,
-                filter,
                 skipStore,
                 keepBinary);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 37d55334828..cd9f3b9714c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -73,14 +73,11 @@ import 
org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -125,7 +122,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
     /** Keys to lock. */
     @GridToStringInclude
-    private Collection<KeyCacheObject> keys;
+    private final Collection<KeyCacheObject> keys;
 
     /** Future ID. */
     private final IgniteUuid futId;
@@ -357,8 +354,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                     lockVer,
                     true,
                     txEntry.locked(),
-                    inTx(),
-                    inTx() && tx.implicitSingle(),
+                    true,
+                    tx.implicitSingle(),
                     false,
                     false,
                     null,
@@ -377,8 +374,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                     lockVer,
                     true,
                     false,
-                    inTx(),
-                    inTx() && tx.implicitSingle(),
+                    false,
+                    false,
                     false,
                     false,
                     null,
@@ -419,13 +416,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         cctx.mvcc().recheckPendingLocks();
     }
 
-    /**
-     * @param success Success flag.
-     */
-    public void complete(boolean success) {
-        onComplete(success, true);
-    }
-
     /**
      * @param nodeId Left node ID
      * @return {@code True} if node was in the list.
@@ -728,21 +718,19 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    MiniFuture m = (MiniFuture)f;
-
-                    synchronized (m) {
-                        return "[node=" + m.node().id() +
-                            ", rcvRes=" + m.rcvRes +
-                            ", loc=" + m.node().isLocal() +
-                            ", done=" + f.isDone() + "]";
-                    }
+        Collection<String> futs = F.viewReadOnly(futures(), 
(IgniteInternalFuture<?> f) -> {
+            if (isMini(f)) {
+                MiniFuture m = (MiniFuture)f;
+
+                synchronized (m) {
+                    return "[node=" + m.node().id() +
+                        ", rcvRes=" + m.rcvRes +
+                        ", loc=" + m.node().isLocal() +
+                        ", done=" + f.isDone() + "]";
                 }
-                else
-                    return "[loc=true, done=" + f.isDone() + "]";
             }
+            else
+                return "[loc=true, done=" + f.isDone() + "]";
         });
 
         return S.toString(GridDhtColocatedLockFuture.class, this,
@@ -1213,7 +1201,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
         // Fail fast if the transaction is timed out.
         if (tx != null && tx.remainingTime() == -1) {
-            GridDhtColocatedLockFuture.this.onDone(false, 
tx.timeoutException());
+            onDone(false, tx.timeoutException());
 
             clear();
 
@@ -1290,50 +1278,47 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
         // Add new future.
         add(new GridEmbeddedFuture<>(
-            new C2<Exception, Exception, Boolean>() {
-                @Override public Boolean apply(Exception resEx, Exception e) {
-                    if (CU.isLockTimeoutOrCancelled(e) ||
-                        (resEx != null && CU.isLockTimeoutOrCancelled(resEx)))
-                        return false;
+            (Exception resEx, Exception e) -> {
+                if (CU.isLockTimeoutOrCancelled(e) || 
(CU.isLockTimeoutOrCancelled(resEx)))
+                    return false;
 
-                    if (e != null) {
-                        onError(e);
+                if (e != null) {
+                    onError(e);
 
-                        return false;
-                    }
-
-                    if (resEx != null) {
-                        onError(resEx);
+                    return false;
+                }
 
-                        return false;
-                    }
+                if (resEx != null) {
+                    onError(resEx);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired lock for local DHT mapping 
[locId=" + cctx.nodeId() +
-                            ", mappedKeys=" + keys + ", fut=" + 
GridDhtColocatedLockFuture.this + ']');
+                    return false;
+                }
 
-                    if (inTx()) {
-                        for (KeyCacheObject key : keys)
-                            tx.entry(cctx.txKey(key)).markLocked();
-                    }
-                    else {
-                        for (KeyCacheObject key : keys)
-                            cctx.mvcc().markExplicitOwner(cctx.txKey(key), 
threadId);
-                    }
+                if (log.isDebugEnabled())
+                    log.debug("Acquired lock for local DHT mapping [locId=" + 
cctx.nodeId() +
+                        ", mappedKeys=" + keys + ", fut=" + this + ']');
 
-                    try {
-                        // Proceed and add new future (if any) before 
completing embedded future.
-                        if (mappings != null)
-                            proceedMapping();
-                    }
-                    catch (IgniteCheckedException ex) {
-                        onError(ex);
+                if (inTx()) {
+                    for (KeyCacheObject key : keys)
+                        tx.entry(cctx.txKey(key)).markLocked();
+                }
+                else {
+                    for (KeyCacheObject key : keys)
+                        cctx.mvcc().markExplicitOwner(cctx.txKey(key), 
threadId);
+                }
 
-                        return false;
-                    }
+                try {
+                    // Proceed and add new future (if any) before completing 
embedded future.
+                    if (mappings != null)
+                        proceedMapping();
+                }
+                catch (IgniteCheckedException ex) {
+                    onError(ex);
 
-                    return true;
+                    return false;
                 }
+
+                return true;
             },
             fut));
     }
@@ -1534,25 +1519,23 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
                     IgniteInternalFuture<TxDeadlock> fut = 
cctx.tm().detectDeadlock(tx, keys);
 
-                    fut.listen(new 
IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
-                        @Override public void 
apply(IgniteInternalFuture<TxDeadlock> fut) {
-                            try {
-                                TxDeadlock deadlock = fut.get();
+                    fut.listen((IgniteInternalFuture<TxDeadlock> fut0) -> {
+                        try {
+                            TxDeadlock deadlock = fut.get();
 
-                                err = new 
IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
-                                    "timeout for transaction [timeout=" + 
tx.timeout() + ", tx=" + CU.txString(tx) + ']',
-                                    deadlock != null ? new 
TransactionDeadlockException(deadlock.toString(cctx.shared())) :
-                                        null);
-                            }
-                            catch (IgniteCheckedException e) {
-                                err = e;
+                            err = new IgniteTxTimeoutCheckedException("Failed 
to acquire lock within provided " +
+                                "timeout for transaction [timeout=" + 
tx.timeout() + ", tx=" + CU.txString(tx) + ']',
+                                deadlock != null ? new 
TransactionDeadlockException(deadlock.toString(cctx.shared())) :
+                                    null);
+                        }
+                        catch (IgniteCheckedException e) {
+                            err = e;
 
-                                U.warn(log, "Failed to detect deadlock.", e);
-                            }
+                            U.warn(log, "Failed to detect deadlock.", e);
+                        }
 
-                            synchronized (LockTimeoutObject.this) {
-                                onComplete(false, true);
-                            }
+                        synchronized (this) {
+                            onComplete(false, true);
                         }
                     });
                 }
@@ -1596,9 +1579,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         /** */
         private boolean rcvRes;
 
-        /** Remap topology version for debug purpose. */
-        private AffinityTopologyVersion remapTopVer;
-
         /**
          * @param node Node.
          * @param keys Keys.
@@ -1628,13 +1608,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             return node;
         }
 
-        /**
-         * @return Keys.
-         */
-        public Collection<KeyCacheObject> keys() {
-            return keys;
-        }
-
         /**
          * @param e Node left exception.
          */
@@ -1673,8 +1646,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                     return;
 
                 rcvRes = true;
-
-                remapTopVer = res.clientRemapVersion();
             }
 
             if (res.error() != null) {
@@ -1815,11 +1786,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
                 cctx.mvcc().removeExplicitLock(threadId, cctx.txKey(key), 
lockVer);
 
-            mapOnTopology(true, new Runnable() {
-                @Override public void run() {
-                    onDone(true);
-                }
-            });
+            mapOnTopology(true, () -> onDone(true));
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index f6c8916c78b..a75c062427d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -63,15 +63,12 @@ import 
org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.C2;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -104,7 +101,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
     /** Lock owner thread. */
     @GridToStringInclude
-    private long threadId;
+    private final long threadId;
 
     /** Keys to lock. */
     @GridToStringInclude
@@ -117,7 +114,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
     private final GridCacheVersion lockVer;
 
     /** Read flag. */
-    private boolean read;
+    private final boolean read;
 
     /** Flag to return value. */
     private final boolean retval;
@@ -153,13 +150,13 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
     /** Keys locked so far. */
     @GridToStringExclude
-    private List<GridDistributedCacheEntry> entries;
+    private final List<GridDistributedCacheEntry> entries;
 
     /** TTL for create operation. */
-    private long createTtl;
+    private final long createTtl;
 
     /** TTL for read operation. */
-    private long accessTtl;
+    private final long accessTtl;
 
     /** Skip store flag. */
     private final boolean skipStore;
@@ -208,7 +205,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
         super(CU.boolReducer());
 
         assert keys != null;
-        assert (tx != null && timeout >= 0) || tx == null;
+        assert tx == null || timeout >= 0;
 
         this.cctx = cctx;
         this.keys = keys;
@@ -783,16 +780,14 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    MiniFuture m = (MiniFuture)f;
+        Collection<String> futs = F.viewReadOnly(futures(), 
(IgniteInternalFuture<?> f) -> {
+            if (isMini(f)) {
+                MiniFuture m = (MiniFuture)f;
 
-                    return "[node=" + m.node().id() + ", loc=" + 
m.node().isLocal() + ", done=" + f.isDone() + "]";
-                }
-                else
-                    return "[loc=true, done=" + f.isDone() + "]";
+                return "[node=" + m.node().id() + ", loc=" + 
m.node().isLocal() + ", done=" + f.isDone() + "]";
             }
+            else
+                return "[loc=true, done=" + f.isDone() + "]";
         });
 
         return S.toString(GridNearLockFuture.class, this,
@@ -915,19 +910,17 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
                 markInitialized();
             }
             else {
-                fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        try {
-                            fut.get();
+                fut.listen((IgniteInternalFuture<AffinityTopologyVersion> 
fut0) -> {
+                    try {
+                        fut.get();
 
-                            mapOnTopology(remap);
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
-                        }
-                        finally {
-                            cctx.shared().txContextReset();
-                        }
+                        mapOnTopology(remap);
+                    }
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
+                    }
+                    finally {
+                        cctx.shared().txContextReset();
                     }
                 });
             }
@@ -1473,25 +1466,23 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
                     IgniteInternalFuture<TxDeadlock> fut = 
cctx.tm().detectDeadlock(tx, keys);
 
-                    fut.listen(new 
IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
-                        @Override public void 
apply(IgniteInternalFuture<TxDeadlock> fut) {
-                            try {
-                                TxDeadlock deadlock = fut.get();
+                    fut.listen((IgniteInternalFuture<TxDeadlock> fut0) -> {
+                        try {
+                            TxDeadlock deadlock = fut.get();
 
-                                err = new 
IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
-                                    "timeout for transaction [timeout=" + 
tx.timeout() + ", tx=" + CU.txString(tx) + ']',
-                                    deadlock != null ? new 
TransactionDeadlockException(deadlock.toString(cctx.shared())) :
-                                        null);
-                            }
-                            catch (IgniteCheckedException e) {
-                                err = e;
+                            err = new IgniteTxTimeoutCheckedException("Failed 
to acquire lock within provided " +
+                                "timeout for transaction [timeout=" + 
tx.timeout() + ", tx=" + CU.txString(tx) + ']',
+                                deadlock != null ? new 
TransactionDeadlockException(deadlock.toString(cctx.shared())) :
+                                    null);
+                        }
+                        catch (IgniteCheckedException e) {
+                            err = e;
 
-                                U.warn(log, "Failed to detect deadlock.", e);
-                            }
+                            U.warn(log, "Failed to detect deadlock.", e);
+                        }
 
-                            synchronized (LockTimeoutObject.this) {
-                                onComplete(false, true);
-                            }
+                        synchronized (LockTimeoutObject.this) {
+                            onComplete(false, true);
                         }
                     });
                 }
@@ -1526,11 +1517,11 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
         /** Node ID. */
         @GridToStringExclude
-        private ClusterNode node;
+        private final ClusterNode node;
 
         /** Keys. */
         @GridToStringInclude(sensitive = true)
-        private Collection<KeyCacheObject> keys;
+        private final Collection<KeyCacheObject> keys;
 
         /** */
         private boolean rcvRes;
@@ -1564,13 +1555,6 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
             return node;
         }
 
-        /**
-         * @return Keys.
-         */
-        public Collection<KeyCacheObject> keys() {
-            return keys;
-        }
-
         /**
          * @param e Node left exception.
          */
@@ -1651,19 +1635,17 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
                     if (!affFut.isDone()) {
                         // TODO FIXME 
https://ggsystems.atlassian.net/browse/GG-23288
-                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void 
apply(IgniteInternalFuture<?> fut) {
-                                try {
-                                    fut.get();
+                        affFut.listen((IgniteInternalFuture<?> fut) -> {
+                            try {
+                                fut.get();
 
-                                    remap();
-                                }
-                                catch (IgniteCheckedException e) {
-                                    onDone(e);
-                                }
-                                finally {
-                                    cctx.shared().txContextReset();
-                                }
+                                remap();
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                            finally {
+                                cctx.shared().txContextReset();
                             }
                         });
                     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c5dd5efdb66..79340a08b07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import 
org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -49,13 +48,10 @@ import org.apache.ignite.internal.processors.tracing.Span;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
@@ -74,8 +70,7 @@ import static 
org.apache.ignite.transactions.TransactionState.UNKNOWN;
 /**
  *
  */
-public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentityFuture<IgniteInternalTx>
-    implements GridCacheFuture<IgniteInternalTx>, NearTxFinishFuture {
+public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentityFuture<IgniteInternalTx> implements NearTxFinishFuture 
{
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -93,23 +88,23 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
     private static IgniteLogger log;
 
     /** Logger. */
-    protected static IgniteLogger msgLog;
+    private static IgniteLogger msgLog;
 
     /** Context. */
-    private GridCacheSharedContext<K, V> cctx;
+    private final GridCacheSharedContext<K, V> cctx;
 
     /** Future ID. */
     private final IgniteUuid futId;
 
     /** Transaction. */
     @GridToStringInclude
-    private GridNearTxLocal tx;
+    private final GridNearTxLocal tx;
 
     /** Commit flag. This flag used only for one-phase commit transaction. */
-    private boolean commit;
+    private final boolean commit;
 
     /** Node mappings. */
-    private IgniteTxMappings mappings;
+    private final IgniteTxMappings mappings;
 
     /** Trackable flag. */
     private boolean trackable = true;
@@ -123,7 +118,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
      * @param commit Commit flag.
      */
     public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, 
GridNearTxLocal tx, boolean commit) {
-        super(F.<IgniteInternalTx>identityReducer(tx));
+        super(F.identityReducer(tx));
 
         this.cctx = cctx;
         this.tx = tx;
@@ -149,13 +144,6 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
         return commit;
     }
 
-    /**
-     * @return Cache context.
-     */
-    GridCacheSharedContext<K, V> context() {
-        return cctx;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
@@ -281,7 +269,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                     CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
 
                     if (f.futureId() == res.miniId())
-                        f.onDhtFinishResponse(nodeId, false);
+                        f.onDhtFinishResponse(nodeId);
                 }
             }
 
@@ -306,7 +294,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
      *
      */
     void forceFinish() {
-        super.onDone(tx, null, false);
+        onDone(tx, null, false);
     }
 
     /** {@inheritDoc} */
@@ -443,16 +431,14 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
             }
         }
 
-        curFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> fut) {
-                try {
-                    fut.get();
+        curFut.listen((IgniteInternalFuture<?> fut) -> {
+            try {
+                fut.get();
 
-                    rollbackAsyncSafe(onTimeout);
-                }
-                catch (IgniteCheckedException e) {
-                    doFinish(false, false);
-                }
+                rollbackAsyncSafe(onTimeout);
+            }
+            catch (IgniteCheckedException e) {
+                doFinish(false, false);
             }
         });
     }
@@ -477,13 +463,13 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                         if (mapping != null) {
                             assert !hasFutures() || isDone() : futures();
 
-                            finish(1, mapping, commit, !clearThreadMap);
+                            finish(1, mapping, commit);
                         }
                     }
                     else {
                         assert !hasFutures() || isDone() : futures();
 
-                        finish(mappings.mappings(), commit, !clearThreadMap);
+                        finish(mappings.mappings(), commit);
                     }
                 }
 
@@ -539,13 +525,12 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                 ClusterNode backup = cctx.discovery().node(backupId);
 
                 // Nothing to do if backup has left the grid.
-                if (backup == null) {
-                    // No-op.
+                if (backup != null) {
+                    if (backup.isLocal())
+                        cctx.tm().removeTxReturn(tx.xidVersion());
+                    else
+                        cctx.tm().sendDeferredAckResponse(backupId, 
tx.xidVersion());
                 }
-                else if (backup.isLocal())
-                    cctx.tm().removeTxReturn(tx.xidVersion());
-                else
-                    cctx.tm().sendDeferredAckResponse(backupId, 
tx.xidVersion());
             }
         }
     }
@@ -614,11 +599,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
 
                                     IgniteInternalFuture<?> fut = 
cctx.tm().remoteTxFinishFuture(nearXidVer);
 
-                                    fut.listen(new 
CI1<IgniteInternalFuture<?>>() {
-                                        @Override public void 
apply(IgniteInternalFuture<?> fut) {
-                                            mini.onDone(tx);
-                                        }
-                                    });
+                                    fut.listen((IgniteInternalFuture<?> fut0) 
-> mini.onDone(tx));
 
                                     return;
                                 }
@@ -733,32 +714,30 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
             mapping.dhtVersion(xidVer, xidVer);
 
             tx.readyNearLocks(mapping,
-                Collections.<GridCacheVersion>emptyList(),
-                Collections.<GridCacheVersion>emptyList(),
-                Collections.<GridCacheVersion>emptyList());
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList());
         }
     }
 
     /**
      * @param mappings Mappings.
      * @param commit Commit flag.
-     * @param useCompletedVer {@code True} if need to add completed version on 
finish.
      */
-    private void finish(Iterable<GridDistributedTxMapping> mappings, boolean 
commit, boolean useCompletedVer) {
+    private void finish(Iterable<GridDistributedTxMapping> mappings, boolean 
commit) {
         int miniId = 0;
 
         // Create mini futures.
         for (GridDistributedTxMapping m : mappings)
-            finish(++miniId, m, commit, useCompletedVer);
+            finish(++miniId, m, commit);
     }
 
     /**
      * @param miniId Mini future ID.
      * @param m Mapping.
      * @param commit Commit flag.
-     * @param useCompletedVer {@code True} if need to add completed version on 
finish.
      */
-    private void finish(int miniId, GridDistributedTxMapping m, boolean 
commit, boolean useCompletedVer) {
+    private void finish(int miniId, GridDistributedTxMapping m, boolean 
commit) {
         ClusterNode n = m.primary();
 
         assert !m.empty() || m.queryUpdate() : m + " " + tx.state();
@@ -843,47 +822,45 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                if (f.getClass() == FinishMiniFuture.class) {
-                    FinishMiniFuture fut = (FinishMiniFuture)f;
+        Collection<String> futs = F.viewReadOnly(futures(), 
(IgniteInternalFuture<?> f) -> {
+            if (f.getClass() == FinishMiniFuture.class) {
+                FinishMiniFuture fut = (FinishMiniFuture)f;
 
-                    ClusterNode node = fut.primary();
+                ClusterNode node = fut.primary();
 
-                    if (node != null) {
-                        return "FinishFuture[node=" + node.id() +
-                            ", loc=" + node.isLocal() +
-                            ", done=" + fut.isDone() + ']';
-                    }
-                    else
-                        return "FinishFuture[node=null, done=" + fut.isDone() 
+ ']';
+                if (node != null) {
+                    return "FinishFuture[node=" + node.id() +
+                        ", loc=" + node.isLocal() +
+                        ", done=" + fut.isDone() + ']';
                 }
-                else if (f.getClass() == CheckBackupMiniFuture.class) {
-                    CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
+                else
+                    return "FinishFuture[node=null, done=" + fut.isDone() + 
']';
+            }
+            else if (f.getClass() == CheckBackupMiniFuture.class) {
+                CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
 
-                    ClusterNode node = fut.node();
+                ClusterNode node = fut.node();
 
-                    if (node != null) {
-                        return "CheckBackupFuture[node=" + node.id() +
-                            ", loc=" + node.isLocal() +
-                            ", done=" + f.isDone() + "]";
-                    }
-                    else
-                        return "CheckBackupFuture[node=null, done=" + 
f.isDone() + "]";
+                if (node != null) {
+                    return "CheckBackupFuture[node=" + node.id() +
+                        ", loc=" + node.isLocal() +
+                        ", done=" + f.isDone() + "]";
                 }
-                else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
-                    CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+                else
+                    return "CheckBackupFuture[node=null, done=" + f.isDone() + 
"]";
+            }
+            else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
+                CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
 
-                    return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", 
done=" + f.isDone() + "]";
-                }
-                else if (f instanceof MvccFuture) {
-                    MvccFuture fut = (MvccFuture)f;
+                return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", 
done=" + f.isDone() + "]";
+            }
+            else if (f instanceof MvccFuture) {
+                MvccFuture fut = (MvccFuture)f;
 
-                    return "WaitPreviousTxsFut[mvccCrd=" + 
fut.coordinatorNodeId() + ", done=" + f.isDone() + "]";
-                }
-                else
-                    return "[loc=true, done=" + f.isDone() + "]";
+                return "WaitPreviousTxsFut[mvccCrd=" + fut.coordinatorNodeId() 
+ ", done=" + f.isDone() + "]";
             }
+            else
+                return "[loc=true, done=" + f.isDone() + "]";
         });
 
         return S.toString(GridNearTxFinishFuture.class, this,
@@ -932,7 +909,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
     /**
      *
      */
-    private abstract class MinFuture extends 
GridFutureAdapter<IgniteInternalTx> {
+    private abstract static class MinFuture extends 
GridFutureAdapter<IgniteInternalTx> {
         /** */
         private final int futId;
 
@@ -964,7 +941,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
     private class FinishMiniFuture extends MinFuture {
         /** Keys. */
         @GridToStringInclude
-        private GridDistributedTxMapping m;
+        private final GridDistributedTxMapping m;
 
         /**
          * @param futId Future ID.
@@ -983,13 +960,6 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
             return m.primary();
         }
 
-        /**
-         * @return Keys.
-         */
-        public GridDistributedTxMapping mapping() {
-            return m;
-        }
-
         /** {@inheritDoc} */
         @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
             if (tx.state() == COMMITTING || tx.state() == COMMITTED) {
@@ -1037,11 +1007,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                                     if (backup.isLocal()) {
                                         IgniteInternalFuture<?> fut = 
cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
 
-                                        fut.listen(new 
CI1<IgniteInternalFuture<?>>() {
-                                            @Override public void 
apply(IgniteInternalFuture<?> fut) {
-                                                
mini.onDhtFinishResponse(cctx.localNodeId(), true);
-                                            }
-                                        });
+                                        fut.listen((IgniteInternalFuture<?> 
fut0) -> mini.onDhtFinishResponse(cctx.localNodeId()));
                                     }
                                     else {
                                         try {
@@ -1056,7 +1022,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
                                     }
                                 }
                                 else
-                                    mini.onDhtFinishResponse(backupId, true);
+                                    mini.onDhtFinishResponse(backupId);
                             }
                         }
                     }
@@ -1100,10 +1066,10 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
     private class CheckBackupMiniFuture extends MinFuture {
         /** Keys. */
         @GridToStringInclude
-        private GridDistributedTxMapping m;
+        private final GridDistributedTxMapping m;
 
         /** Backup node to check. */
-        private ClusterNode backup;
+        private final ClusterNode backup;
 
         /**
          * @param futId Future ID.
@@ -1167,7 +1133,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
      */
     private class CheckRemoteTxMiniFuture extends MinFuture {
         /** */
-        private Set<UUID> nodes;
+        private final Set<UUID> nodes;
 
         /**
          * @param futId Future ID.
@@ -1195,9 +1161,8 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
 
         /**
          * @param nodeId Node ID.
-         * @param discoThread {@code True} if executed from discovery thread.
          */
-        void onDhtFinishResponse(UUID nodeId, boolean discoThread) {
+        void onDhtFinishResponse(UUID nodeId) {
             onResponse(nodeId);
         }
 

Reply via email to