IGNITE-6113 Fixed partition eviction preventing exchange completion - Fixes #3445.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6dc5804a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6dc5804a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6dc5804a

Branch: refs/heads/master
Commit: 6dc5804af6dda90ca210a39d622964d78e3890f1
Parents: 7b9526b
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Tue Mar 6 16:27:22 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Mar 6 16:27:22 2018 +0300

----------------------------------------------------------------------
 .../junits/IgniteCompatibilityNodeRunner.java   |   1 +
 .../org/apache/ignite/cache/CacheMetrics.java   |   5 +
 .../communication/GridIoMessageFactory.java     |   6 +
 .../cache/CacheClusterMetricsMXBeanImpl.java    |   5 +
 .../processors/cache/CacheGroupContext.java     |  13 +
 .../cache/CacheLocalMetricsMXBeanImpl.java      |   5 +
 .../processors/cache/CacheMetricsImpl.java      |  16 +
 .../processors/cache/CacheMetricsSnapshot.java  |   9 +
 .../processors/cache/GridCacheIoManager.java    |   6 -
 .../GridCachePartitionExchangeManager.java      |  11 +-
 .../processors/cache/GridCachePreloader.java    |  16 +-
 .../cache/GridCachePreloaderAdapter.java        |  12 +-
 .../cache/IgniteCacheOffheapManager.java        |  14 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 110 ++--
 .../cache/IgniteRebalanceIterator.java          |  23 +-
 .../dht/GridClientPartitionTopology.java        |  15 +-
 .../distributed/dht/GridDhtLocalPartition.java  | 485 ++++++++++++----
 .../dht/GridDhtPartitionTopologyImpl.java       | 190 ++++---
 .../dht/GridDhtPartitionsEvictor.java           | 140 +++++
 .../dht/GridDhtPartitionsReservation.java       |   6 +-
 .../CachePartitionFullCountersMap.java          |  22 +
 .../CachePartitionPartialCountersMap.java       |  44 +-
 .../GridDhtPartitionDemandLegacyMessage.java    | 435 +++++++++++++++
 .../GridDhtPartitionDemandMessage.java          | 201 +++----
 .../dht/preloader/GridDhtPartitionDemander.java | 445 ++++++++-------
 .../dht/preloader/GridDhtPartitionSupplier.java | 548 ++++++++-----------
 .../GridDhtPartitionSupplyMessage.java          |  22 +-
 .../GridDhtPartitionsExchangeFuture.java        |   3 +-
 .../dht/preloader/GridDhtPreloader.java         | 198 ++-----
 .../IgniteDhtDemandedPartitionsMap.java         | 218 ++++++++
 .../IgniteDhtPartitionCountersMap.java          |   6 +
 .../IgniteDhtPartitionHistorySuppliersMap.java  |   6 +
 .../IgniteDhtPartitionsToReloadMap.java         |   6 +
 .../dht/preloader/IgniteHistoricalIterator.java |  39 ++
 .../preloader/IgniteRebalanceIteratorImpl.java  | 201 +++++++
 .../persistence/GridCacheOffheapManager.java    | 138 +++--
 .../IgniteCachePartitionLossPolicySelfTest.java |   2 +-
 ...achePartitionPartialCountersMapSelfTest.java |  57 ++
 .../GridCacheRebalancingSyncSelfTest.java       |   3 +-
 ...idCacheRebalancingWithAsyncClearingTest.java | 240 ++++++++
 .../IgnitePdsCacheRebalancingAbstractTest.java  |  77 ++-
 .../file/IgnitePdsDiskErrorsRecoveringTest.java |  15 -
 .../wal/IgniteWalHistoryReservationsTest.java   |  21 -
 .../db/wal/WalRecoveryTxLogicalRecordsTest.java | 103 ++--
 .../platform/PlatformCacheWriteMetricsTask.java |   5 +
 .../junits/common/GridCommonAbstractTest.java   |  38 ++
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 ...GridCacheLazyQueryPartitionsReleaseTest.java |   9 +-
 .../impl/cache/CacheBasedDatasetTest.java       |   2 -
 50 files changed, 2982 insertions(+), 1215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java
