IGNITE-7871 Implemented additional synchronization phase for correct partition counters update


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da77b981
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da77b981
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da77b981

Branch: refs/heads/ignite-7708
Commit: da77b9818a70495b7afdf6899ebd9180dadd7f68
Parents: f4de6df
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Wed Apr 11 11:23:46 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Wed Apr 11 11:23:46 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../discovery/GridDiscoveryManager.java         |  10 +
 .../MetaPageUpdatePartitionDataRecord.java      |   2 +-
 .../processors/cache/CacheMetricsImpl.java      |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |  38 +
 .../GridCachePartitionExchangeManager.java      |  17 +
 .../cache/GridCacheSharedContext.java           |   9 +-
 .../processors/cache/GridCacheUtils.java        |   2 +-
 .../cache/IgniteCacheOffheapManager.java        |   8 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  10 +-
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../distributed/dht/GridDhtLocalPartition.java  |   9 +-
 .../dht/GridDhtPartitionTopology.java           |   6 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  26 +-
 .../dht/GridDhtPartitionsStateValidator.java    | 255 +++++++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |  96 ++-
 .../GridDhtPartitionsSingleMessage.java         |  68 +-
 .../dht/preloader/InitNewCoordinatorFuture.java |   2 +-
 .../preloader/latch/ExchangeLatchManager.java   | 695 +++++++++++++++++++
 .../distributed/dht/preloader/latch/Latch.java  |  52 ++
 .../dht/preloader/latch/LatchAckMessage.java    | 165 +++++
 .../cache/distributed/near/GridNearTxLocal.java |  10 +
 .../persistence/GridCacheOffheapManager.java    |  10 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxManager.java     |  36 +-
 ...cheDhtLocalPartitionAfterRemoveSelfTest.java |   2 +-
 .../processors/cache/IgniteCacheGroupsTest.java |   1 +
 ...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++
 .../GridCachePartitionsStateValidationTest.java | 316 +++++++++
 ...idCachePartitionsStateValidatorSelfTest.java | 158 +++++
 .../TxOptimisticOnPartitionExchangeTest.java    | 322 +++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   4 +
 .../testsuites/IgniteCacheTestSuite6.java       |   6 +
 35 files changed, 2568 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 1227e8c..0b2d41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -124,7 +124,10 @@ public enum GridTopic {
     TOPIC_METRICS,
 
     /** */
-    TOPIC_AUTH;
+    TOPIC_AUTH,
+
+    /** */
+    TOPIC_EXCHANGE;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5616fd0..581c32e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -921,6 +922,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 135:
+                msg = new LatchAckMessage();
+
+                break;
+
             // [-3..119] [124..129] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a1d84e5..400bb5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -793,6 +793,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onDisconnected();
 
+                    if (!locJoin.isDone())
+                        locJoin.onDone(new IgniteCheckedException("Node 
disconnected"));
+
                     locJoin = new GridFutureAdapter<>();
 
                     registeredCaches.clear();
@@ -2142,6 +2145,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @return Local join future.
+     */
+    public GridFutureAdapter<DiscoveryLocalJoinData> localJoinFuture() {
+        return locJoin;
+    }
+
+    /**
      * @param msg Custom message.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index bafbf47..e5bd343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -32,7 +32,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
     /** */
     private long globalRmvId;
 
-    /** */
+    /** TODO: Partition size may be long */
     private int partSize;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 6fae8fe..b402ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -792,7 +792,7 @@ public class CacheMetricsImpl implements CacheMetrics {
                     if (cctx.cache() == null)
                         continue;
 
-                    int cacheSize = part.dataStore().cacheSize(cctx.cacheId());
+                    long cacheSize = 
part.dataStore().cacheSize(cctx.cacheId());
 
                     offHeapEntriesCnt += cacheSize;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a9fa3c7..fade833 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -44,6 +44,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -314,6 +316,42 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Creates a future that will wait for finishing all remote transactions 
(primary -> backup)
+     * with topology version less or equal to {@code topVer}.
+     *
+     * @param topVer Topology version.
+     * @return Compound future of all {@link GridDhtTxFinishFuture} futures.
+     */
+    public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion 
topVer) {
+        GridCompoundFuture<?, ?> res = new 
CacheObjectsReleaseFuture<>("RemoteTx", topVer);
+
+        for (GridCacheFuture<?> fut : futs.values()) {
+            if (fut instanceof GridDhtTxFinishFuture) {
+                GridDhtTxFinishFuture finishTxFuture = (GridDhtTxFinishFuture) 
fut;
+
+                if (cctx.tm().needWaitTransaction(finishTxFuture.tx(), topVer))
+                    res.add(ignoreErrors(finishTxFuture));
+            }
+        }
+
+        res.markInitialized();
+
+        return res;
+    }
+
+    /**
+     * Future wrapper which ignores any underlying future errors.
+     *
+     * @param f Underlying future.
+     * @return Future wrapper which ignore any underlying future errors.
+     */
+    private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) {
+        GridFutureAdapter<?> wrapper = new GridFutureAdapter();
+        f.listen(future -> wrapper.onDone());
+        return wrapper;
+    }
+
+    /**
      * @param leftNodeId Left node ID.
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1a0e65f..20a3ccb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -216,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** For tests only. */
     private volatile AffinityTopologyVersion exchMergeTestWaitVer;
 
+    /** Distributed latch manager. */
+    private ExchangeLatchManager latchMgr;
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new 
DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -309,6 +313,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         exchWorker = new ExchangeWorker();
 
+        latchMgr = new ExchangeLatchManager(cctx.kernalContext());
+
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, 
EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
 
@@ -1255,6 +1261,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     m.addPartitionUpdateCounters(grp.groupId(),
                         newCntrMap ? cntrsMap : 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+                    m.addPartitionSizes(grp.groupId(), 
grp.topology().partitionSizes());
                 }
             }
         }
