http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
new file mode 100644
index 0000000..3e51ff3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Class that serves asynchronous partition eviction process.
+ */
+public class GridDhtPartitionsEvictor {
+    /** Show eviction progress frequency in ms. */
+    private static final int SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes.
+
+    /** */
+    private final GridCacheSharedContext<?, ?> ctx;
+
+    /** */
+    private final CacheGroupContext grp;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Queue contains partitions scheduled for eviction. */
+    private final ConcurrentHashMap<Integer, GridDhtLocalPartition> evictionQueue = new ConcurrentHashMap<>();
+
+    /** Flag indicating that the eviction process is currently running, {@code false} otherwise. */
+    private final AtomicBoolean evictionRunning = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param grp Cache group context.
+     */
+    public GridDhtPartitionsEvictor(CacheGroupContext grp) {
+        assert grp != null;
+
+        this.grp = grp;
+        this.ctx = grp.shared();
+
+        this.log = ctx.logger(getClass());
+    }
+
+    /**
+     * Adds partition to eviction queue and starts eviction process.
+     *
+     * @param part Partition to evict.
+     */
+    public void evictPartitionAsync(GridDhtLocalPartition part) {
+        evictionQueue.putIfAbsent(part.id(), part);
+
+        if (evictionRunning.compareAndSet(false, true)) {
+            ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
+                @Override public Boolean call() {
+                    boolean locked = true;
+
+                    long nextShowProgressTime = U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS;
+
+                    while (locked || !evictionQueue.isEmpty()) {
+                        if (!locked && !evictionRunning.compareAndSet(false, true))
+                            return false;
+
+                        try {
+                            for (GridDhtLocalPartition part : evictionQueue.values()) {
+                                // Show progress of currently evicting partitions.
+                                if (U.currentTimeMillis() >= nextShowProgressTime) {
+                                    if (log.isInfoEnabled())
+                                        log.info("Eviction in progress [grp=" + grp.cacheOrGroupName()
+                                                + ", remainingCnt=" + evictionQueue.size() + "]");
+
+                                    nextShowProgressTime = U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS;
+                                }
+
+                                try {
+                                    boolean success = part.tryClear();
+
+                                    if (success)
+                                        evictionQueue.remove(part.id());
+                                }
+                                catch (Throwable ex) {
+                                    if (ctx.kernalContext().isStopping()) {
+                                        LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
+                                                false,
+                                                true);
+
+                                        evictionQueue.clear();
+
+                                        return true;
+                                    }
+                                    else
+                                        LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
+                                }
+                            }
+                        }
+                        finally {
+                            if (!evictionQueue.isEmpty()) {
+                                if (ctx.kernalContext().isStopping()) {
+                                    evictionQueue.clear();
+
+                                    locked = false;
+                                }
+                                else
+                                    locked = true;
+                            }
+                            else {
+                                boolean res = evictionRunning.compareAndSet(true, false);
+
+                                assert res;
+
+                                locked = false;
+                            }
+                        }
+                    }
+
+                    return true;
+                }
+            }, /*system pool*/ true);
+        }
+    }
+}
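
The new evictor relies on a common concurrency idiom: producers put partitions into a concurrent map that acts as a queue, and an AtomicBoolean guard guarantees that at most one asynchronous drain task runs at a time. Below is a minimal, self-contained sketch of that pattern with hypothetical names (AsyncDrainer, schedule, process); it is illustrative only and not part of this commit.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/** Minimal sketch: at most one asynchronous consumer, guarded by a CAS flag. */
public class AsyncDrainer<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean running = new AtomicBoolean();
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    /** Enqueues an item and starts a drain task unless one is already running. */
    public void schedule(T item) {
        queue.add(item);

        if (running.compareAndSet(false, true))
            pool.submit(this::drain);
    }

    private void drain() {
        try {
            for (T next; (next = queue.poll()) != null; )
                process(next); // Hypothetical work unit, e.g. clearing a partition before eviction.
        }
        finally {
            running.set(false);

            // An item enqueued after the last poll but before the flag was cleared
            // would otherwise be stranded, so re-check and restart if needed.
            if (!queue.isEmpty() && running.compareAndSet(false, true))
                pool.submit(this::drain);
        }
    }

    private void process(T item) {
        // Application-specific processing goes here.
    }
}

The same re-check-after-release step is what the loop condition "while (locked || !evictionQueue.isEmpty())" and the finally block in GridDhtPartitionsEvictor achieve: the queue is never left non-empty without an active consumer.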

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index de58188..97bd16b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -189,7 +189,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
      */
     private static void tryEvict(GridDhtLocalPartition part) {
         if (part.state() == RENTING && part.reservations() == 0)
-            part.tryEvictAsync(true);
+            part.tryContinueClearing();
     }
 
     /**
@@ -227,8 +227,6 @@ public class GridDhtPartitionsReservation implements GridReservable {
                 GridDhtLocalPartition part = arr[i];
 
                 part.removeReservation(this);
-
-                tryEvict(part);
             }
         }
 
@@ -240,7 +238,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
     }
 
     /**
-     * Must be checked in {@link GridDhtLocalPartition#tryEvict()}.
+     * Must be checked in {@link GridDhtLocalPartition#tryClear()}.
      * If returns {@code true} this reservation object becomes invalid and partitions
      * can be evicted or at least cleared.
      * Also this means that after returning {@code true} here method {@link #reserve()} can not

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index ebc993c..2d5eec3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -93,6 +94,27 @@ public class CachePartitionFullCountersMap implements Serializable {
     }
 
     /**
+     * Creates submap for provided partition IDs.
+     *
+     * @param parts Partition IDs.
+     * @return Partial counters map.
+     */
+    public CachePartitionPartialCountersMap subMap(Set<Integer> parts) {
+        CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(parts.size());
+
+        for (int p = 0; p < updCntrs.length; p++) {
+            if (!parts.contains(p))
+                continue;
+
+            res.add(p, initialUpdCntrs[p], updCntrs[p]);
+        }
+
+        assert res.size() == parts.size();
+
+        return res;
+    }
+
+    /**
      * Clears full counters map.
      */
     public void clear() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index c0de7cf..c8cf3f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -73,6 +73,13 @@ public class CachePartitionPartialCountersMap implements Serializable {
     }
 
     /**
+     * @return {@code True} if map is empty.
+     */
+    public boolean isEmpty() {
+        return curIdx == 0;
+    }
+
+    /**
      * Adds partition counters for a partition with the given ID.
      *
      * @param partId Partition ID to add.
@@ -97,11 +104,38 @@ public class CachePartitionPartialCountersMap implements Serializable {
     }
 
     /**
+     * Removes element.
+     *
+     * @param partId Partition ID.
+     * @return {@code True} if element was actually removed.
+     */
+    public boolean remove(int partId) {
+        int removedIdx = partitionIndex(partId);
+
+        if (removedIdx < 0)
+            return false;
+
+        int lastIdx = --curIdx;
+
+        for (int i = removedIdx; i < lastIdx; i++) {
+            partIds[i] = partIds[i + 1];
+            initialUpdCntrs[i] = initialUpdCntrs[i + 1];
+            updCntrs[i] = updCntrs[i + 1];
+        }
+
+        partIds[lastIdx] = 0;
+        initialUpdCntrs[lastIdx] = 0;
+        updCntrs[lastIdx] = 0;
+
+        return true;
+    }
+
+    /**
      * Cuts the array sizes according to curIdx. No more entries can be added to this map
      * after this method is called.
      */
     public void trim() {
-        if (curIdx < partIds.length) {
+        if (partIds != null && curIdx < partIds.length) {
             partIds = Arrays.copyOf(partIds, curIdx);
             initialUpdCntrs = Arrays.copyOf(initialUpdCntrs, curIdx);
             updCntrs = Arrays.copyOf(updCntrs, curIdx);
@@ -117,6 +151,14 @@ public class CachePartitionPartialCountersMap implements Serializable {
     }
 
     /**
+     * @param partId Partition ID.
+     * @return {@code True} if partition is present in map.
+     */
+    public boolean contains(int partId) {
+        return partitionIndex(partId) >= 0;
+    }
+
+    /**
      * Gets partition ID saved at the given index.
      *
      * @param idx Index to get value from.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java
new file mode 100644
index 0000000..a71aabf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Partition demand request.
+ */
+public class GridDhtPartitionDemandLegacyMessage extends GridCacheGroupIdMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Update sequence. */
+    private long updateSeq;
+
+    /** Partition. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> parts;
+
+    /** Partitions that must be restored from history. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> historicalParts;
+
+    /** Partition counters. */
+    @GridDirectMap(keyType = int.class, valueType = long.class)
+    private Map<Integer, Long> partsCntrs;
+
+    /** Topic. */
+    @GridDirectTransient
+    private Object topic;
+
+    /** Serialized topic. */
+    private byte[] topicBytes;
+
+    /** Timeout. */
+    private long timeout;
+
+    /** Worker ID. */
+    private int workerId = -1;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /**
+     * @param updateSeq Update sequence for this node.
+     * @param topVer Topology version.
+     * @param grpId Cache group ID.
+     */
+    GridDhtPartitionDemandLegacyMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) {
+        this.grpId = grpId;
+        this.updateSeq = updateSeq;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @param cp Message to copy from.
+     * @param parts Partitions.
+     */
+    GridDhtPartitionDemandLegacyMessage(GridDhtPartitionDemandLegacyMessage cp, Collection<Integer> parts,
+        Map<Integer, Long> partsCntrs) {
+        grpId = cp.grpId;
+        updateSeq = cp.updateSeq;
+        topic = cp.topic;
+        timeout = cp.timeout;
+        workerId = cp.workerId;
+        topVer = cp.topVer;
+
+        // Create a copy of passed in collection since it can be modified when this message is being sent.
+        this.parts = new HashSet<>(parts);
+        this.partsCntrs = partsCntrs;
+
+        if (cp.historicalParts != null)
+            historicalParts = new HashSet<>(cp.historicalParts);
+    }
+
+    GridDhtPartitionDemandLegacyMessage(GridDhtPartitionDemandMessage cp) {
+        grpId = cp.groupId();
+        updateSeq = cp.rebalanceId() < 0 ? -1 : cp.rebalanceId();
+        topic = cp.topic();
+        timeout = cp.timeout();
+        workerId = cp.workerId();
+        topVer = cp.topologyVersion();
+
+        parts = new HashSet<>(cp.partitions().size());
+
+        parts.addAll(cp.partitions().fullSet());
+
+        CachePartitionPartialCountersMap histMap = cp.partitions().historicalMap();
+
+        if (!histMap.isEmpty()) {
+            historicalParts = new HashSet<>(histMap.size());
+
+            for (int i = 0; i < histMap.size(); i++) {
+                int p = histMap.partitionAt(i);
+
+                parts.add(p);
+                historicalParts.add(p);
+                partsCntrs.put(p, histMap.updateCounterAt(i));
+            }
+        }
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtPartitionDemandLegacyMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param p Partition.
+     */
+    void addPartition(int p, boolean historical) {
+        if (parts == null)
+            parts = new HashSet<>();
+
+        parts.add(p);
+
+        if (historical) {
+            if (historicalParts == null)
+                historicalParts = new HashSet<>();
+
+            historicalParts.add(p);
+        }
+    }
+
+    /**
+     * @return Partition.
+     */
+    Collection<Integer> partitions() {
+        return parts;
+    }
+
+    /**
+     * @param p Partition to check.
+     * @return {@code True} if historical.
+     */
+    boolean isHistorical(int p) {
+        if (historicalParts == null)
+            return false;
+
+        return historicalParts.contains(p);
+    }
+
+    /**
+     * @param updateSeq Update sequence.
+     */
+    void updateSequence(long updateSeq) {
+        this.updateSeq = updateSeq;
+    }
+
+    /**
+     * @return Update sequence.
+     */
+    long updateSequence() {
+        return updateSeq;
+    }
+
+    /**
+     * @return Reply message timeout.
+     */
+    long timeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout Timeout.
+     */
+    void timeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return Topic.
+     */
+    Object topic() {
+        return topic;
+    }
+
+    /**
+     * @param topic Topic.
+     */
+    void topic(Object topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * @return Worker ID.
+     */
+    int workerId() {
+        return workerId;
+    }
+
+    /**
+     * @param workerId Worker ID.
+     */
+    void workerId(int workerId) {
+        this.workerId = workerId;
+    }
+
+    /**
+     * @param part Partition to get counter for.
+     * @return Partition counter associated with this partition or {@code null} if this information is unavailable.
+     */
+    Long partitionCounter(int part) {
+        return partsCntrs == null ? null : partsCntrs.get(part);
+    }
+
+    /**
+     * @return Topology version for which demand message is sent.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc}
+     * @param ctx*/
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (topic != null && topicBytes == null)
+            topicBytes = U.marshal(ctx, topic);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (topicBytes != null && topic == null)
+            topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeCollection("historicalParts", historicalParts, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("parts", parts, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMap("partsCntrs", partsCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeLong("timeout", timeout))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("topicBytes", topicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeInt("workerId", workerId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                historicalParts = reader.readCollection("historicalParts", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                parts = reader.readCollection("parts", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                partsCntrs = reader.readMap("partsCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                timeout = reader.readLong("timeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                topicBytes = reader.readByteArray("topicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                workerId = reader.readInt("workerId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtPartitionDemandMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 44;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 11;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemandLegacyMessage.class, this,
+            "partCnt", parts != null ? parts.size() : 0,
+            "super", super.toString());
+    }
+}
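
The writeTo and readFrom methods above follow Ignite's direct-marshalling convention: a switch over writer.state()/reader.state() whose cases deliberately fall through, so that a message interrupted by a full network buffer can resume serialization from the exact field where it stopped. Below is a minimal standalone sketch of that resumable-writer idea using only java.nio; the class and field names are hypothetical, not the real MessageWriter API.

import java.nio.ByteBuffer;

/**
 * Sketch of a resumable field-by-field writer: it returns false when the buffer
 * fills up and continues from the same field on the next invocation.
 */
class ResumableWriter {
    private int state; // Index of the next field to write.

    private final long rebalanceId = 42L;   // Hypothetical field values for illustration.
    private final long timeout = 10_000L;
    private final byte[] topicBytes = {1, 2, 3};

    /** @return {@code true} if all fields were written, {@code false} if the buffer is full. */
    boolean writeTo(ByteBuffer buf) {
        switch (state) {
            case 0:
                if (buf.remaining() < 8)
                    return false;

                buf.putLong(rebalanceId);
                state++;
                // Intentional fall-through to the next field.

            case 1:
                if (buf.remaining() < 8)
                    return false;

                buf.putLong(timeout);
                state++;

            case 2:
                if (buf.remaining() < 4 + topicBytes.length)
                    return false;

                buf.putInt(topicBytes.length);
                buf.put(topicBytes);
                state++;
        }

        return true;
    }
}

Each case returns false when the buffer cannot hold the next field; the caller flushes the buffer and invokes writeTo again, and the preserved state index skips the fields that were already written.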

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 4a693bf..4fba917 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -19,19 +19,15 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
@@ -43,20 +39,18 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Update sequence. */
-    private long updateSeq;
+    /** */
+    public static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.4.0");
 
-    /** Partition. */
-    @GridDirectCollection(int.class)
-    private Collection<Integer> parts;
+    /** Rebalance id. */
+    private long rebalanceId;
 
-    /** Partitions that must be restored from history. */
-    @GridDirectCollection(int.class)
-    private Collection<Integer> historicalParts;
+    /** Partitions map. */
+    @GridDirectTransient
+    private IgniteDhtDemandedPartitionsMap parts;
 
-    /** Partition counters. */
-    @GridDirectMap(keyType = int.class, valueType = long.class)
-    private Map<Integer, Long> partsCntrs;
+    /** Serialized partitions map. */
+    private byte[] partsBytes;
 
     /** Topic. */
     @GridDirectTransient
@@ -75,35 +69,43 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     private AffinityTopologyVersion topVer;
 
     /**
-     * @param updateSeq Update sequence for this node.
+     * @param rebalanceId Rebalance id for this node.
      * @param topVer Topology version.
      * @param grpId Cache group ID.
      */
-    GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) {
+    GridDhtPartitionDemandMessage(long rebalanceId, @NotNull AffinityTopologyVersion topVer, int grpId) {
         this.grpId = grpId;
-        this.updateSeq = updateSeq;
+        this.rebalanceId = rebalanceId;
         this.topVer = topVer;
+
+        parts = new IgniteDhtDemandedPartitionsMap();
     }
 
     /**
      * @param cp Message to copy from.
-     * @param parts Partitions.
      */
-    GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts,
-        Map<Integer, Long> partsCntrs) {
-        grpId = cp.grpId;
-        updateSeq = cp.updateSeq;
-        topic = cp.topic;
-        timeout = cp.timeout;
-        workerId = cp.workerId;
-        topVer = cp.topVer;
-
-        // Create a copy of passed in collection since it can be modified when this message is being sent.
-        this.parts = new HashSet<>(parts);
-        this.partsCntrs = partsCntrs;
-
-        if (cp.historicalParts != null)
-            historicalParts = new HashSet<>(cp.historicalParts);
+    public GridDhtPartitionDemandMessage(GridDhtPartitionDemandLegacyMessage cp) {
+        grpId = cp.groupId();
+        rebalanceId = cp.updateSequence();
+        topic = cp.topic();
+        timeout = cp.timeout();
+        workerId = cp.workerId();
+        topVer = cp.topologyVersion();
+
+        IgniteDhtDemandedPartitionsMap partMap = new IgniteDhtDemandedPartitionsMap();
+
+        if (cp.partitions() != null) {
+            for (Integer p : cp.partitions()) {
+                if (cp.isHistorical(p))
+                    partMap.addHistorical(p, 0, cp.partitionCounter(p), cp.partitions().size());
+                else
+                    partMap.addFull(p);
+            }
+        }
+
+        partMap.historicalMap().trim();
+
+        parts = partMap;
     }
 
     /**
@@ -114,52 +116,42 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     }
 
     /**
-     * @param p Partition.
+     * Creates copy of this message with new partitions map.
+     *
+     * @param parts New partitions map.
+     * @return Copy of message with new partitions map.
      */
-    void addPartition(int p, boolean historical) {
-        if (parts == null)
-            parts = new HashSet<>();
-
-        parts.add(p);
-
-        if (historical) {
-            if (historicalParts == null)
-                historicalParts = new HashSet<>();
-
-            historicalParts.add(p);
-        }
+    public GridDhtPartitionDemandMessage withNewPartitionsMap(@NotNull IgniteDhtDemandedPartitionsMap parts) {
+        GridDhtPartitionDemandMessage cp = new GridDhtPartitionDemandMessage();
+        cp.grpId = grpId;
+        cp.rebalanceId = rebalanceId;
+        cp.topic = topic;
+        cp.timeout = timeout;
+        cp.workerId = workerId;
+        cp.topVer = topVer;
+        cp.parts = parts;
+        return cp;
     }
 
     /**
      * @return Partition.
      */
-    Collection<Integer> partitions() {
+    IgniteDhtDemandedPartitionsMap partitions() {
         return parts;
     }
 
     /**
-     * @param p Partition to check.
-     * @return {@code True} if historical.
-     */
-    boolean isHistorical(int p) {
-        if (historicalParts == null)
-            return false;
-
-        return historicalParts.contains(p);
-    }
-
-    /**
      * @param updateSeq Update sequence.
      */
-    void updateSequence(long updateSeq) {
-        this.updateSeq = updateSeq;
+    void rebalanceId(long updateSeq) {
+        this.rebalanceId = updateSeq;
     }
 
     /**
-     * @return Update sequence.
+     * @return Unique rebalance session id.
      */
-    long updateSequence() {
-        return updateSeq;
+    long rebalanceId() {
+        return rebalanceId;
     }
 
     /**
@@ -205,18 +197,23 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     }
 
     /**
-     * @param part Partition to get counter for.
-     * @return Partition counter associated with this partition or {@code null} if this information is unavailable.
+     * @return Topology version for which demand message is sent.
      */
-    Long partitionCounter(int part) {
-        return partsCntrs == null ? null : partsCntrs.get(part);
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
     }
 
     /**
-     * @return Topology version for which demand message is sent.
+     * Converts message to its legacy version if necessary.
+     *
+     * @param target Target version
+     * @return Converted message or {@code this} if conversion isn't necessary.
      */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
+    public GridCacheMessage convertIfNeeded(IgniteProductVersion target) {
+        if (target.compareTo(VERSION_SINCE) <= 0)
+            return new GridDhtPartitionDemandLegacyMessage(this);
+
+        return this;
     }
 
     /** {@inheritDoc}
@@ -226,6 +223,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
 
         if (topic != null && topicBytes == null)
             topicBytes = U.marshal(ctx, topic);
+
+        if (parts != null && partsBytes == null)
+            partsBytes = U.marshal(ctx, parts);
     }
 
     /** {@inheritDoc} */
@@ -234,6 +234,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
 
         if (topicBytes != null && topic == null)
             topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+        if (partsBytes != null && parts == null)
+            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
     }
 
     /** {@inheritDoc} */
@@ -257,48 +260,36 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeCollection("historicalParts", historicalParts, MessageCollectionItemType.INT))
+                if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("parts", parts, MessageCollectionItemType.INT))
+                if (!writer.writeLong("timeout", timeout))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMap("partsCntrs", partsCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("timeout", timeout))
+                if (!writer.writeByteArray("topicBytes", topicBytes))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeLong("rebalanceId", rebalanceId))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("topicBytes", topicBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeLong("updateSeq", updateSeq))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
                 if (!writer.writeInt("workerId", workerId))
                     return false;
 
@@ -321,7 +312,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
 
         switch (reader.state()) {
             case 3:
-                historicalParts = reader.readCollection("historicalParts", MessageCollectionItemType.INT);
+                partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -329,7 +320,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 4:
-                parts = reader.readCollection("parts", MessageCollectionItemType.INT);
+                timeout = reader.readLong("timeout");
 
                 if (!reader.isLastRead())
                     return false;
@@ -337,7 +328,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 5:
-                partsCntrs = reader.readMap("partsCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -345,7 +336,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 6:
-                timeout = reader.readLong("timeout");
+                topicBytes = reader.readByteArray("topicBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -353,7 +344,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 7:
-                topVer = reader.readMessage("topVer");
+                rebalanceId = reader.readLong("rebalanceId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -361,22 +352,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 8:
-                topicBytes = reader.readByteArray("topicBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                updateSeq = reader.readLong("updateSeq");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
                 workerId = reader.readInt("workerId");
 
                 if (!reader.isLastRead())
@@ -391,12 +366,12 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return 44;
+        return 45;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 9;
     }
 
     /** {@inheritDoc} */
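
The convertIfNeeded method introduced above picks the wire format per receiver: nodes that predate VERSION_SINCE get the GridDhtPartitionDemandLegacyMessage built from this message, while newer nodes receive the compact form with the serialized partitions map. Below is a minimal sketch of that dispatch under simplified assumptions (an integer version number and the hypothetical names NewMessage/LegacyMessage); it is illustrative only.

/** Sketch: choose the wire format per receiver version (names are hypothetical). */
final class VersionGatedConversion {
    /** First version that understands the new format; 24 stands in for 2.4.0. */
    static final int NEW_FORMAT_SINCE = 24;

    interface Message { /* marker */ }

    static final class LegacyMessage implements Message { }

    static final class NewMessage implements Message {
        /** Builds the backward-compatible representation of this message. */
        LegacyMessage toLegacy() {
            return new LegacyMessage();
        }
    }

    /** Mirrors the commit's check: convert when the target is not newer than the cut-off. */
    static Message convertIfNeeded(NewMessage msg, int targetNodeVersion) {
        if (targetNodeVersion <= NEW_FORMAT_SINCE)
            return msg.toLegacy();

        return msg;
    }
}

Keeping directType() values distinct (44 for the legacy message, 45 for the new one) is what lets both formats coexist on the wire during a rolling upgrade.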

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index d09312f..734bbaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -27,12 +26,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -251,41 +252,40 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param assigns Assignments.
+     * Initiates a new rebalance process from the given {@code assignments}.
+     * If a previous rebalance is not finished, the method cancels it.
+     * In case of delayed rebalance, the method schedules a new one with the configured delay.
+     *
+     * @param assignments Assignments.
      * @param force {@code True} if dummy reassign.
-     * @param cnt Counter.
+     * @param rebalanceId Rebalance id.
      * @param next Runnable responsible for cache rebalancing start.
      * @param forcedRebFut External future for forced rebalance.
      * @return Rebalancing runnable.
      */
     Runnable addAssignments(
-        final GridDhtPreloaderAssignments assigns,
+        final GridDhtPreloaderAssignments assignments,
         boolean force,
-        int cnt,
+        long rebalanceId,
         final Runnable next,
         @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut
     ) {
         if (log.isDebugEnabled())
-            log.debug("Adding partition assignments: " + assigns);
+            log.debug("Adding partition assignments: " + assignments);
 
         assert force == (forcedRebFut != null);
 
         long delay = grp.config().getRebalanceDelay();
 
-        if ((delay == 0 || force) && assigns != null) {
+        if ((delay == 0 || force) && assignments != null) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt);
+            final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
-            else {
-                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                        oldFut.onDone(fut.result());
-                    }
-                });
-            }
+            else
+                fut.listen(f -> oldFut.onDone(f.result()));
 
             if (forcedRebFut != null)
                 forcedRebFut.add(fut);
@@ -300,17 +300,13 @@ public class GridDhtPartitionDemander {
 
                     metrics.startRebalance(0);
 
-                    rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
-                        @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                            metrics.clearRebalanceCounters();
-                        }
-                    });
+                    rebalanceFut.listen(f -> metrics.clearRebalanceCounters());
                 }
             }
 
             fut.sendRebalanceStartedEvent();
 
-            if (assigns.cancelled()) { // Pending exchange.
+            if (assignments.cancelled()) { // Pending exchange.
                 if (log.isDebugEnabled())
                     log.debug("Rebalancing skipped due to cancelled 
assignments.");
 
@@ -321,7 +317,7 @@ public class GridDhtPartitionDemander {
                 return null;
             }
 
-            if (assigns.isEmpty()) { // Nothing to rebalance.
+            if (assignments.isEmpty()) { // Nothing to rebalance.
                 if (log.isDebugEnabled())
                     log.debug("Rebalancing skipped due to empty assignments.");
 
@@ -334,24 +330,20 @@ public class GridDhtPartitionDemander {
                 return null;
             }
 
-            return new Runnable() {
-                @Override public void run() {
-                    if (next != null)
-                        fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                            @Override public void apply(IgniteInternalFuture<Boolean> f) {
-                                try {
-                                    if (f.get()) // Not cancelled.
-                                        next.run(); // Starts next cache rebalancing (according to the order).
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug(e.getMessage());
-                                }
-                            }
-                        });
+            return () -> {
+                if (next != null)
+                    fut.listen(f -> {
+                        try {
+                            if (f.get()) // Not cancelled.
+                                next.run(); // Starts next cache rebalancing (according to the order).
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (log.isDebugEnabled())
+                                log.debug(e.getMessage());
+                        }
+                    });
 
-                    requestPartitions(fut, assigns);
-                }
+                requestPartitions(fut, assignments);
             };
         }
         else if (delay > 0) {
@@ -391,10 +383,20 @@ public class GridDhtPartitionDemander {
     }
 
     /**
+     * Asynchronously sends initial demand messages formed from {@code assignments} and initiates supply-demand processes.
+     *
+     * For each node participating in the rebalance process the method distributes the set of partitions for that node to several stripes (topics).
+     * It means that each stripe containing a subset of partitions can be processed in parallel.
+     * The number of stripes is controlled by the {@link IgniteConfiguration#getRebalanceThreadPoolSize()} property.
+     *
+     * Partitions that can be rebalanced using only WAL are called historical, others are called full.
+     *
+     * Before sending messages, the method awaits partitions clearing for full partitions.
+     *
      * @param fut Rebalance future.
-     * @param assigns Assignments.
+     * @param assignments Assignments.
      */
-    private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assigns) {
+    private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assignments) {
         assert fut != null;
 
         if (topologyChanged(fut)) {
@@ -418,10 +420,10 @@ public class GridDhtPartitionDemander {
 
             // Must add all remaining node before send first request, for avoid race between add remaining node and
             // processing response, see checkIsDone(boolean).
-            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
                 UUID nodeId = e.getKey().id();
 
-                Collection<Integer> parts= e.getValue().partitions();
+                IgniteDhtDemandedPartitionsMap parts = e.getValue().partitions();
 
                 assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]";
 
@@ -431,193 +433,217 @@ public class GridDhtPartitionDemander {
 
         final CacheConfiguration cfg = grp.config();
 
-        int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize();
-
-        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
             final ClusterNode node = e.getKey();
 
             GridDhtPartitionDemandMessage d = e.getValue();
 
-            final Collection<Integer> parts = d.partitions();
-
-            U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
-                ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
-                ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+            final IgniteDhtDemandedPartitionsMap parts;
+            synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation.
+                if (fut.isDone())
+                    break;
 
-            final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+                parts = fut.remaining.get(node.id()).get2();
 
-            for (int cnt = 0; cnt < lsnrCnt; cnt++)
-                sParts.add(new HashSet<Integer>());
+                U.log(log, "Starting rebalancing [grp=" + grp.cacheOrGroupName()
+                        + ", mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size()
+                        + ", topology=" + fut.topologyVersion() + ", rebalanceId=" + fut.rebalanceId + "]");
+            }
 
-            Iterator<Integer> it = parts.iterator();
+            int stripes = ctx.gridConfig().getRebalanceThreadPoolSize();
 
-            int cnt = 0;
+            final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes);
+            for (int i = 0; i < stripes; i++)
+                stripePartitions.add(new IgniteDhtDemandedPartitionsMap());
 
-            while (it.hasNext())
-                sParts.get(cnt++ % lsnrCnt).add(it.next());
+            // Reserve one stripe for historical partitions.
+            if (parts.hasHistorical()) {
+                stripePartitions.add(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));
 
-            for (cnt = 0; cnt < lsnrCnt; cnt++) {
-                if (!sParts.get(cnt).isEmpty()) {
-                    // Create copy.
-                    final GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt));
+                if (stripes > 1)
+                    stripes--;
+            }
 
-                    initD.topic(rebalanceTopics.get(cnt));
-                    initD.updateSequence(fut.updateSeq);
-                    initD.timeout(cfg.getRebalanceTimeout());
+            // Distribute full partitions across other stripes.
+            Iterator<Integer> it = parts.fullSet().iterator();
+            for (int i = 0; it.hasNext(); i++)
+                stripePartitions.get(i % stripes).addFull(it.next());
 
-                    final int finalCnt = cnt;
+            for (int stripe = 0; stripe < stripes; stripe++) {
+                if (!stripePartitions.get(stripe).isEmpty()) {
+                    // Create copy of demand message with new striped partitions map.
+                    final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe));
 
-                    ctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            try {
-                                if (!fut.isDone()) {
-                                    ctx.io().sendOrderedMessage(node, rebalanceTopics.get(finalCnt), initD, grp.ioPolicy(), initD.timeout());
+                    demandMsg.topic(rebalanceTopics.get(stripe));
+                    demandMsg.rebalanceId(fut.rebalanceId);
+                    demandMsg.timeout(cfg.getRebalanceTimeout());
 
-                                    // Cleanup required in case partitions demanded in parallel with cancellation.
-                                    synchronized (fut) {
-                                        if (fut.isDone())
-                                            fut.cleanupRemoteContexts(node.id());
-                                    }
+                    final int topicId = stripe;
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
-                                            finalCnt + ", partitions count=" + sParts.get(finalCnt).size() +
-                                            " (" + partitionsList(sParts.get(finalCnt)) + ")]");
+                    Runnable initDemandRequestTask = () -> {
+                        try {
+                            if (!fut.isDone()) {
+                                ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
+                                        demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());
+
+                                // Cleanup required in case partitions demanded in parallel with cancellation.
+                                synchronized (fut) {
+                                    if (fut.isDone())
+                                        fut.cleanupRemoteContexts(node.id());
                                 }
-                            }
-                            catch (IgniteCheckedException e) {
-                                ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
 
-                                if (cause != null)
-                                    log.warning("Failed to send initial demand request to node. " + e.getMessage());
-                                else
-                                    log.error("Failed to send initial demand request to node.", e);
-
-                                fut.cancel();
-                            }
-                            catch (Throwable th) {
-                                log.error("Runtime error caught during initial demand request sending.", th);
-
-                                fut.cancel();
+                                if (log.isDebugEnabled())
+                                    log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                                            topicId + ", partitions count=" + stripePartitions.get(topicId).size() +
+                                            " (" + stripePartitions.get(topicId).partitionsList() + ")]");
                             }
                         }
-                    }, /*system pool*/true);
-                }
-            }
-        }
-    }
+                        catch (IgniteCheckedException e1) {
+                            ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
 
-    /**
-     * @param old Old message.
-     * @param parts Partitions to demand.
-     * @return New demand message.
-     */
-    private GridDhtPartitionDemandMessage createDemandMessage(GridDhtPartitionDemandMessage old,
-        Collection<Integer> parts) {
-        Map<Integer, Long> partCntrs = null;
+                            if (cause != null)
+                                log.warning("Failed to send initial demand 
request to node. " + e1.getMessage());
+                            else
+                                log.error("Failed to send initial demand 
request to node.", e1);
 
-        for (Integer part : parts) {
-            try {
-                if (grp.persistenceEnabled()) {
-                    if (partCntrs == null)
-                        partCntrs = new HashMap<>(parts.size(), 1.0f);
+                            fut.cancel();
+                        }
+                        catch (Throwable th) {
+                            log.error("Runtime error caught during initial demand request sending.", th);
 
-                    GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false);
+                            fut.cancel();
+                        }
+                    };
 
-                    partCntrs.put(part, p.initialUpdateCounter());
+                    awaitClearingAndStartRebalance(fut, demandMsg, initDemandRequestTask);
                 }
             }
-            catch (GridDhtInvalidPartitionException ignore) {
-                // Skip this partition.
-            }
         }
-
-        return new GridDhtPartitionDemandMessage(old, parts, partCntrs);
     }
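The loop above prepares one demand message per stripe and wraps the actual send in an initDemandRequestTask that runs only while the rebalance future is still alive. Below is a rough, self-contained sketch of that per-stripe dispatch pattern; StripeSender, sendOrdered and isDone are simplified stand-ins for illustration, not the Ignite API:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Simplified model: one demand request per stripe, skipped once the
    // rebalance "future" has already completed or been cancelled.
    public class StripeSender {
        private final AtomicBoolean done = new AtomicBoolean();

        /** Stands in for sending an ordered message on the per-stripe topic. */
        void sendOrdered(int stripe, String demandMsg) {
            System.out.println("stripe=" + stripe + " -> " + demandMsg);
        }

        boolean isDone() {
            return done.get();
        }

        void dispatch(List<String> demandPerStripe) {
            for (int stripe = 0; stripe < demandPerStripe.size(); stripe++) {
                final int topicId = stripe;
                final String msg = demandPerStripe.get(stripe);

                Runnable initDemandRequestTask = () -> {
                    if (!isDone())
                        sendOrdered(topicId, msg);
                };

                // In the real code this task runs asynchronously after partition
                // clearing; in this sketch it runs inline.
                initDemandRequestTask.run();
            }
        }

        public static void main(String[] args) {
            new StripeSender().dispatch(List.of("parts [0-3]", "parts [4-7]"));
        }
    }

Per-stripe topics keep ordered messaging independent between stripes, which is why the topic id is captured as a final local before the task is built.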
 
     /**
-     * @param c Partitions.
-     * @return String representation of partitions list.
+     * Awaits clearing of full partitions and sends the initial demand request
+     * after all partitions are cleared and it is safe to consume data.
+     *
+     * @param fut Rebalance future.
+     * @param demandMessage Initial demand message which contains the set of full partitions to await.
+     * @param initDemandRequestTask Task which sends the initial demand request.
      */
-    private String partitionsList(Collection<Integer> c) {
-        List<Integer> s = new ArrayList<>(c);
+    private void awaitClearingAndStartRebalance(RebalanceFuture fut,
+                                                GridDhtPartitionDemandMessage demandMessage,
+                                                Runnable initDemandRequestTask) {
+        Set<Integer> fullPartitions = demandMessage.partitions().fullSet();
 
-        Collections.sort(s);
+        if (fullPartitions.isEmpty()) {
+            ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
 
-        StringBuilder sb = new StringBuilder();
+            return;
+        }
 
-        int start = -1;
+        for (GridCacheContext cctx : grp.caches()) {
+            if (cctx.statisticsEnabled()) {
+                final CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-        int prev = -1;
+                metrics.rebalanceClearingPartitions(fullPartitions.size());
+            }
+        }
 
-        Iterator<Integer> sit = s.iterator();
+        final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());
 
-        while (sit.hasNext()) {
-            int p = sit.next();
+        for (int partId : fullPartitions) {
+            if (fut.isDone())
+                return;
 
-            if (start == -1) {
-                start = p;
-                prev = p;
-            }
+            GridDhtLocalPartition part = grp.topology().localPartition(partId);
 
-            if (prev < p - 1) {
-                sb.append(start);
+            if (part != null && part.state() == MOVING) {
+                part.onClearFinished(f -> {
+                    // Cancel rebalance if partition clearing has failed.
+                    if (f.error() != null) {
+                        if (!fut.isDone()) {
+                            for (GridCacheContext cctx : grp.caches()) {
+                                if (cctx.statisticsEnabled()) {
+                                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-                if (start != prev)
-                    sb.append("-").append(prev);
+                                    metrics.rebalanceClearingPartitions(0);
+                                }
+                            }
 
-                sb.append(", ");
+                            log.error("Unable to await partition clearing " + part, f.error());
 
-                start = p;
-            }
+                            fut.cancel();
+                        }
+                    }
+                    else {
+                        if (!fut.isDone()) {
+                            int existed = clearingPartitions.decrementAndGet();
 
-            if (!sit.hasNext()) {
-                sb.append(start);
+                            for (GridCacheContext cctx : grp.caches()) {
+                                if (cctx.statisticsEnabled()) {
+                                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-                if (start != p)
-                    sb.append("-").append(p);
-            }
+                                    metrics.rebalanceClearingPartitions(existed);
+                                }
+                            }
 
-            prev = p;
+                            // If all partitions are cleared, send the initial demand message.
+                            if (existed == 0)
+                                ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
+                        }
+                    }
+                });
+            }
+            else
+                clearingPartitions.decrementAndGet();
         }
-
-        return sb.toString();
     }
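awaitClearingAndStartRebalance above decrements a single AtomicInteger as each MOVING partition reports that clearing finished, and only the callback that brings the counter to zero schedules the initial demand request. Here is a minimal, self-contained sketch of that countdown pattern, assuming a simplified onClearFinished callback (ClearingCoordinator and its methods are illustrative, not Ignite classes):

    import java.util.Set;
    import java.util.concurrent.atomic.AtomicInteger;

    // Simplified model of "start rebalancing only after all full partitions are cleared".
    public class ClearingCoordinator {
        /** Registers a callback invoked when the given partition finishes clearing. */
        static void onClearFinished(int partId, Runnable onCleared) {
            // In this sketch clearing completes immediately.
            onCleared.run();
        }

        static void awaitClearingAndStart(Set<Integer> fullPartitions, Runnable startRebalance) {
            if (fullPartitions.isEmpty()) {
                startRebalance.run();

                return;
            }

            AtomicInteger remaining = new AtomicInteger(fullPartitions.size());

            for (int partId : fullPartitions) {
                onClearFinished(partId, () -> {
                    // Whichever callback clears the last partition triggers the start.
                    if (remaining.decrementAndGet() == 0)
                        startRebalance.run();
                });
            }
        }

        public static void main(String[] args) {
            awaitClearingAndStart(Set.of(1, 2, 3), () -> System.out.println("send initial demand"));
        }
    }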
 
     /**
-     * @param idx Index.
-     * @param id Node id.
-     * @param supply Supply.
+     * Handles supply message from {@code nodeId} with specified {@code topicId}.
+     *
+     * Supply message contains entries to populate rebalancing partitions.
+     *
+     * There is a cyclic process:
+     * Populate rebalancing partitions with entries from Supply message.
+     * If not all partitions specified in {@link #rebalanceFut} were rebalanced or marked as missed,
+     * send a new Demand message to request the next batch of entries.
+     *
+     * @param topicId Topic id.
+     * @param nodeId Node id.
+     * @param supply Supply message.
      */
     public void handleSupplyMessage(
-        int idx,
-        final UUID id,
+        int topicId,
+        final UUID nodeId,
         final GridDhtPartitionSupplyMessage supply
     ) {
         AffinityTopologyVersion topVer = supply.topologyVersion();
 
         final RebalanceFuture fut = rebalanceFut;
 
-        ClusterNode node = ctx.node(id);
+        ClusterNode node = ctx.node(nodeId);
 
         if (node == null)
             return;
 
-        if (!fut.isActual(supply.updateSequence())) // Current future have another update sequence.
-            return; // Supple message based on another future.
-
         if (topologyChanged(fut)) // Topology already changed (for the future that supply message based on).
             return;
 
+        if (!fut.isActual(supply.rebalanceId())) {
+            // Current future has another rebalance id.
+            // Supply message is based on another future.
+            return;
+        }
+
         if (log.isDebugEnabled())
             log.debug("Received supply message [grp=" + grp.cacheOrGroupName() 
+ ", msg=" + supply + ']');
 
         // Check whether there were class loading errors on unmarshal
         if (supply.classError() != null) {
-            U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + id +
+            U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId +
                 "]. Class got undeployed during preloading: " + supply.classError());
 
-            fut.cancel(id);
+            fut.cancel(nodeId);
 
             return;
         }
@@ -686,19 +712,16 @@ public class GridDhtPartitionDemander {
                                 if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId()))
                                     cctx = ctx.cacheContext(entry.cacheId());
 
-                                if(cctx != null && cctx.statisticsEnabled())
+                                if (cctx != null && cctx.statisticsEnabled())
                                     cctx.cache().metrics0().onRebalanceKeyReceived();
                             }
 
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
-                                if (supply.isClean(p))
-                                    part.updateCounter(supply.last().get(p));
-
                                 top.own(part);
 
-                                fut.partitionDone(id, p);
+                                fut.partitionDone(nodeId, p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: 
" + part);
@@ -711,14 +734,14 @@ public class GridDhtPartitionDemander {
                     }
                     else {
                         if (last)
-                            fut.partitionDone(id, p);
+                            fut.partitionDone(nodeId, p);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state 
is not MOVING): " + part);
                     }
                 }
                 else {
-                    fut.partitionDone(id, p);
+                    fut.partitionDone(nodeId, p);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not 
belong on current node): " + p);
@@ -728,26 +751,26 @@ public class GridDhtPartitionDemander {
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed()) {
                 if (aff.get(miss).contains(ctx.localNode()))
-                    fut.partitionMissed(id, miss);
+                    fut.partitionMissed(nodeId, miss);
             }
 
             for (Integer miss : supply.missed())
-                fut.partitionDone(id, miss);
+                fut.partitionDone(nodeId, miss);
 
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                supply.updateSequence(),
+                supply.rebalanceId(),
                 supply.topologyVersion(),
                 grp.groupId());
 
             d.timeout(grp.config().getRebalanceTimeout());
 
-            d.topic(rebalanceTopics.get(idx));
+            d.topic(rebalanceTopics.get(topicId));
 
             if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
                 try {
-                    ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
-                        d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
+                    ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
+                        d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout());
                 }
                 catch (ClusterTopologyCheckedException e) {
                     if (log.isDebugEnabled()) {
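handleSupplyMessage closes the demand/supply loop: after applying a supply batch it answers on the same rebalance topic with a fresh demand message until the supplier marks the stream as finished or reports partitions as missed. The following is a toy, self-contained sketch of that cycle; Supply and the queue-based "supplier" are illustrative only, while the real exchange goes through ordered communication topics:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Toy model of the demand/supply ping-pong: keep demanding batches
    // until the supplier marks the stream as finished.
    public class DemandSupplyCycle {
        static class Supply {
            final int batch;
            final boolean last;

            Supply(int batch, boolean last) {
                this.batch = batch;
                this.last = last;
            }
        }

        public static void main(String[] args) {
            Queue<Supply> supplier = new ArrayDeque<>();

            supplier.add(new Supply(1, false));
            supplier.add(new Supply(2, false));
            supplier.add(new Supply(3, true));

            // Initial demand request.
            Supply supply = supplier.poll();

            while (supply != null) {
                System.out.println("applied batch " + supply.batch);

                if (supply.last) {
                    System.out.println("partition rebalanced, take ownership");

                    break;
                }

                // Not done yet: send the next demand message and wait for the reply.
                supply = supplier.poll();
            }
        }
    }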
@@ -765,15 +788,17 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param pick Node picked for preloading.
-     * @param p Partition.
+     * Adds {@code entry} to partition {@code p}.
+     *
+     * @param from Node which sent entry.
+     * @param p Partition id.
      * @param entry Preloaded entry.
      * @param topVer Topology version.
      * @return {@code False} if partition has become invalid during preloading.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     private boolean preloadEntry(
-        ClusterNode pick,
+        ClusterNode from,
         int p,
         GridCacheEntryInfo entry,
         AffinityTopologyVersion topVer
@@ -789,7 +814,7 @@ public class GridDhtPartitionDemander {
                 cached = cctx.dhtCache().entryEx(entry.key());
 
                 if (log.isDebugEnabled())
-                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + from.id() + ']');
 
                 cctx.shared().database().checkpointReadLock();
 
@@ -844,7 +869,7 @@ public class GridDhtPartitionDemander {
         }
         catch (IgniteCheckedException e) {
             throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+                ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
         }
         finally {
             ctx.database().checkpointReadUnlock();
@@ -872,7 +897,7 @@ public class GridDhtPartitionDemander {
         private final IgniteLogger log;
 
         /** Remaining. T2: startTime, partitions */
-        private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
+        private final Map<UUID, T2<Long, IgniteDhtDemandedPartitionsMap>> remaining = new HashMap<>();
 
         /** Missed. */
         private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
@@ -884,28 +909,28 @@ public class GridDhtPartitionDemander {
         /** Topology version. */
         private final AffinityTopologyVersion topVer;
 
-        /** Unique (per demander) sequence id. */
-        private final long updateSeq;
+        /** Unique (per demander) rebalance id. */
+        private final long rebalanceId;
 
         /**
          * @param grp Cache group.
-         * @param assigns Assigns.
+         * @param assignments Assignments.
          * @param log Logger.
-         * @param updateSeq Update sequence.
+         * @param rebalanceId Rebalance id.
          */
         RebalanceFuture(
             CacheGroupContext grp,
-            GridDhtPreloaderAssignments assigns,
+            GridDhtPreloaderAssignments assignments,
             IgniteLogger log,
-            long updateSeq) {
-            assert assigns != null;
+            long rebalanceId) {
+            assert assignments != null;
 
-            exchId = assigns.exchangeId();
-            topVer = assigns.topologyVersion();
+            exchId = assignments.exchangeId();
+            topVer = assignments.topologyVersion();
 
             this.grp = grp;
             this.log = log;
-            this.updateSeq = updateSeq;
+            this.rebalanceId = rebalanceId;
 
             ctx = grp.shared();
         }
@@ -919,7 +944,7 @@ public class GridDhtPartitionDemander {
             this.ctx = null;
             this.grp = null;
             this.log = null;
-            this.updateSeq = -1;
+            this.rebalanceId = -1;
         }
 
         /**
@@ -930,11 +955,11 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @param updateSeq Update sequence.
-         * @return true in case future created for specified updateSeq, false in other case.
+         * @param rebalanceId Rebalance id.
+         * @return true if the future was created for the specified {@code rebalanceId}, false otherwise.
          */
-        private boolean isActual(long updateSeq) {
-            return this.updateSeq == updateSeq;
+        private boolean isActual(long rebalanceId) {
+            return this.rebalanceId == rebalanceId;
         }
 
         /**
@@ -989,20 +1014,18 @@ public class GridDhtPartitionDemander {
 
                 checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
             }
-
         }
 
         /**
          * @param nodeId Node id.
-         * @param p P.
+         * @param p Partition id.
          */
         private void partitionMissed(UUID nodeId, int p) {
             synchronized (this) {
                 if (isDone())
                     return;
 
-                if (missed.get(nodeId) == null)
-                    missed.put(nodeId, new HashSet<Integer>());
+                missed.computeIfAbsent(nodeId, k -> new HashSet<>());
 
                 missed.get(nodeId).add(p);
             }
@@ -1018,7 +1041,9 @@ public class GridDhtPartitionDemander {
                 return;
 
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                -1/* remove supply context signal */,
+                // A negative id signals that the supply context
+                // with the same positive id must be cleaned up at the supply node.
+                -rebalanceId,
                 this.topologyVersion(),
                 grp.groupId());
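cleanupRemoteContexts reuses the demand message type but negates the rebalance id, so a supplier can tell a "drop your supply context" request apart from a regular demand for data. A minimal sketch of that sign-encoding convention follows; the method names below are made up for illustration:

    // Sketch of encoding "clean up the supply context" as a negated rebalance id.
    public class RebalanceIdSignal {
        /** Normal demand uses the positive id; cleanup sends the same id negated. */
        static long cleanupSignal(long rebalanceId) {
            return -rebalanceId;
        }

        static boolean isCleanup(long id) {
            return id < 0;
        }

        static long originalId(long id) {
            return Math.abs(id);
        }

        public static void main(String[] args) {
            long demandId = 42;
            long signal = cleanupSignal(demandId);

            System.out.println(isCleanup(signal));               // true
            System.out.println(originalId(signal) == demandId);  // true
        }
    }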
 
@@ -1029,7 +1054,7 @@ public class GridDhtPartitionDemander {
                     d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
                     ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                        d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
+                        d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout());
                 }
             }
             catch (IgniteCheckedException ignored) {
@@ -1040,7 +1065,7 @@ public class GridDhtPartitionDemander {
 
         /**
          * @param nodeId Node id.
-         * @param p P.
+         * @param p Partition number.
          */
         private void partitionDone(UUID nodeId, int p) {
             synchronized (this) {
@@ -1050,23 +1075,23 @@ public class GridDhtPartitionDemander {
                 if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
                     rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent());
 
-                T2<Long, Collection<Integer>> t = remaining.get(nodeId);
+                T2<Long, IgniteDhtDemandedPartitionsMap> t = remaining.get(nodeId);
 
-                assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId +
+                assert t != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                     ", part=" + p + "]";
 
-                Collection<Integer> parts = t.get2();
+                IgniteDhtDemandedPartitionsMap parts = t.get2();
 
                 boolean rmvd = parts.remove(p);
 
-                assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId +
+                assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                     ", part=" + p + ", left=" + parts + "]";
 
                 if (parts.isEmpty()) {
                     U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
-                        "rebalancing [fromNode=" + nodeId +
-                        ", cacheOrGroup=" + grp.cacheOrGroupName() +
-                        ", topology=" + topologyVersion() +
+                            "rebalancing [fromNode=" + nodeId +
+                            ", cacheOrGroup=" + grp.cacheOrGroupName() +
+                            ", topology=" + topologyVersion() +
                         ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
 
                     remaining.remove(nodeId);
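partitionDone and checkIsDone implement a two-level completion check: a supplier node is finished when its remaining partition set becomes empty, and the whole rebalance future can complete once the remaining map itself is empty. A small self-contained sketch of that bookkeeping, using plain JDK collections instead of IgniteDhtDemandedPartitionsMap:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Per-supplier bookkeeping: drop partitions as they complete, then nodes,
    // and report overall completion once nothing remains.
    public class RemainingTracker {
        private final Map<UUID, Set<Integer>> remaining = new HashMap<>();

        synchronized void expect(UUID nodeId, Set<Integer> parts) {
            remaining.put(nodeId, new HashSet<>(parts));
        }

        /** @return true if the whole rebalance is finished after this partition. */
        synchronized boolean partitionDone(UUID nodeId, int part) {
            Set<Integer> parts = remaining.get(nodeId);

            if (parts == null || !parts.remove(part))
                throw new IllegalStateException("Partition already done: " + part);

            if (parts.isEmpty())
                remaining.remove(nodeId); // This supplier is fully drained.

            return remaining.isEmpty();   // The whole future may complete now.
        }

        public static void main(String[] args) {
            RemainingTracker tracker = new RemainingTracker();
            UUID node = UUID.randomUUID();

            tracker.expect(node, Set.of(0, 1));

            System.out.println(tracker.partitionDone(node, 0)); // false
            System.out.println(tracker.partitionDone(node, 1)); // true
        }
    }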
@@ -1109,8 +1134,8 @@ public class GridDhtPartitionDemander {
             if (remaining.isEmpty()) {
                 sendRebalanceFinishedEvent();
 
-                if (log.isDebugEnabled())
-                    log.debug("Completed rebalance future: " + this);
+                if (log.isInfoEnabled())
+                    log.info("Completed rebalance future: " + this);
 
                 ctx.exchange().scheduleResendPartitions();
 