----------------------------------------------------------------------
diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java
index 7a72ea6..6c58887 100644
--- 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java
@@ -109,6 +109,7 @@ public class IgniteCompatibilityNodeRunner extends IgniteNodeRunner {
             watchdog.interrupt();
         }
         catch (Throwable e) {
+            e.printStackTrace();
             X.println("Dumping classpath, error occurred: " + e);
             dumpClasspath();
             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 609f345..0b1cb87 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -536,6 +536,11 @@ public interface CacheMetrics {
     public long getRebalancingStartTime();
 
     /**
+     * @return Number of partitions need to be cleared before actual rebalance start.
+     */
+    public long getRebalanceClearingPartitionsLeft();
+
+    /**
      * Checks whether statistics collection is enabled in this cache.
      * <p>
      * The default value is {@code false}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e29a316..0f952d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -85,6 +85,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.Update
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandLegacyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
@@ -511,6 +512,11 @@ public class GridIoMessageFactory implements 
MessageFactory {
                 break;
 
             case 44:
+                msg = new GridDhtPartitionDemandLegacyMessage();
+
+                break;
+
+            case 45:
                 msg = new GridDhtPartitionDemandMessage();
 
                 break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index fb9c035..ce6416f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -405,6 +405,11 @@ class CacheClusterMetricsMXBeanImpl implements 
CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public long getRebalanceClearingPartitionsLeft() {
+        return cache.clusterMetrics().getRebalanceClearingPartitionsLeft();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isValidForReading() {
         return cache.clusterMetrics().isValidForReading();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index fd9ea1f..12636f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
@@ -126,6 +127,9 @@ public class CacheGroupContext {
     /** */
     private GridCachePreloader preldr;
 
+    /** Partition evictor. */
+    private GridDhtPartitionsEvictor evictor;
+
     /** */
     private final DataRegion dataRegion;
 
@@ -245,6 +249,13 @@ public class CacheGroupContext {
     }
 
     /**
+     * @return Partitions evictor.
+     */
+    public GridDhtPartitionsEvictor evictor() {
+        return evictor;
+    }
+
+    /**
      * @return IO policy for the given cache group.
      */
     public byte ioPolicy() {
@@ -881,6 +892,8 @@ public class CacheGroupContext {
         else
             preldr = new GridCachePreloaderAdapter(this);
 
+        evictor = new GridDhtPartitionsEvictor(this);
+
         if (persistenceEnabled()) {
             try {
                 offheapMgr = new GridCacheOffheapManager();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index f62cd15..438c8c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -406,6 +406,11 @@ class CacheLocalMetricsMXBeanImpl implements 
CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public long getRebalanceClearingPartitionsLeft() {
+        return cache.metrics0().getRebalanceClearingPartitionsLeft();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isValidForReading() {
         return cache.metrics0().isValidForReading();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 1603ecb..6fae8fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -123,6 +123,9 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** Rebalancing rate in bytes. */
     private HitRateMetrics rebalancingBytesRate = new 
HitRateMetrics(REBALANCE_RATE_INTERVAL, 20);
 
+    /** Number of currently clearing partitions for rebalancing. */
+    private AtomicLong rebalanceClearingPartitions = new AtomicLong();
+
     /** Cache metrics. */
     @GridToStringExclude
     private transient CacheMetricsImpl delegate;
@@ -904,6 +907,19 @@ public class CacheMetricsImpl implements CacheMetrics {
         return rebalanceStartTime.get();
     }
 
+    /** {@inheritDoc} */
+    @Override public long getRebalanceClearingPartitionsLeft() {
+        return rebalanceClearingPartitions.get();
+    }
+
+    /**
+     * Sets clearing partitions number.
+     * @param partitions Partitions number.
+     */
+    public void rebalanceClearingPartitions(int partitions) {
+        rebalanceClearingPartitions.set(partitions);
+    }
+
     /**
      * First rebalance supply message callback.
      * @param keysCnt Estimated number of keys.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index af6a174..e693720 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -209,6 +209,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
     /** Estimate rebalance finish time. */
     private long rebalanceFinishTime;
 
+    /** The number of clearing partitions need to await before rebalance. */
+    private long rebalanceClearingPartitionsLeft;
+
     /** */
     private String keyType;
 
@@ -329,6 +332,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
         rebalancingKeysRate = m.getRebalancingKeysRate();
         rebalanceStartTime = m.rebalancingStartTime();
         rebalanceFinishTime = m.estimateRebalancingFinishTime();
+        rebalanceClearingPartitionsLeft = 
m.getRebalanceClearingPartitionsLeft();
     }
 
     /**
@@ -760,6 +764,11 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public long getRebalanceClearingPartitionsLeft() {
+        return rebalanceClearingPartitionsLeft;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isWriteBehindEnabled() {
         return isWriteBehindEnabled;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 11a416f..3182192 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -815,12 +815,6 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             break;
 
-            case 45: {
-                processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander.
-            }
-
-            break;
-
             case 49: {
                 GridNearGetRequest req = (GridNearGetRequest)msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1bd5591..4e93822 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -68,6 +68,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandLegacyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -370,6 +371,12 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                                     return;
                                 }
+                                else if (m instanceof 
GridDhtPartitionDemandLegacyMessage) {
+                                    grp.preloader().handleDemandMessage(idx, 
id,
+                                        new 
GridDhtPartitionDemandMessage((GridDhtPartitionDemandLegacyMessage) m));
+
+                                    return;
+                                }
                             }
 
                             U.error(log, "Unsupported message type: " + 
m.getClass().getName());
@@ -2240,7 +2247,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
             long timeout = cctx.gridConfig().getNetworkTimeout();
 
-            int cnt = 0;
+            long cnt = 0;
 
             while (!isCancelled()) {
                 cnt++;
@@ -2413,7 +2420,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                                 // Don't delay for dummy reassigns to avoid 
infinite recursion.
                                 if ((delay == 0 || forcePreload) && 
!disableRebalance)
-                                    assigns = grp.preloader().assign(exchId, 
exchFut);
+                                    assigns = 
grp.preloader().generateAssignments(exchId, exchFut);
 
                                 assignsMap.put(grp.groupId(), assigns);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index dc1624a..5fa7a82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -68,22 +67,22 @@ public interface GridCachePreloader {
      * @param exchFut Exchange future.
      * @return Assignments or {@code null} if detected that there are pending exchanges.
      */
-    @Nullable public GridDhtPreloaderAssignments 
assign(GridDhtPartitionExchangeId exchId,
-        @Nullable GridDhtPartitionsExchangeFuture exchFut);
+    @Nullable public GridDhtPreloaderAssignments 
generateAssignments(GridDhtPartitionExchangeId exchId,
+                                                                     @Nullable 
GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
      *
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
-     * @param cnt Counter.
+     * @param rebalanceId Rebalance id.
      * @param next Runnable responsible for cache rebalancing start.
      * @param forcedRebFut Rebalance future.
      * @return Rebalancing runnable.
      */
     public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
-        int cnt,
+        long rebalanceId,
         Runnable next,
         @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut);
 
@@ -178,13 +177,6 @@ public interface GridCachePreloader {
     public void handleDemandMessage(int idx, UUID id, 
GridDhtPartitionDemandMessage d);
 
     /**
-     * Evicts partition asynchronously.
-     *
-     * @param part Partition.
-     */
-    public void evictPartitionAsync(GridDhtLocalPartition part);
-
-    /**
      * @param lastFut Last future.
      */
     public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index c0accf6..af91679 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -24,7 +24,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -153,26 +152,21 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments 
generateAssignments(GridDhtPartitionExchangeId exchId,
+                                                                     
GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
 
     /** {@inheritDoc} */
     @Override public Runnable addAssignments(GridDhtPreloaderAssignments 
assignments,
         boolean forcePreload,
-        int cnt,
+        long rebalanceId,
         Runnable next,
         @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture 
lastFut) {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 546672c..3d83f87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
@@ -234,13 +235,20 @@ public interface IgniteCacheOffheapManager {
     public GridIterator<CacheDataRow> partitionIterator(final int part) throws 
IgniteCheckedException;
 
     /**
-     * @param part Partition.
+     * @param part Partition number.
      * @param topVer Topology version.
-     * @param partCntr Partition counter to get historical data if available.
+     * @return Iterator for given partition that will reserve partition state until it is closed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<CacheDataRow> reservedIterator(final int 
part, final AffinityTopologyVersion topVer)
+        throws IgniteCheckedException;
+
+    /**
+     * @param parts Partitions.
      * @return Partition data iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public IgniteRebalanceIterator rebalanceIterator(int part, 
AffinityTopologyVersion topVer, Long partCntr)
+    public IgniteRebalanceIterator 
rebalanceIterator(IgniteDhtDemandedPartitionsMap parts, AffinityTopologyVersion 
topVer)
         throws IgniteCheckedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index f7892af..b201935 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,6 +38,10 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -72,6 +78,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  *
@@ -109,7 +116,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     private int updateValSizeThreshold;
 
     /** */
-    private GridStripedLock partStoreLock = new 
GridStripedLock(Runtime.getRuntime().availableProcessors());
+    protected GridStripedLock partStoreLock = new 
GridStripedLock(Runtime.getRuntime().availableProcessors());
 
     /** {@inheritDoc} */
     @Override public GridAtomicLong globalRemoveId() {
@@ -813,54 +820,95 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRebalanceIterator rebalanceIterator(int part, 
AffinityTopologyVersion topVer, Long partCntr)
-        throws IgniteCheckedException {
-        final GridIterator<CacheDataRow> it = partitionIterator(part);
+    @Override public GridCloseableIterator<CacheDataRow> reservedIterator(int 
part,
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        final GridDhtLocalPartition loc = grp.topology().localPartition(part, 
topVer, false);
 
-        return new IgniteRebalanceIterator() {
-            @Override public boolean historical() {
-                return false;
-            }
+        if (loc == null || !loc.reserve())
+            return null;
 
-            @Override public boolean hasNextX() throws IgniteCheckedException {
-                return it.hasNextX();
-            }
+        // It is necessary to check state after reservation to avoid race 
conditions.
+        if (loc.state() != OWNING) {
+            loc.release();
 
-            @Override public CacheDataRow nextX() throws 
IgniteCheckedException {
-                return it.nextX();
-            }
+            return null;
+        }
 
-            @Override public void removeX() throws IgniteCheckedException {
-                it.removeX();
-            }
+        CacheDataStore data = partitionData(part);
 
-            @Override public Iterator<CacheDataRow> iterator() {
-                return it.iterator();
-            }
+        final GridCursor<? extends CacheDataRow> cur = data.cursor();
 
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
+        return new GridCloseableIteratorAdapter<CacheDataRow>() {
+            /** */
+            private CacheDataRow next;
 
-            @Override public CacheDataRow next() {
-                return it.next();
-            }
+            @Override protected CacheDataRow onNext() {
+                CacheDataRow res = next;
 
-            @Override public void close() {
+                next = null;
 
+                return res;
             }
 
-            @Override public boolean isClosed() {
-                return false;
+            @Override protected boolean onHasNext() throws 
IgniteCheckedException {
+                if (next != null)
+                    return true;
+
+                if (cur.next())
+                    next = cur.get();
+
+                return next != null;
             }
 
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
+            @Override protected void onClose() throws IgniteCheckedException {
+                assert loc != null && loc.state() == OWNING && 
loc.reservations() > 0;
+
+                loc.release();
             }
         };
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRebalanceIterator 
rebalanceIterator(IgniteDhtDemandedPartitionsMap parts,
+        final AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+
+        final TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators 
= new TreeMap<>();
+        Set<Integer> missing = null;
+
+        for (Integer p : parts.fullSet()) {
+            GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, 
topVer);
+
+            if (partIter == null) {
+                if (missing == null)
+                    missing = new HashSet<>();
+
+                missing.add(p);
+            }
+            else
+                iterators.put(p, partIter);
+        }
+
+        IgniteRebalanceIterator iter = new 
IgniteRebalanceIteratorImpl(iterators, 
historicalIterator(parts.historicalMap()));
+
+        if (missing != null) {
+            for (Integer p : missing)
+                iter.setPartitionMissing(p);
+        }
+
+        return iter;
+    }
+
+    /**
+     * @param partCntrs Partition counters map.
+     * @return Historical iterator.
+     */
+    @Nullable protected IgniteHistoricalIterator 
historicalIterator(CachePartitionPartialCountersMap partCntrs)
+        throws IgniteCheckedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public final CacheDataStore createCacheDataStore(int p) throws 
IgniteCheckedException {
         CacheDataStore dataStore;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java
index 4efe640..d2f29d5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java
@@ -21,11 +21,30 @@ import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 
 /**
- *
+ * Iterator over supplied data for rebalancing.
  */
 public interface IgniteRebalanceIterator extends 
GridCloseableIterator<CacheDataRow> {
     /**
      * @return {@code True} if this iterator is a historical iterator starting 
from the requested partition counter.
      */
-    public boolean historical();
+    public boolean historical(int partId);
+
+    /**
+     * @param partId Partition ID.
+     * @return {@code True} if all data for given partition was already 
returned.
+     */
+    public boolean isPartitionDone(int partId);
+
+    /**
+     * @param partId Partition ID.
+     * @return {@code True} if partition was marked as missing.
+     */
+    public boolean isPartitionMissing(int partId);
+
+    /**
+     * Marks partition as missing.
+     *
+     * @param partId Partition ID.
+     */
+    public void setPartitionMissing(int partId);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5c98ee8..9b3c1ec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -1107,23 +1108,21 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         try {
             for (Map.Entry<UUID, GridDhtPartitionMap> e : 
node2part.entrySet()) {
                 GridDhtPartitionMap partMap = e.getValue();
+                UUID remoteNodeId = e.getKey();
 
                 if (!partMap.containsKey(p))
                     continue;
 
-                if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) {
-                    if (haveHistory)
-                        partMap.put(p, MOVING);
-                    else {
-                        partMap.put(p, RENTING);
+                if (partMap.get(p) == OWNING && 
!owners.contains(remoteNodeId)) {
+                    partMap.put(p, MOVING);
 
-                        result.add(e.getKey());
-                    }
+                    if (!haveHistory)
+                        result.add(remoteNodeId);
 
                     partMap.updateSequence(partMap.updateSequence() + 1, 
partMap.topologyVersion());
 
                     U.warn(log, "Partition has been scheduled for rebalancing 
due to outdated update counter " +
-                        "[nodeId=" + e.getKey() + ", groupId=" + grpId +
+                        "[nodeId=" + remoteNodeId + ", groupId=" + grpId +
                         ", partId=" + p + ", haveHistory=" + haveHistory + 
"]");
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 89ce6dd..119ea91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
@@ -47,6 +48,7 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -54,6 +56,7 @@ import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -114,6 +117,10 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     @GridToStringExclude
     private final GridFutureAdapter<?> rent;
 
+    /** Clear future. */
+    @GridToStringExclude
+    private final ClearFuture clearFuture;
+
     /** */
     @GridToStringExclude
     private final GridCacheSharedContext ctx;
@@ -148,14 +155,14 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
     /** */
     @GridToStringExclude
-    private final CacheDataStore store;
+    private volatile CacheDataStore store;
 
     /** Set if failed to move partition to RENTING state due to reservations, 
to be checked when
      * reservation is released. */
-    private volatile boolean shouldBeRenting;
+    private volatile boolean delayedRenting;
 
-    /** Set if partition must be re-created and preloaded after eviction. */
-    private boolean reload;
+    /** Set if partition must be cleared in MOVING state. */
+    private volatile boolean clear;
 
     /**
      * @param ctx Context.
@@ -189,6 +196,8 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             }
         };
 
+        clearFuture = new ClearFuture();
+
         int delQueueSize = grp.systemCache() ? 100 :
             Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
 
@@ -321,13 +330,6 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @return {@code True} if partition is marked for transfer to renting 
state.
-     */
-    public boolean shouldBeRenting() {
-        return shouldBeRenting;
-    }
-
-    /**
      * @return Reservations.
      */
     public int reservations() {
@@ -363,7 +365,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         removeEntry(entry);
 
         // Attempt to evict.
-        tryEvictAsync(false);
+        tryContinueClearing();
     }
 
     /**
@@ -434,7 +436,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * Reserves a partition so it won't be cleared.
+     * Reserves a partition so it won't be cleared or evicted.
      *
      * @return {@code True} if reserved.
      */
@@ -479,7 +481,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             if (reservations == 0)
                 return;
 
-            assert getPartState(state) != EVICTED;
+            assert getPartState(state) != EVICTED : getPartState(state);
 
             long newState = setReservations(state, --reservations);
             newState = setSize(newState, getSize(newState) + sizeChange);
@@ -488,10 +490,13 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
             // Decrement reservations.
             if (this.state.compareAndSet(state, newState)) {
-                if (reservations == 0 && shouldBeRenting)
-                    rent(true);
-
-                tryEvictAsync(false);
+                // If no more reservations try to continue delayed renting or 
clearing process.
+                if (reservations == 0) {
+                    if (delayedRenting)
+                        rent(true);
+                    else
+                        tryContinueClearing();
+                }
 
                 break;
             }
@@ -559,13 +564,13 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     /**
      * Forcibly moves partition to a MOVING state.
      */
-    void moving() {
+    public void moving() {
         while (true) {
             long state = this.state.get();
 
             GridDhtPartitionState partState = getPartState(state);
 
-            assert partState == OWNING : "Only OWNed partitions should be 
moved to MOVING state";
+            assert partState == OWNING || partState == RENTING : "Only 
partitions in state OWNING or RENTING can be moved to MOVING state";
 
             if (casState(state, MOVING)) {
                 if (log.isDebugEnabled())
@@ -598,72 +603,98 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @return {@code True} if partition should be re-created after it is 
cleared.
-     */
-    public boolean reload() {
-        return reload;
-    }
-
-    /**
-     * @param value {@code reload} flag value.
-     */
-    public void reload(boolean value) {
-        reload = value;
-    }
-
-    /**
-     * @param updateSeq Update sequence.
+     * Initiates partition eviction process.
+     *
+     * If partition has reservations, eviction will be delayed and continued 
after all reservations are released.
+     *
+     * @param updateSeq If {@code true} topology update sequence will be 
updated after eviction is finished.
      * @return Future to signal that this node is no longer an owner or backup.
      */
     public IgniteInternalFuture<?> rent(boolean updateSeq) {
-        long state = this.state.get();
+        long state0 = this.state.get();
 
-        GridDhtPartitionState partState = getPartState(state);
+        GridDhtPartitionState partState = getPartState(state0);
 
         if (partState == RENTING || partState == EVICTED)
             return rent;
 
-        shouldBeRenting = true;
+        delayedRenting = true;
 
-        if (getReservations(state) == 0 && casState(state, RENTING)) {
-            shouldBeRenting = false;
+        if (getReservations(state0) == 0 && casState(state0, RENTING)) {
+            delayedRenting = false;
 
             if (log.isDebugEnabled())
                 log.debug("Moved partition to RENTING state: " + this);
 
             // Evict asynchronously, as the 'rent' method may be called
             // from within write locks on local partition.
-            tryEvictAsync(updateSeq);
+            clearAsync0(updateSeq);
         }
 
         return rent;
     }
 
     /**
+     * Starts clearing process asynchronously if it's requested and not 
running at the moment.
+     * Method may finish clearing process ahead of time if partition is empty 
and doesn't have reservations.
+     *
      * @param updateSeq Update sequence.
      */
-     public void tryEvictAsync(boolean updateSeq) {
+    private void clearAsync0(boolean updateSeq) {
         long state = this.state.get();
 
         GridDhtPartitionState partState = getPartState(state);
 
-        if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 &&
-            partState == RENTING && getReservations(state) == 0 && 
!groupReserved() &&
-            casState(state, EVICTED)) {
-            if (log.isDebugEnabled())
-                log.debug("Evicted partition: " + this);
+        boolean evictionRequested = partState == RENTING || delayedRenting;
+        boolean clearingRequested = partState == MOVING && clear;
 
-            if (markForDestroy())
-                finishDestroy(updateSeq);
+        if (!evictionRequested && !clearingRequested)
+            return;
+
+        boolean reinitialized = clearFuture.initialize(updateSeq, 
evictionRequested);
+
+        // Clearing process is already running at the moment. No need to run 
it again.
+        if (!reinitialized)
+            return;
+
+        // Try fast eviction
+        if (isEmpty() && getSize(state) == 0 && !grp.queriesEnabled()
+                && getReservations(state) == 0 && !groupReserved()) {
+
+            if (partState == RENTING && casState(state, EVICTED) || 
clearingRequested) {
+                clearFuture.finish();
+
+                return;
+            }
         }
-        else if (partState == RENTING || shouldBeRenting())
-            grp.preloader().evictPartitionAsync(this);
+
+        grp.evictor().evictPartitionAsync(this);
+    }
+
+    /**
+     * Initiates single clear process if partition is in MOVING state.
+     * Method does nothing if clear process is already running.
+     */
+    public void clearAsync() {
+        if (state() != MOVING)
+            return;
+
+        clear = true;
+        clearAsync0(false);
+    }
+
+    /**
+     * Continues delayed clearing of partition if possible.
+     * Clearing may be delayed because of existing reservations.
+     */
+    void tryContinueClearing() {
+        clearAsync0(true);
     }
 
     /**
      * @return {@code true} If there is a group reservation.
      */
-    boolean groupReserved() {
+    private boolean groupReserved() {
         for (GridDhtPartitionsReservation reservation : reservations) {
             if (!reservation.invalidate())
                 return true; // Failed to invalidate reservation -> we are 
reserved.
@@ -673,7 +704,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @return {@code True} if evicting thread was added.
+     * @return {@code true} if evicting thread was added.
      */
     private boolean addEvicting() {
         while (true) {
@@ -682,15 +713,17 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             if (cnt != 0)
                 return false;
 
-            if (evictGuard.compareAndSet(cnt, cnt + 1))
+            if (evictGuard.compareAndSet(cnt, cnt + 1)) {
+
                 return true;
+            }
         }
     }
 
     /**
-     *
+     * @return {@code true} if no thread is evicting the partition at the moment.
      */
-    private void clearEvicting() {
+    private boolean clearEvicting() {
         boolean free;
 
         while (true) {
@@ -705,14 +738,11 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             }
         }
 
-        if (free && state() == EVICTED) {
-            if (markForDestroy())
-                finishDestroy(true);
-        }
+        return free;
     }
 
     /**
-     * @return {@code True} if partition is safe to destroy
+     * @return {@code True} if partition is safe to destroy.
      */
     private boolean markForDestroy() {
         while (true) {
@@ -727,7 +757,29 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @param updateSeq Update sequence request.
+     * Moves partition state to {@code EVICTED} if possible
+     * and initiates partition destroy process after successfully moving 
the partition to the {@code EVICTED} state.
+     *
+     * @param updateSeq If {@code true} increment update sequence on cache 
group topology after successful eviction.
+     */
+    private void finishEviction(boolean updateSeq) {
+        long state0 = this.state.get();
+
+        GridDhtPartitionState state = getPartState(state0);
+
+        if (state == EVICTED || (isEmpty() && getSize(state0) == 0 && 
getReservations(state0) == 0 && state == RENTING && casState(state0, EVICTED))) 
{
+            if (log.isDebugEnabled())
+                log.debug("Evicted partition: " + this);
+
+            if (markForDestroy())
+                finishDestroy(updateSeq);
+        }
+    }
+
+    /**
+     * Destroys partition data store and invokes appropriate callbacks.
+     *
+     * @param updateSeq If {@code true} increment update sequence on cache 
group topology after successful destroy.
      */
     private void finishDestroy(boolean updateSeq) {
         assert state() == EVICTED : this;
@@ -745,31 +797,72 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * Awaits completion of partition destroy process in case of {@code 
EVICTED} partition state.
+     */
+    public void awaitDestroy() {
+        try {
+            if (state() == EVICTED)
+                rent.get();
+        } catch (IgniteCheckedException e) {
+            log.error("Unable to await partition destroy " + this, e);
+        }
+    }
+
+    /**
+     * Adds listener on {@link #clearFuture} finish.
+     *
+     * @param lsnr Listener.
+     */
+    public void onClearFinished(IgniteInClosure<? super 
IgniteInternalFuture<?>> lsnr) {
+        clearFuture.listen(lsnr);
+    }
+
+    /**
+     * @return {@code True} if clearing process is running at the moment on 
the partition.
+     */
+    public boolean isClearing() {
+        return !clearFuture.isDone();
+    }
+
+    /**
+     * Tries to start partition clear process ({@link 
GridDhtLocalPartition#clearAll()}).
+     * Only one thread is allowed to do such process concurrently.
+     * At the end of clearing method completes {@code clearFuture}.
+     *
+     * @return {@code false} if clearing is not started due to existing 
reservations.
      * @throws NodeStoppingException If node is stopping.
      */
-    public void tryEvict() throws NodeStoppingException {
-        long state = this.state.get();
+    public boolean tryClear() throws NodeStoppingException {
+        if (clearFuture.isDone())
+            return true;
 
-        GridDhtPartitionState partState = getPartState(state);
+        long state = this.state.get();
 
-        if (partState != RENTING || getReservations(state) != 0 || 
groupReserved())
-            return;
+        if (getReservations(state) != 0 || groupReserved())
+            return false;
 
         if (addEvicting()) {
             try {
                 // Attempt to evict partition entries from cache.
-                clearAll();
+                long clearedEntities = clearAll();
 
-                if (isEmpty() && getSize(state) == 0 && casState(state, 
EVICTED)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Evicted partition: " + this);
-                    // finishDestroy() will be initiated by clearEvicting().
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Partition is cleared [clearedEntities=" + 
clearedEntities + ", part=" + this + "]");
+            }
+            catch (NodeStoppingException e) {
+                clearFuture.finish(e);
+
+                throw e;
             }
             finally {
-                clearEvicting();
+                boolean free = clearEvicting();
+
+                if (free)
+                    clearFuture.finish();
             }
         }
+
+        return true;
     }
 
     /**
@@ -785,10 +878,11 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     *
+     * On partition unlock callback.
+     * Tries to continue delayed partition clearing.
      */
     void onUnlock() {
-        tryEvictAsync(false);
+        tryContinueClearing();
     }
 
     /**
@@ -854,11 +948,12 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * Clears values for this partition.
+     * Removes all entries and rows from this partition.
      *
+     * @return Number of rows cleared from page memory.
      * @throws NodeStoppingException If node stopping.
      */
-    public void clearAll() throws NodeStoppingException {
+    private long clearAll() throws NodeStoppingException {
         GridCacheVersion clearVer = ctx.versions().next();
 
         GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
@@ -872,6 +967,8 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         else
             clear(singleCacheEntryMap.map, extras, rec);
 
+        long cleared = 0;
+
         if (!grp.allowFastEviction()) {
             CacheMapHolder hld = grp.sharedGroup() ? null : 
singleCacheEntryMap;
 
@@ -884,6 +981,10 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                     try {
                         CacheDataRow row = it0.next();
 
+                        // Do not clear fresh rows in case of single partition 
clearing.
+                        if (row.version().compareTo(clearVer) >= 0 && (state() 
== MOVING && clear))
+                            continue;
+
                         if (grp.sharedGroup() && (hld == null || 
hld.cctx.cacheId() != row.cacheId()))
                             hld = 
cacheMapHolder(ctx.cacheContext(row.cacheId()));
 
@@ -897,32 +998,27 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                             true,
                             false);
 
-                        ctx.database().checkpointReadLock();
-
-                        try {
-                            if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                                removeEntry(cached);
-
-                                if (rec && 
!hld.cctx.config().isEventsDisabled()) {
-                                    
hld.cctx.events().addEvent(cached.partition(),
-                                        cached.key(),
-                                        ctx.localNodeId(),
-                                        (IgniteUuid)null,
-                                        null,
-                                        EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                        null,
-                                        false,
-                                        cached.rawGet(),
-                                        cached.hasValue(),
-                                        null,
-                                        null,
-                                        null,
-                                        false);
-                                }
+                        if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+                            removeEntry(cached);
+
+                            if (rec && !hld.cctx.config().isEventsDisabled()) {
+                                hld.cctx.events().addEvent(cached.partition(),
+                                    cached.key(),
+                                    ctx.localNodeId(),
+                                    (IgniteUuid)null,
+                                    null,
+                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                    null,
+                                    false,
+                                    cached.rawGet(),
+                                    cached.hasValue(),
+                                    null,
+                                    null,
+                                    null,
+                                    false);
                             }
-                        }
-                        finally {
-                            ctx.database().checkpointReadUnlock();
+
+                            cleared++;
                         }
                     }
                     catch (GridDhtInvalidPartitionException e) {
@@ -939,21 +1035,23 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                 if (log.isDebugEnabled())
                     log.debug("Failed to get iterator for evicted partition: " 
+ id);
 
-                rent.onDone(e);
-
                 throw e;
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to get iterator for evicted partition: " 
+ id, e);
             }
         }
+
+        return cleared;
     }
 
     /**
+     * Removes all cache entries from specified {@code map}.
+     *
      * @param map Map to clear.
      * @param extras Obsolete extras.
      * @param evt Unload event flag.
-     * @throws NodeStoppingException
+     * @throws NodeStoppingException If current node is stopping.
      */
     private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
         GridCacheObsoleteEntryExtras extras,
@@ -995,8 +1093,6 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                 if (log.isDebugEnabled())
                     log.debug("Failed to clear cache entry for evicted 
partition: " + cached.partition());
 
-                rent.onDone(e);
-
                 throw e;
             }
             catch (IgniteCheckedException e) {
@@ -1009,7 +1105,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     *
+     * Removes all deferred delete requests from {@code rmvQueue}.
      */
     private void clearDeferredDeletes() {
         for (RemovedEntryHolder e : rmvQueue)
@@ -1098,12 +1194,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     void onCacheStopped(int cacheId) {
         assert grp.sharedGroup() : grp.cacheOrGroupName();
 
-        for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); 
it.hasNext();) {
-            RemovedEntryHolder e = it.next();
-
-            if (e.cacheId() == cacheId)
-                it.remove();
-        }
+        rmvQueue.removeIf(e -> e.cacheId() == cacheId);
 
         cacheMaps.remove(cacheId);
     }
@@ -1222,4 +1313,168 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             return S.toString(RemovedEntryHolder.class, this);
         }
     }
+
+    /**
+     * Future is needed to control partition clearing process.
+     * Future can be used for both single clearing and eviction processes.
+     */
+    class ClearFuture extends GridFutureAdapter<Boolean> {
+        /** Flag indicates that eviction callback was registered on the 
current future. */
+        private volatile boolean evictionCallbackRegistered;
+
+        /** Flag indicates that clearing callback was registered on the 
current future. */
+        private volatile boolean clearingCallbackRegistered;
+
+        /** Flag indicates that future with all callbacks was finished. */
+        private volatile boolean finished;
+
+        /**
+         * Constructor.
+         */
+        ClearFuture() {
+            onDone();
+            finished = true;
+        }
+
+        /**
+         * Registers finish eviction callback on the future.
+         *
+         * @param updateSeq If {@code true} update topology sequence after 
successful eviction.
+         */
+        private void registerEvictionCallback(boolean updateSeq) {
+            if (evictionCallbackRegistered)
+                return;
+
+            synchronized (this) {
+                // Double check
+                if (evictionCallbackRegistered)
+                    return;
+
+                evictionCallbackRegistered = true;
+
+                // Initiates partition eviction and destroy.
+                listen(f -> {
+                    if (f.error() != null) {
+                        rent.onDone(f.error());
+                    } else if (f.isDone()) {
+                        finishEviction(updateSeq);
+                    }
+
+                    evictionCallbackRegistered = false;
+                });
+            }
+        }
+
+        /**
+         * Registers clearing callback on the future.
+         */
+        private void registerClearingCallback() {
+            if (clearingCallbackRegistered)
+                return;
+
+            synchronized (this) {
+                // Double check
+                if (clearingCallbackRegistered)
+                    return;
+
+                clearingCallbackRegistered = true;
+
+                // Reset clear flag when clearing completes (cache data store recreation is done in finish()).
+                listen(f -> {
+                    clear = false;
+
+                    clearingCallbackRegistered = false;
+                });
+            }
+        }
+
+        /**
+         * Recreate cache data store after successful clearing and allowed 
fast eviction.
+         */
+        private void recreateCacheDataStore() {
+            assert grp.offheap() instanceof GridCacheOffheapManager;
+
+            try {
+                CacheDataStore store0 = store;
+
+                store = ((GridCacheOffheapManager) 
grp.offheap()).recreateCacheDataStore(store0);
+
+                // Inject row cache cleaner on store creation
+                // Used in case the cache with enabled SqlOnheapCache is 
single cache at the cache group
+                if (ctx.kernalContext().query().moduleEnabled()) {
+                    GridQueryRowCacheCleaner cleaner = 
ctx.kernalContext().query().getIndexing()
+                            .rowCacheCleaner(grp.groupId());
+
+                    if (store != null && cleaner != null)
+                        store.setRowCacheCleaner(cleaner);
+                }
+            } catch (IgniteCheckedException e) {
+                finish(e);
+            }
+        }
+
+        /**
+         * Successfully finishes the future.
+         */
+        public void finish() {
+            if (state() == MOVING && clear && grp.allowFastEviction())
+                recreateCacheDataStore();
+
+            synchronized (this) {
+                onDone();
+                finished = true;
+            }
+        }
+
+        /**
+         * Finishes the future with error.
+         *
+         * @param t Error.
+         */
+        public void finish(Throwable t) {
+            synchronized (this) {
+                onDone(t);
+                finished = true;
+            }
+        }
+
+        /**
+         * Reuses future if it's done.
+         * Adds appropriate callbacks to the future in case of eviction or 
single clearing.
+         *
+         * @param updateSeq Update sequence.
+         * @param evictionRequested If {@code true} adds eviction callback, in 
other case adds single clearing callback.
+         * @return {@code true} if future has been reinitialized.
+         */
+        public boolean initialize(boolean updateSeq, boolean 
evictionRequested) {
+            // In case of running clearing just try to add missing callbacks 
to avoid extra synchronization.
+            if (!finished) {
+                if (evictionRequested)
+                    registerEvictionCallback(updateSeq);
+                else
+                    registerClearingCallback();
+
+                return false;
+            }
+
+            synchronized (this) {
+                boolean done = isDone();
+
+                if (done) {
+                    reset();
+
+                    finished = false;
+                    evictionCallbackRegistered = false;
+                    clearingCallbackRegistered = false;
+                }
+
+                if (evictionRequested)
+                    registerEvictionCallback(updateSeq);
+                else
+                    registerClearingCallback();
+
+                return done;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1234042..528f0a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -37,6 +38,7 @@ import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
@@ -89,9 +91,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     private static final boolean FAST_DIFF_REBUILD = false;
 
     /** */
-    private static final Long ZERO = 0L;
-
-    /** */
     private final GridCacheSharedContext ctx;
 
     /** */
@@ -760,7 +759,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 changed = true;
 
                                 if (log.isDebugEnabled()) {
-                                    log.debug("Evicting moving partition (it 
does not belong to affinity) [" +
+                                    log.debug("Evicting " + state + " 
partition (it does not belong to affinity) [" +
                                         "grp=" + grp.cacheOrGroupName() + ", 
part=" + locPart + ']');
                                 }
                             }
@@ -813,14 +812,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         GridDhtLocalPartition loc = locParts.get(p);
 
         if (loc == null || loc.state() == EVICTED) {
-            if (loc != null) {
-                try {
-                    loc.rent(false).get();
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-            }
+            // Make sure that after eviction partition is destroyed.
+            if (loc != null)
+                loc.awaitDestroy();
 
             locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
@@ -906,12 +900,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 boolean belongs = partitionLocalNode(p, topVer);
 
                 if (loc != null && state == EVICTED) {
-                    try {
-                        loc.rent(false).get();
-                    }
-                    catch (IgniteCheckedException ex) {
-                        throw new IgniteException(ex);
-                    }
+                    // Make sure that after eviction partition is destroyed.
+                    loc.awaitDestroy();
 
                     locParts.set(p, loc = null);
 
@@ -924,8 +914,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 }
                 else if (loc != null && state == RENTING && !showRenting) {
                     throw new GridDhtInvalidPartitionException(p, "Adding 
entry to partition that is concurrently " +
-                        "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + 
p + ", shouldBeMoving=" +
-                        loc.reload() + ", belongs=" + belongs + ", topVer=" + 
topVer + ", curTopVer=" + this.readyTopVer + "]");
+                        "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + 
p + ", shouldBeMoving="
+                        + ", belongs=" + belongs + ", topVer=" + topVer + ", 
curTopVer=" + this.readyTopVer + "]");
                 }
 
                 if (loc == null) {
@@ -937,8 +927,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                     locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, 
p));
 
-                    if (updateSeq)
-                        this.updateSeq.incrementAndGet();
+                    this.updateSeq.incrementAndGet();
 
                     created = true;
 
@@ -1483,32 +1472,49 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         else if (state == MOVING) {
                             GridDhtLocalPartition locPart = locParts.get(p);
 
-                            if (locPart == null || locPart.state() == EVICTED)
-                                locPart = createPartition(p);
+                            if (!partsToReload.contains(p)) {
+                                if (locPart == null || locPart.state() == 
EVICTED)
+                                    locPart = createPartition(p);
 
-                            if (locPart.state() == OWNING) {
-                                locPart.moving();
+                                if (locPart.state() == OWNING) {
+                                    locPart.moving();
 
-                                changed = true;
+                                    changed = true;
+                                }
                             }
-                        }
-                        else if (state == RENTING && 
partsToReload.contains(p)) {
-                            GridDhtLocalPartition locPart = locParts.get(p);
+                            else {
+                                if (locPart == null || locPart.state() == 
EVICTED) {
+                                    createPartition(p);
 
-                            if (locPart == null || locPart.state() == EVICTED) 
{
-                                createPartition(p);
+                                    changed = true;
+                                }
+                                else if (locPart.state() == OWNING || 
locPart.state() == MOVING) {
+                                    if (locPart.state() == OWNING)
+                                        locPart.moving();
+                                    locPart.clearAsync();
 
-                                changed = true;
-                            }
-                            else if (locPart.state() == OWNING || 
locPart.state() == MOVING) {
-                                locPart.reload(true);
+                                    changed = true;
+                                }
+                                else if (locPart.state() == RENTING) {
+                                    // Try to prevent partition eviction.
+                                    if (locPart.reserve()) {
+                                        try {
+                                            locPart.moving();
+                                            locPart.clearAsync();
+                                        } finally {
+                                            locPart.release();
+                                        }
+                                    }
+                                    // In other case just recreate it.
+                                    else {
+                                        assert locPart.state() == EVICTED;
 
-                                locPart.rent(false);
+                                        createPartition(p);
+                                    }
 
-                                changed = true;
+                                    changed = true;
+                                }
                             }
-                            else
-                                locPart.reload(true);
                         }
                     }
                 }
@@ -1535,8 +1541,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     ctx.exchange().scheduleResendPartitions();
 
                 return changed;
-            }
-            finally {
+            } finally {
                 lock.writeLock().unlock();
             }
         }
@@ -1855,6 +1860,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /**
+     * Rebuilds {@link #diffFromAffinity} from given assignment.
+     *
      * @param affAssignment New affinity assignment.
      */
     private void rebuildDiff(AffinityAssignment affAssignment) {
@@ -2078,37 +2085,31 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                 if (locPart != null) {
                     if (locPart.state() == OWNING && 
!owners.contains(ctx.localNodeId())) {
-                        if (haveHistory)
-                            locPart.moving();
-                        else {
-                            locPart.rent(false);
-
-                            locPart.reload(true);
+                        locPart.moving();
 
+                        if (!haveHistory) {
+                            locPart.clearAsync();
                             result.add(ctx.localNodeId());
                         }
 
                         U.warn(log, "Partition has been scheduled for 
rebalancing due to outdated update counter " +
                             "[nodeId=" + ctx.localNodeId() + ", grp=" + 
grp.cacheOrGroupName() +
                             ", partId=" + locPart.id() + ", haveHistory=" + 
haveHistory + "]");
-
                     }
                 }
 
                 for (Map.Entry<UUID, GridDhtPartitionMap> e : 
node2part.entrySet()) {
+                    UUID remoteNodeId = e.getKey();
                     GridDhtPartitionMap partMap = e.getValue();
 
                     if (!partMap.containsKey(p))
                         continue;
 
-                    if (partMap.get(p) == OWNING && 
!owners.contains(e.getKey())) {
-                        if (haveHistory)
-                            partMap.put(p, MOVING);
-                        else {
-                            partMap.put(p, RENTING);
+                    if (partMap.get(p) == OWNING && 
!owners.contains(remoteNodeId)) {
+                        partMap.put(p, MOVING);
 
-                            result.add(e.getKey());
-                        }
+                        if (!haveHistory)
+                            result.add(remoteNodeId);
 
                         partMap.updateSequence(partMap.updateSequence() + 1, 
partMap.topologyVersion());
 
@@ -2116,15 +2117,17 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                             
this.updateSeq.setIfGreater(partMap.updateSequence());
 
                         U.warn(log, "Partition has been scheduled for 
rebalancing due to outdated update counter " +
-                            "[nodeId=" + e.getKey() + ", grp=" + 
grp.cacheOrGroupName() +
+                            "[nodeId=" + remoteNodeId + ", grp=" + 
grp.cacheOrGroupName() +
                             ", partId=" + p + ", haveHistory=" + haveHistory + 
"]");
                     }
                 }
 
-                if (updateSeq)
-                    node2part = new GridDhtPartitionFullMap(node2part, 
this.updateSeq.incrementAndGet());
-            }
-            finally {
+                if (updateSeq) {
+                    long updSeq = this.updateSeq.incrementAndGet();
+
+                    node2part = new GridDhtPartitionFullMap(node2part, updSeq);
+                }
+            } finally {
                 lock.writeLock().unlock();
             }
         }
@@ -2136,9 +2139,11 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /**
+     * Finds local partitions which don't belong to affinity and runs eviction 
process for such partitions.
+     *
      * @param updateSeq Update sequence.
      * @param aff Affinity assignments.
-     * @return Checks if any of the local partitions need to be evicted.
+     * @return {@code True} if there are local partitions that need to be evicted.
      */
     private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
         if (!ctx.kernalContext().state().evictionsAllowed())
@@ -2148,6 +2153,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
         UUID locId = ctx.localNodeId();
 
+        List<IgniteInternalFuture<?>> rentingFutures = new ArrayList<>();
+
         for (int p = 0; p < locParts.length(); p++) {
             GridDhtLocalPartition part = locParts.get(p);
 
@@ -2165,9 +2172,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                     // If all affinity nodes are owners, then evict partition 
from local node.
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
-                        part.reload(false);
-
-                        part.rent(false);
+                        IgniteInternalFuture<?> rentFuture = part.rent(false);
+                        rentingFutures.add(rentFuture);
 
                         updateSeq = updateLocal(part.id(), part.state(), 
updateSeq, aff.topologyVersion());
 
@@ -2192,14 +2198,10 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 ClusterNode n = nodes.get(i);
 
                                 if (locId.equals(n.id())) {
-                                    part.reload(false);
-
-                                    part.rent(false);
+                                    IgniteInternalFuture<?> rentFuture = 
part.rent(false);
+                                    rentingFutures.add(rentFuture);
 
-                                    updateSeq = updateLocal(part.id(),
-                                        part.state(),
-                                        updateSeq,
-                                        aff.topologyVersion());
+                                    updateSeq = updateLocal(part.id(), 
part.state(), updateSeq, aff.topologyVersion());
 
                                     changed = true;
 
@@ -2217,14 +2219,38 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             }
         }
 
+        // After all rents are finished resend partitions.
+        if (!rentingFutures.isEmpty()) {
+            final AtomicInteger rentingPartitions = new 
AtomicInteger(rentingFutures.size());
+
+            for (IgniteInternalFuture<?> rentingFuture : rentingFutures) {
+                rentingFuture.listen(f -> {
+                    int remaining = rentingPartitions.decrementAndGet();
+
+                    if (remaining == 0) {
+                        lock.writeLock().lock();
+
+                        try {
+                            this.updateSeq.incrementAndGet();
+
+                            ctx.exchange().scheduleResendPartitions();
+                        }
+                        finally {
+                            lock.writeLock().unlock();
+                        }
+                    }
+                });
+            }
+        }
+
         return changed;
     }
 
     /**
-     * Updates value for single partition.
+     * Updates state of partition in local {@link #node2part} map and 
recalculates {@link #diffFromAffinity}.
      *
      * @param p Partition.
-     * @param state State.
+     * @param state Partition state.
      * @param updateSeq Update sequence.
      * @param affVer Affinity version.
      * @return Update sequence.
@@ -2302,6 +2328,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /**
+     * Removes node from local {@link #node2part} map and recalculates {@link 
#diffFromAffinity}.
+     *
      * @param nodeId Node to remove.
      */
     private void removeNode(UUID nodeId) {
@@ -2346,7 +2374,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             if (part.own()) {
                 assert lastTopChangeVer.initialized() : lastTopChangeVer;
 
-                updateLocal(part.id(), part.state(), 
updateSeq.incrementAndGet(), lastTopChangeVer);
+                long updSeq = updateSeq.incrementAndGet();
+
+                updateLocal(part.id(), part.state(), updSeq, lastTopChangeVer);
 
                 consistencyCheck();
 
@@ -2377,9 +2407,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                 long seq = updateSeq ? this.updateSeq.incrementAndGet() : 
this.updateSeq.get();
 
-                if (part.reload())
-                    part = createPartition(part.id());
-
                 assert lastTopChangeVer.initialized() : lastTopChangeVer;
 
                 updateLocal(part.id(), part.state(), seq, lastTopChangeVer);
@@ -2590,11 +2617,14 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /**
-     * @param p Partition.
+     * Checks that state of partition {@code p} for node {@code nodeId} in 
local {@code node2part} map
+     * matches one of the specified states {@code match} or {@code matches}.
+     *
+     * @param p Partition id.
      * @param nodeId Node ID.
      * @param match State to match.
      * @param matches Additional states.
-     * @return Filter for owners of this partition.
+     * @return {@code True} if partition matches to one of the specified 
states.
      */
     private boolean hasState(final int p, @Nullable UUID nodeId, final 
GridDhtPartitionState match,
         final GridDhtPartitionState... matches) {

Reply via email to