@@ -1277,6 +1285,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 m.addPartitionUpdateCounters(top.groupId(),
                     newCntrMap ? cntrsMap : 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+                m.addPartitionSizes(top.groupId(), top.partitionSizes());
             }
         }
 
@@ -1570,6 +1580,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @return Latch manager instance.
+     */
+    public ExchangeLatchManager latch() {
+        return latchMgr;
+    }
+
+    /**
      * @param exchFut Optional current exchange future.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c2f9229..b3b4f0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.function.BiFunction;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -711,7 +710,7 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @return Ttl cleanup manager.
-     * */
+     */
     public GridCacheSharedTtlCleanupManager ttl() {
         return ttlMgr;
     }
@@ -854,10 +853,14 @@ public class GridCacheSharedContext<K, V> {
         GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", 
topVer);
 
         f.add(mvcc().finishExplicitLocks(topVer));
-        f.add(tm().finishTxs(topVer));
         f.add(mvcc().finishAtomicUpdates(topVer));
         f.add(mvcc().finishDataStreamerUpdates(topVer));
 
+        IgniteInternalFuture<?> finishLocalTxsFuture = 
tm().finishLocalTxs(topVer);
+        // To properly track progress of finishing local tx updates we 
explicitly add this future to compound set.
+        f.add(finishLocalTxsFuture);
+        f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer));
+
         f.markInitialized();
 
         return f;

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a5169d2..d672420 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1732,7 +1732,7 @@ public class GridCacheUtils {
                             ver,
                             expiryPlc == null ? 0 : expiryPlc.forCreate(),
                             expiryPlc == null ? 0 : 
toExpireTime(expiryPlc.forCreate()),
-                            false,
+                            true,
                             topVer,
                             GridDrType.DR_BACKUP,
                             true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 3d83f87..a12c033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -22,11 +22,11 @@ import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -344,7 +344,7 @@ public interface IgniteCacheOffheapManager {
      * @param part Partition.
      * @return Number of entries.
      */
-    public int totalPartitionEntriesCount(int part);
+    public long totalPartitionEntriesCount(int part);
 
     /**
      *
@@ -381,7 +381,7 @@ public interface IgniteCacheOffheapManager {
          * @param cacheId Cache ID.
          * @return Size.
          */
-        int cacheSize(int cacheId);
+        long cacheSize(int cacheId);
 
         /**
          * @return Cache sizes if store belongs to group containing multiple 
caches.
@@ -391,7 +391,7 @@ public interface IgniteCacheOffheapManager {
         /**
          * @return Total size.
          */
-        int fullSize();
+        long fullSize();
 
         /**
          * @return Update counter.

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index b201935..f8cc86f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -252,7 +252,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public int totalPartitionEntriesCount(int p) {
+    @Override public long totalPartitionEntriesCount(int p) {
         if (grp.isLocal())
             return locCacheDataStore.fullSize();
         else {
@@ -1152,14 +1152,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public int cacheSize(int cacheId) {
+        @Override public long cacheSize(int cacheId) {
             if (grp.sharedGroup()) {
                 AtomicLong size = cacheSizes.get(cacheId);
 
                 return size != null ? (int)size.get() : 0;
             }
 
-            return (int)storageSize.get();
+            return storageSize.get();
         }
 
         /** {@inheritDoc} */
@@ -1176,8 +1176,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public int fullSize() {
-            return (int)storageSize.get();
+        @Override public long fullSize() {
+            return storageSize.get();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5bbbb31..3e3bb0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1196,6 +1196,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> partitionSizes() {
+        return Collections.emptyMap();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) 
{
         assert false : "Should not be called on non-affinity node";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 7a47f31..ea20dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -929,7 +929,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     /**
      * @return Initial update counter.
      */
-    public Long initialUpdateCounter() {
+    public long initialUpdateCounter() {
         return store.initialUpdateCounter();
     }
 
@@ -948,6 +948,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @return Total size of all caches.
+     */
+    public long fullSize() {
+        return store.fullSize();
+    }
+
+    /**
      * Removes all entries and rows from this partition.
      *
      * @return Number of rows cleared from page memory.

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 7f900cb..6f68dbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -345,6 +346,11 @@ public interface GridDhtPartitionTopology {
     public CachePartitionPartialCountersMap localUpdateCounters(boolean 
skipZeros);
 
     /**
+     * @return Partition cache sizes.
+     */
+    public Map<Integer, Long> partitionSizes();
+
+    /**
      * @param part Partition to own.
      * @return {@code True} if owned.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 538c57e..740903e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -2526,6 +2528,28 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> partitionSizes() {
+        lock.readLock().lock();
+
+        try {
+            Map<Integer, Long> partitionSizes = new HashMap<>();
+
+            for (int p = 0; p < locParts.length(); p++) {
+                GridDhtLocalPartition part = locParts.get(p);
+                if (part == null || part.fullSize() == 0)
+                    continue;
+
+                partitionSizes.put(part.id(), part.fullSize());
+            }
+
+            return partitionSizes;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) 
{
         AffinityTopologyVersion curTopVer = this.readyTopVer;
 
@@ -2587,7 +2611,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                int size = part.dataStore().fullSize();
+                long size = part.dataStore().fullSize();
 
                 if (size >= threshold)
                     X.println(">>>   Local partition [part=" + part.id() + ", 
size=" + size + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
new file mode 100644
index 0000000..92a0584
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ * Class to validate partitions update counters and cache sizes during exchange process.
+ * <p>
+ * Only partitions in {@link GridDhtPartitionState#OWNING} state are compared. For every partition the first
+ * reported value (the local node's value if the local node owns the partition) becomes the reference value,
+ * and every other owner's value is compared against it.
+ */
+public class GridDhtPartitionsStateValidator {
+    /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
+    private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE =
+        IgniteProductVersion.fromString("2.5.0");
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache shared context.
+     */
+    public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) {
+        this.cctx = cctx;
+    }
+
+    /**
+     * Validates partition states - update counters and cache sizes for all nodes.
+     * If update counter value or cache size for the same partitions are different on some nodes
+     * method throws exception with full information about inconsistent partitions.
+     * <p>
+     * Nodes that joined in the given exchange are excluded from validation. Cache sizes are additionally
+     * not validated for nodes older than {@link #SIZES_VALIDATION_AVAILABLE_SINCE}, as they do not send sizes.
+     *
+     * @param fut Current exchange future.
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @throws IgniteCheckedException If validation failed. Exception message contains
+     * full information about all partitions which update counters or cache sizes are not consistent.
+     */
+    public void validatePartitionCountersAndSizes(
+        GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages
+    ) throws IgniteCheckedException {
+        // Ignore just joined nodes.
+        final Set<UUID> ignoringNodes = new HashSet<>();
+
+        for (DiscoveryEvent evt : fut.events().events()) {
+            if (evt.type() == EVT_NODE_JOINED)
+                ignoringNodes.add(evt.eventNode().id());
+        }
+
+        AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+        // Validate update counters.
+        Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+
+        // For sizes validation ignore also nodes which are not able to send cache sizes.
+        for (UUID id : messages.keySet()) {
+            ClusterNode node = cctx.discovery().node(id);
+
+            if (node != null && node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0)
+                ignoringNodes.add(id);
+        }
+
+        // Validate cache sizes.
+        result = validatePartitionsSizes(top, messages, ignoringNodes);
+
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+    }
+
+    /**
+     * Validate partitions update counters for given {@code top}.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes for what we ignore validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+     * If map is empty validation is successful.
+     */
+    Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes
+    ) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+
+        // Populate counters statistics from local node partitions.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate counters from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            CachePartitionPartialCountersMap countersMap =
+                e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                // A partition absent from the reported counters is treated as having counter 0.
+                int partIdx = countersMap.partitionIndex(part);
+                long currentCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0;
+
+                process(invalidPartitions, updateCountersAndNodesByPartitions, part, nodeId, currentCounter);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Validate partitions cache sizes for given {@code top}.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes for what we ignore validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+     * If map is empty validation is successful.
+     */
+    Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes
+    ) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+
+        // Populate sizes statistics from local node partitions.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate sizes from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                // A partition absent from the reported sizes is treated as empty.
+                long currentSize = sizesMap.getOrDefault(part, 0L);
+
+                process(invalidPartitions, sizesAndNodesByPartitions, part, nodeId, currentSize);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Processes given {@code counter} for partition {@code part} reported by {@code node}.
+     * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different.
+     *
+     * @param invalidPartitions Invalid partitions map.
+     * @param countersAndNodes Current map of counters and nodes by partitions.
+     * @param part Processing partition.
+     * @param node Node id.
+     * @param counter Counter value reported by {@code node}.
+     */
+    private void process(
+        Map<Integer, Map<UUID, Long>> invalidPartitions,
+        Map<Integer, T2<UUID, Long>> countersAndNodes,
+        int part,
+        UUID node,
+        long counter
+    ) {
+        T2<UUID, Long> existingData = countersAndNodes.get(part);
+
+        // The first value seen for a partition becomes its reference value.
+        if (existingData == null) {
+            countersAndNodes.put(part, new T2<>(node, counter));
+
+            return;
+        }
+
+        if (counter != existingData.get2()) {
+            // On the first mismatch also record the reference node's value, so the report shows both sides.
+            Map<UUID, Long> nodeValues = invalidPartitions.computeIfAbsent(part, k -> {
+                Map<UUID, Long> map = new HashMap<>();
+
+                map.put(existingData.get1(), existingData.get2());
+
+                return map;
+            });
+
+            nodeValues.put(node, counter);
+        }
+    }
+
+    /**
+     * Folds given map of invalid partition states to string representation in the following format:
+     * Part [id]: [consistentId=value*]
+     *
+     * Value can be both update counter or cache size. If a node cannot be resolved for {@code topVer},
+     * its node id is printed instead of its consistent id.
+     *
+     * @param topVer Last topology version.
+     * @param invalidPartitions Invalid partitions map.
+     * @return String representation of invalid partitions.
+     */
+    private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, Long>> invalidPartitions) {
+        SB sb = new SB();
+
+        NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new TreeMap<>(invalidPartitions);
+
+        for (Map.Entry<Integer, Map<UUID, Long>> p : sortedPartitions.entrySet()) {
+            sb.a("Part ").a(p.getKey()).a(": [");
+
+            for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) {
+                ClusterNode node = cctx.discovery().node(topVer, e.getKey());
+
+                // The lookup may yield null (presumably when the node is unknown for topVer); fall back to the
+                // node id instead of throwing NPE and hiding the validation failure being reported.
+                Object nodeLabel = node != null ? node.consistentId() : e.getKey();
+
+                sb.a(nodeLabel).a("=").a(e.getValue()).a(" ");
+            }
+
+            sb.a("] ");
+        }
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 28cc018..0609f04 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -447,6 +447,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
 
             err = e;
         }
+        catch (Throwable t) {
+            fut.onDone(t);
+
+            throw t;
+        }
 
         if (primarySync)
             sendFinishReply(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbb4985..dd4a571 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -75,10 +76,12 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
 import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -290,6 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private GridDhtPartitionsExchangeFuture mergedWith;
 
+    /** Validator for partition states. */
+    @GridToStringExclude
+    private final GridDhtPartitionsStateValidator validator;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -314,6 +321,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         this.exchId = exchId;
         this.exchActions = exchActions;
         this.affChangeMsg = affChangeMsg;
+        this.validator = new GridDhtPartitionsStateValidator(cctx);
 
         log = cctx.logger(getClass());
         exchLog = cctx.logger(EXCHANGE_LOG);
@@ -1099,7 +1107,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         // To correctly rebalance when persistence is enabled, it is necessary 
to reserve history within exchange.
         partHistReserved = cctx.database().reserveHistoryForExchange();
 
-        waitPartitionRelease();
+        // On first phase we wait for finishing all local tx updates, atomic 
updates and lock releases.
+        waitPartitionRelease(1);
+
+        // Second phase is needed to wait for finishing all tx updates from 
primary to backup nodes remaining after first phase.
+        waitPartitionRelease(2);
 
         boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT 
|| affChangeMsg != null;
 
@@ -1202,9 +1214,17 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * For the exact list of the objects being awaited for see
      * {@link 
GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
      *
+     * @param phase Phase of partition release.
+     *
      * @throws IgniteCheckedException If failed.
      */
-    private void waitPartitionRelease() throws IgniteCheckedException {
+    private void waitPartitionRelease(int phase) throws IgniteCheckedException 
{
+        Latch releaseLatch = null;
+
+        // Wait for other nodes only on first phase.
+        if (phase == 1)
+            releaseLatch = cctx.exchange().latch().getOrCreate("exchange", 
initialVersion());
+
         IgniteInternalFuture<?> partReleaseFut = 
cctx.partitionReleaseFuture(initialVersion());
 
         // Assign to class variable so it will be included into toString() 
method.
@@ -1238,6 +1258,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     nextDumpTime = U.currentTimeMillis() + 
nextDumpTimeout(dumpCnt++, futTimeout);
                 }
             }
+            catch (IgniteCheckedException e) {
+                U.warn(log,"Unable to await partitions release future", e);
+
+                throw e;
+            }
         }
 
         long waitEnd = U.currentTimeMillis();
@@ -1290,6 +1315,35 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
         }
+
+        if (releaseLatch == null)
+            return;
+
+        releaseLatch.countDown();
+
+        if (!localJoinExchange()) {
+            try {
+                while (true) {
+                    try {
+                        releaseLatch.await(futTimeout, TimeUnit.MILLISECONDS);
+
+                        if (log.isInfoEnabled())
+                            log.info("Finished waiting for partitions release 
latch: " + releaseLatch);
+
+                        break;
+                    }
+                    catch (IgniteFutureTimeoutCheckedException ignored) {
+                        U.warn(log, "Unable to await partitions release latch 
within timeout: " + releaseLatch);
+
+                        // Try to resend ack.
+                        releaseLatch.countDown();
+                    }
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Stop waiting for partitions release latch: " + 
e.getMessage());
+            }
+        }
     }
 
     /**
@@ -2499,6 +2553,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
+            validatePartitionsState();
+
             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 assert firstDiscoEvt instanceof DiscoveryCustomEvent;
 
@@ -2683,6 +2739,42 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Validates that partition update counters and cache sizes for all caches are consistent.
+     * <p>
+     * Skipped groups:
+     * <ul>
+     *     <li>{@link CacheMode#LOCAL} groups;</li>
+     *     <li>groups without a local {@link CacheGroupContext};</li>
+     *     <li>groups with read-through, write-through or a cache store factory configured;</li>
+     *     <li>groups with rebalance delay other than {@code -1} or rebalance mode {@link CacheRebalanceMode#NONE}.</li>
+     * </ul>
+     * Validation failures are currently only logged.
+     */
+    private void validatePartitionsState() {
+        for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) {
+            CacheGroupDescriptor grpDesc = e.getValue();
+
+            if (grpDesc.config().getCacheMode() == CacheMode.LOCAL)
+                continue;
+
+            int grpId = e.getKey();
+
+            CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpId);
+
+            // Do not validate read or write through caches or caches with disabled rebalance.
+            // Groups without a local context are never validated, so the topology is resolved only after
+            // this filter and no client topology has to be looked up for them.
+            if (grpCtx == null
+                || grpCtx.config().isReadThrough()
+                || grpCtx.config().isWriteThrough()
+                || grpCtx.config().getCacheStoreFactory() != null
+                || grpCtx.config().getRebalanceDelay() != -1
+                || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE)
+                continue;
+
+            GridDhtPartitionTopology top = grpCtx.topology();
+
+            try {
+                validator.validatePartitionCountersAndSizes(this, top, msgs);
+            }
+            catch (IgniteCheckedException ex) {
+                log.warning("Partition states validation was failed for cache " + grpDesc.cacheOrGroupName(), ex);
+
+                // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
+            }
+        }
+    }
+
+    /**
      *
      */
     private void assignPartitionsStates() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 215152d..6ebafac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.util.Collection;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -67,6 +67,14 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
 
+    /** Partitions sizes. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partSizes;
+
+    /** Serialized partitions sizes. */
+    private byte[] partSizesBytes;
+
     /** Partitions history reservation counters. */
     @GridToStringInclude
     @GridDirectTransient
@@ -220,6 +228,35 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     }
 
     /**
+     * Registers the partition sizes of cache group {@code grpId} in this message.
+     * An empty map is ignored; a non-empty map is stored by reference (not copied)
+     * and replaces any map previously added for the same group.
+     *
+     * @param grpId Group id.
+     * @param partSizesMap Partition sizes map.
+     */
+    public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
+        if (!partSizesMap.isEmpty()) {
+            if (partSizes == null)
+                partSizes = new HashMap<>();
+
+            partSizes.put(grpId, partSizesMap);
+        }
+    }
+
+    /**
+     * Gets the partition sizes registered for cache group {@code grpId}.
+     *
+     * @param grpId Group id.
+     * @return Partition sizes map (partId, partSize), or an empty map if nothing was added for the group.
+     */
+    public Map<Integer, Long> partitionSizes(int grpId) {
+        Map<Integer, Map<Integer, Long>> sizes = partSizes;
+
+        if (sizes != null && sizes.containsKey(grpId))
+            return sizes.get(grpId);
+
+        return Collections.emptyMap();
+    }
+
+    /**
      * @param grpId Cache group ID.
      * @param cntrMap Partition history counters.
      */
@@ -287,12 +324,14 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         boolean marshal = (parts != null && partsBytes == null) ||
             (partCntrs != null && partCntrsBytes == null) ||
             (partHistCntrs != null && partHistCntrsBytes == null) ||
+            (partSizes != null && partSizesBytes == null) ||
             (err != null && errBytes == null);
 
         if (marshal) {
             byte[] partsBytes0 = null;
             byte[] partCntrsBytes0 = null;
             byte[] partHistCntrsBytes0 = null;
+            byte[] partSizesBytes0 = null;
             byte[] errBytes0 = null;
 
             if (parts != null && partsBytes == null)
@@ -304,6 +343,9 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
             if (partHistCntrs != null && partHistCntrsBytes == null)
                 partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
 
+            if (partSizes != null && partSizesBytes == null)
+                partSizesBytes0 = U.marshal(ctx, partSizes);
+
             if (err != null && errBytes == null)
                 errBytes0 = U.marshal(ctx, err);
 
@@ -314,11 +356,13 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
                     byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
+                    byte[] partSizesBytesZip = U.zip(partSizesBytes0);
                     byte[] exBytesZip = U.zip(errBytes0);
 
                     partsBytes0 = partsBytesZip;
                     partCntrsBytes0 = partCntrsBytesZip;
                     partHistCntrsBytes0 = partHistCntrsBytesZip;
+                    partSizesBytes0 = partSizesBytesZip;
                     errBytes0 = exBytesZip;
 
                     compressed(true);
@@ -331,6 +375,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
             partsBytes = partsBytes0;
             partCntrsBytes = partCntrsBytes0;
             partHistCntrsBytes = partHistCntrsBytes0;
+            partSizesBytes = partSizesBytes0;
             errBytes = errBytes0;
         }
     }
@@ -360,6 +405,13 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
+        if (partSizesBytes != null && partSizes == null) {
+            if (compressed())
+                partSizes = U.unmarshalZip(ctx.marshaller(), partSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partSizes = U.unmarshal(ctx, partSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
+
         if (errBytes != null && err == null) {
             if (compressed())
                 err = U.unmarshalZip(ctx.marshaller(), errBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
@@ -451,6 +503,11 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 writer.incrementState();
 
+            case 13:
+                if (!writer.writeByteArray("partsSizesBytes", partSizesBytes))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -531,6 +588,13 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
+            case 13:
+                partSizesBytes = reader.readByteArray("partsSizesBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -543,7 +607,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 596fa8c..42a9ba6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -235,7 +235,7 @@ public class InitNewCoordinatorFuture extends 
GridCompoundFuture {
             if (awaited.remove(node.id())) {
                 GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
 
-                if (fullMsg0 != null) {
+                if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != 
null) {
                     assert fullMsg == null || 
fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
 
                     fullMsg  = fullMsg0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
new file mode 100644
index 0000000..c205cb1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Class is responsible to create and manage instances of distributed latches 
{@link Latch}.
+ */
+public class ExchangeLatchManager {
+    /** Version since latch management is available. */
+    private static final IgniteProductVersion VERSION_SINCE = 
IgniteProductVersion.fromString("2.5.0");
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** Discovery manager. */
+    private final GridDiscoveryManager discovery;
+
+    /** IO manager. */
+    private final GridIoManager io;
+
+    /** Current coordinator. */
+    private volatile ClusterNode coordinator;
+
+    /** Pending acks collection. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, 
Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+
+    /** Server latches collection. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, 
ServerLatch> serverLatches = new ConcurrentHashMap<>();
+
+    /** Client latches collection. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, 
ClientLatch> clientLatches = new ConcurrentHashMap<>();
+
+    /** Lock. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public ExchangeLatchManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+        this.log = ctx.log(getClass());
+        this.discovery = ctx.discovery();
+        this.io = ctx.io();
+
+        if (!ctx.clientNode()) {
+            ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, 
msg, plc) -> {
+                if (msg instanceof LatchAckMessage) {
+                    processAck(nodeId, (LatchAckMessage) msg);
+                }
+            });
+
+            // First coordinator initialization.
+            ctx.discovery().localJoinFuture().listen(f -> {
+                this.coordinator = 
getLatchCoordinator(AffinityTopologyVersion.NONE);
+            });
+
+            ctx.event().addDiscoveryEventListener((e, cache) -> {
+                assert e != null;
+                assert e.type() == EVT_NODE_LEFT || e.type() == 
EVT_NODE_FAILED : this;
+
+                // Do not process from discovery thread.
+                ctx.closure().runLocalSafe(() -> 
processNodeLeft(e.eventNode()));
+            }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        }
+    }
+
+    /**
+     * Creates server latch with given {@code id} and {@code topVer}.
+     * Adds corresponding pending acks to it.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param participants Participant nodes.
+     * @return Server latch instance.
+     */
+    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, 
Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, 
topVer);
+
+        if (serverLatches.containsKey(latchId))
+            return serverLatches.get(latchId);
+
+        ServerLatch latch = new ServerLatch(id, topVer, participants);
+
+        serverLatches.put(latchId, latch);
+
+        if (log.isDebugEnabled())
+            log.debug("Server latch is created [latch=" + latchId + ", 
participantsSize=" + participants.size() + "]");
+
+        if (pendingAcks.containsKey(latchId)) {
+            Set<UUID> acks = pendingAcks.get(latchId);
+
+            for (UUID node : acks)
+                if (latch.hasParticipant(node) && !latch.hasAck(node))
+                    latch.ack(node);
+
+            pendingAcks.remove(latchId);
+        }
+
+        if (latch.isCompleted())
+            serverLatches.remove(latchId);
+
+        return latch;
+    }
+
+    /**
+     * Creates client latch.
+     * If there is final ack corresponds to given {@code id} and {@code 
topVer}, latch will be completed immediately.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param coordinator Coordinator node.
+     * @param participants Participant nodes.
+     * @return Client latch instance.
+     */
+    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, 
ClusterNode coordinator, Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, 
topVer);
+
+        if (clientLatches.containsKey(latchId))
+            return clientLatches.get(latchId);
+
+        ClientLatch latch = new ClientLatch(id, topVer, coordinator, 
participants);
+
+        if (log.isDebugEnabled())
+            log.debug("Client latch is created [latch=" + latchId
+                    + ", crd=" + coordinator
+                    + ", participantsSize=" + participants.size() + "]");
+
+        // There is final ack for created latch.
+        if (pendingAcks.containsKey(latchId)) {
+            latch.complete();
+            pendingAcks.remove(latchId);
+        }
+        else
+            clientLatches.put(latchId, latch);
+
+        return latch;
+    }
+
+    /**
+     * Creates new latch with specified {@code id} and {@code topVer} or 
returns existing latch.
+     *
+     * Participants of latch are calculated from given {@code topVer} as alive 
server nodes.
+     * If local node is coordinator {@code ServerLatch} instance will be 
created, otherwise {@code ClientLatch} instance.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @return Latch instance.
+     */
+    public Latch getOrCreate(String id, AffinityTopologyVersion topVer) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = getLatchCoordinator(topVer);
+
+            if (coordinator == null) {
+                ClientLatch latch = new ClientLatch(id, 
AffinityTopologyVersion.NONE, null, Collections.emptyList());
+                latch.complete();
+
+                return latch;
+            }
+
+            Collection<ClusterNode> participants = 
getLatchParticipants(topVer);
+
+            return coordinator.isLocal()
+                ? createServerLatch(id, topVer, participants)
+                : createClientLatch(id, topVer, coordinator, participants);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @param topVer Latch topology version.
+     * @return Collection of alive server nodes with latch functionality.
+     */
+    private Collection<ClusterNode> 
getLatchParticipants(AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> aliveNodes = topVer == 
AffinityTopologyVersion.NONE
+                ? discovery.aliveServerNodes()
+                : discovery.discoCache(topVer).aliveServerNodes();
+
+        return aliveNodes
+                .stream()
+                .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * @param topVer Latch topology version.
+     * @return Oldest alive server node with latch functionality.
+     */
+    @Nullable private ClusterNode getLatchCoordinator(AffinityTopologyVersion 
topVer) {
+        Collection<ClusterNode> aliveNodes = topVer == 
AffinityTopologyVersion.NONE
+                ? discovery.aliveServerNodes()
+                : discovery.discoCache(topVer).aliveServerNodes();
+
+        return aliveNodes
+                .stream()
+                .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Processes ack message from given {@code from} node.
+     *
+     * Completes client latch in case of final ack message.
+     *
+     * If no latch is associated with message, ack is placed to {@link 
#pendingAcks} set.
+     *
+     * @param from Node sent ack.
+     * @param message Ack message.
+     */
+    private void processAck(UUID from, LatchAckMessage message) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = 
getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            T2<String, AffinityTopologyVersion> latchId = new 
T2<>(message.latchId(), message.topVer());
+
+            if (message.isFinal()) {
+                if (log.isDebugEnabled())
+                    log.debug("Process final ack [latch=" + latchId + ", 
from=" + from + "]");
+
+                if (clientLatches.containsKey(latchId)) {
+                    ClientLatch latch = clientLatches.remove(latchId);
+                    latch.complete();
+                }
+                else if (!coordinator.isLocal()) {
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new 
GridConcurrentHashSet<>());
+                    pendingAcks.get(latchId).add(from);
+                }
+            } else {
+                if (log.isDebugEnabled())
+                    log.debug("Process ack [latch=" + latchId + ", from=" + 
from + "]");
+
+                if (serverLatches.containsKey(latchId)) {
+                    ServerLatch latch = serverLatches.get(latchId);
+
+                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                        latch.ack(from);
+
+                        if (latch.isCompleted())
+                            serverLatches.remove(latchId);
+                    }
+                }
+                else {
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new 
GridConcurrentHashSet<>());
+                    pendingAcks.get(latchId).add(from);
+                }
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Changes coordinator to current local node.
+     * Restores all server latches from pending acks and own client latches.
+     */
+    private void becomeNewCoordinator() {
+        if (log.isInfoEnabled())
+            log.info("Become new coordinator " + coordinator.id());
+
+        List<T2<String, AffinityTopologyVersion>> latchesToRestore = new 
ArrayList<>();
+        latchesToRestore.addAll(pendingAcks.keySet());
+        latchesToRestore.addAll(clientLatches.keySet());
+
+        for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) {
+            String id = latchId.get1();
+            AffinityTopologyVersion topVer = latchId.get2();
+            Collection<ClusterNode> participants = 
getLatchParticipants(topVer);
+
+            if (!participants.isEmpty())
+                createServerLatch(id, topVer, participants);
+        }
+    }
+
+    /**
+     * Handles node left discovery event.
+     *
+     * Summary:
+     * Removes pending acks corresponds to the left node.
+     * Adds fake acknowledgements to server latches where such node was 
participant.
+     * Changes client latches coordinator to oldest available server node 
where such node was coordinator.
+     * Detects coordinator change.
+     *
+     * @param left Left node.
+     */
+    private void processNodeLeft(ClusterNode left) {
+        assert this.coordinator != null : "Coordinator is not initialized";
+
+        lock.lock();
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Process node left " + left.id());
+
+            ClusterNode coordinator = 
getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            // Clear pending acks.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> 
ackEntry : pendingAcks.entrySet())
+                if (ackEntry.getValue().contains(left.id()))
+                    pendingAcks.get(ackEntry.getKey()).remove(left.id());
+
+            // Change coordinator for client latches.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> 
latchEntry : clientLatches.entrySet()) {
+                ClientLatch latch = latchEntry.getValue();
+                if (latch.hasCoordinator(left.id())) {
+                    // Change coordinator for latch and re-send ack if 
necessary.
+                    if (latch.hasParticipant(coordinator.id()))
+                        latch.newCoordinator(coordinator);
+                    else {
+                        /* If new coordinator is not able to take control on 
the latch,
+                           it means that all other latch participants are left 
from topology
+                           and there is no reason to track such latch. */
+                        AffinityTopologyVersion topVer = 
latchEntry.getKey().get2();
+
+                        assert getLatchParticipants(topVer).isEmpty();
+
+                        latch.complete(new IgniteCheckedException("All latch 
participants are left from topology."));
+                        clientLatches.remove(latchEntry.getKey());
+                    }
+                }
+            }
+
+            // Add acknowledgements from left node.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> 
latchEntry : serverLatches.entrySet()) {
+                ServerLatch latch = latchEntry.getValue();
+
+                if (latch.hasParticipant(left.id()) && 
!latch.hasAck(left.id())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Process node left [latch=" + 
latchEntry.getKey() + ", left=" + left.id() + "]");
+
+                    latch.ack(left.id());
+
+                    if (latch.isCompleted())
+                        serverLatches.remove(latchEntry.getKey());
+                }
+            }
+
+            // Coordinator is changed.
+            if (coordinator.isLocal() && this.coordinator.id() != 
coordinator.id()) {
+                this.coordinator = coordinator;
+
+                becomeNewCoordinator();
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Latch creating on coordinator node.
+     * Latch collects acks from participants: non-coordinator nodes and 
current local node.
+     * Latch completes when all acks from all participants are received.
+     *
+     * After latch completion final ack is sent to all participants.
+     */
+    class ServerLatch extends CompletableLatch {
+        /** Number of latch permits. This is needed to track number of 
countDown invocations. */
+        private final AtomicInteger permits;
+
+        /** Set of received acks. */
+        private final Set<UUID> acks = new GridConcurrentHashSet<>();
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param participants Participant nodes.
+         */
+        ServerLatch(String id, AffinityTopologyVersion topVer, 
Collection<ClusterNode> participants) {
+            super(id, topVer, participants);
+            this.permits = new AtomicInteger(participants.size());
+
+            // Send final acks when latch is completed.
+            this.complete.listen(f -> {
+                for (ClusterNode node : participants) {
+                    try {
+                        if (discovery.alive(node)) {
+                            io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, 
new LatchAckMessage(id, topVer, true), GridIoPolicy.SYSTEM_POOL);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Final ack is ackSent [latch=" + 
latchId() + ", to=" + node.id() + "]");
+                        }
+                    } catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to send final ack [latch=" + 
latchId() + ", to=" + node.id() + "]");
+                    }
+                }
+            });
+        }
+
+        /**
+         * Checks if latch has ack from given node.
+         *
+         * @param from Node.
+         * @return {@code true} if latch has ack from given node.
+         */
+        private boolean hasAck(UUID from) {
+            return acks.contains(from);
+        }
+
+        /**
+         * Receives ack from given node.
+         * Count downs latch if ack was not already processed.
+         *
+         * @param from Node.
+         */
+        private void ack(UUID from) {
+            if (log.isDebugEnabled())
+                log.debug("Ack is accepted [latch=" + latchId() + ", from=" + 
from + "]");
+
+            countDown0(from);
+        }
+
+        /**
+         * Count down latch from ack of given node.
+         * Completes latch if all acks are received.
+         *
+         * @param node Node.
+         */
+        private void countDown0(UUID node) {
+            if (isCompleted() || acks.contains(node))
+                return;
+
+            acks.add(node);
+
+            int remaining = permits.decrementAndGet();
+
+            if (log.isDebugEnabled())
+                log.debug("Count down + [latch=" + latchId() + ", remaining=" 
+ remaining + "]");
+
+            if (remaining == 0)
+                complete();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void countDown() {
+            countDown0(ctx.localNodeId());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            Set<UUID> pendingAcks = participants.stream().filter(ack -> 
!acks.contains(ack)).collect(Collectors.toSet());
+
+            return S.toString(ServerLatch.class, this,
+                    "pendingAcks", pendingAcks,
+                    "super", super.toString());
+        }
+    }
+
+    /**
+     * Latch creating on non-coordinator node.
+     * Latch completes when final ack from coordinator is received.
+     */
+    class ClientLatch extends CompletableLatch {
+        /** Latch coordinator node. Can be changed if coordinator is left from 
topology. */
+        private volatile ClusterNode coordinator;
+
+        /** Flag indicates that ack is sent to coordinator. */
+        private boolean ackSent;
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param coordinator Coordinator node.
+         * @param participants Participant nodes.
+         */
+        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode 
coordinator, Collection<ClusterNode> participants) {
+            super(id, topVer, participants);
+
+            this.coordinator = coordinator;
+        }
+
+        /**
+         * Checks if latch coordinator is given {@code node}.
+         *
+         * @param node Node.
+         * @return {@code true} if latch coordinator is given node.
+         */
+        private boolean hasCoordinator(UUID node) {
+            return coordinator.id().equals(node);
+        }
+
+        /**
+         * Changes coordinator of latch and resends ack to new coordinator if 
needed.
+         *
+         * @param coordinator New coordinator.
+         */
+        private void newCoordinator(ClusterNode coordinator) {
+            if (log.isDebugEnabled())
+                log.debug("Coordinator is changed [latch=" + latchId() + ", 
crd=" + coordinator.id() + "]");
+
+            synchronized (this) {
+                this.coordinator = coordinator;
+
+                // Resend ack to new coordinator.
+                if (ackSent)
+                    sendAck();
+            }
+        }
+
+        /**
+         * Sends ack to coordinator node.
+         * There is ack deduplication on coordinator. So it's fine to send 
same ack twice.
+         */
+        private void sendAck() {
+            try {
+                ackSent = true;
+
+                io.sendToGridTopic(coordinator, GridTopic.TOPIC_EXCHANGE, new 
LatchAckMessage(id, topVer, false), GridIoPolicy.SYSTEM_POOL);
+
+                if (log.isDebugEnabled())
+                    log.debug("Ack is ackSent + [latch=" + latchId() + ", to=" 
+ coordinator.id() + "]");
+            } catch (IgniteCheckedException e) {
+                // Coordinator is unreachable. On coodinator node left 
discovery event ack will be resent.
+                if (log.isDebugEnabled())
+                    log.debug("Unable to send ack [latch=" + latchId() + ", 
to=" + coordinator.id() + "]: " + e.getMessage());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void countDown() {
+            if (isCompleted())
+                return;
+
+            // Synchronize in case of changed coordinator.
+            synchronized (this) {
+                sendAck();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ClientLatch.class, this,
+                    "super", super.toString());
+        }
+    }
+
+    /**
+     * Base latch functionality with implemented complete / await logic.
+     */
+    private abstract static class CompletableLatch implements Latch {
+        /** Latch id. */
+        @GridToStringInclude
+        protected final String id;
+
+        /** Latch topology version. */
+        @GridToStringInclude
+        protected final AffinityTopologyVersion topVer;
+
+        /** Latch node participants. Only participant nodes are able to change 
state of latch. */
+        @GridToStringExclude
+        protected final Set<UUID> participants;
+
+        /** Future indicates that latch is completed. */
+        @GridToStringExclude
+        protected final GridFutureAdapter<?> complete = new 
GridFutureAdapter<>();
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param participants Participant nodes.
+         */
+        CompletableLatch(String id, AffinityTopologyVersion topVer, 
Collection<ClusterNode> participants) {
+            this.id = id;
+            this.topVer = topVer;
+            this.participants = 
participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void await() throws IgniteCheckedException {
+            complete.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void await(long timeout, TimeUnit timeUnit) throws 
IgniteCheckedException {
+            complete.get(timeout, timeUnit);
+        }
+
+        /**
+         * Checks if latch participants contain given {@code node}.
+         *
+         * @param node Node.
+         * @return {@code true} if latch participants contain given node.
+         */
+        boolean hasParticipant(UUID node) {
+            return participants.contains(node);
+        }
+
+        /**
+         * @return {@code true} if latch is completed.
+         */
+        boolean isCompleted() {
+            return complete.isDone();
+        }
+
+        /**
+         * Completes current latch.
+         */
+        void complete() {
+            complete.onDone();
+        }
+
+        /**
+         * Completes current latch with given {@code error}.
+         *
+         * @param error Error.
+         */
+        void complete(Throwable error) {
+            complete.onDone(error);
+        }
+
+        /**
+         * @return Full latch id.
+         */
+        String latchId() {
+            return id + "-" + topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CompletableLatch.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
new file mode 100644
index 0000000..9704c2e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Simple distributed count down latch interface.
+ * Latch supports count down and await logic.
+ * Latch functionality is not relied on caches and has own state management 
{@link ExchangeLatchManager}.
+ */
+public interface Latch {
+    /**
+     * Decrements count on current latch.
+     * Release all latch waiters on all nodes if count reaches zero.
+     *
+     * This is idempotent operation. Invoking this method twice or more on the 
same node doesn't have any effect.
+     */
+    void countDown();
+
+    /**
+     * Awaits current latch completion.
+     *
+     * @throws IgniteCheckedException If await is failed.
+     */
+    void await() throws IgniteCheckedException;
+
+    /**
+     * Awaits current latch completion with specified timeout.
+     *
+     * @param timeout Timeout value.
+     * @param timeUnit Timeout time unit.
+     * @throws IgniteCheckedException If await is failed.
+     */
+    void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException;
+}

Reply via email to