IGNITE-7514 Affinity assignment should be recalculated when primary node is not OWNER


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/faf50f1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/faf50f1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/faf50f1d

Branch: refs/heads/ignite-7485-2
Commit: faf50f1dad74028f6cd665ec23ee3dabc40ea923
Parents: 7b37f64
Author: Ilya Lantukh <ilant...@gridgain.com>
Authored: Wed Feb 7 13:33:28 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Wed Feb 7 13:33:28 2018 +0300

----------------------------------------------------------------------
 .../internal/events/DiscoveryCustomEvent.java   |  34 ++++
 .../cache/CacheAffinitySharedManager.java       |  86 +++++++---
 .../processors/cache/ClusterCachesInfo.java     |   1 +
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../dht/GridClientPartitionTopology.java        |   9 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   3 -
 .../dht/GridDhtPartitionTopology.java           |  10 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  47 +++++-
 .../GridDhtPartitionsExchangeFuture.java        | 144 ++++++++++------
 .../GridCacheDatabaseSharedManager.java         |   5 +
 .../distributed/CacheBaselineTopologyTest.java  | 169 ++++++++++++++++++-
 11 files changed, 424 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
index b3c6a2d..3b12b38 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
@@ -21,7 +21,10 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Custom event.
@@ -85,4 +88,35 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
     @Override public String toString() {
         return S.toString(DiscoveryCustomEvent.class, this, super.toString());
     }
+
+    /**
+     * @param evt Discovery event.
+     * @return {@code True} if event is DiscoveryCustomEvent that requires 
centralized affinity assignment.
+     */
+    public static boolean requiresCentralizedAffinityAssignment(DiscoveryEvent 
evt) {
+        if (!(evt instanceof DiscoveryCustomEvent))
+            return false;
+
+        return 
requiresCentralizedAffinityAssignment(((DiscoveryCustomEvent)evt).customMessage());
+    }
+
+    /**
+     * @param msg Discovery custom message.
+     * @return {@code True} if message belongs to event that requires 
centralized affinity assignment.
+     */
+    public static boolean requiresCentralizedAffinityAssignment(@Nullable 
DiscoveryCustomMessage msg) {
+        if (msg == null)
+            return false;
+
+        if (msg instanceof ChangeGlobalStateMessage && 
((ChangeGlobalStateMessage)msg).activate())
+            return true;
+
+        if (msg instanceof SnapshotDiscoveryMessage) {
+            SnapshotDiscoveryMessage snapMsg = (SnapshotDiscoveryMessage) msg;
+
+            return snapMsg.needExchange() && snapMsg.needAssignPartitions();
+        }
+
+        return false;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 4119f23..7bf793c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -57,7 +58,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
-import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -161,13 +161,15 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         ClusterNode node,
         AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state) {
-        if (state.transition() || !state.active())
+        if ((state.transition() || !state.active()) &&
+            
!DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg))
             return;
 
         if (type == EVT_NODE_JOINED && node.isLocal())
             lastAffVer = null;
 
-        if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == 
EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
+        if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == 
EVT_NODE_JOINED || type == EVT_NODE_LEFT)) ||
+            
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) {
             synchronized (mux) {
                 assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;
 
@@ -1260,10 +1262,12 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
+     * Applies affinity diff from the received full message.
+     *
      * @param fut Current exchange future.
      * @param msg Finish exchange message.
      */
-    public void mergeExchangesOnServerLeft(final 
GridDhtPartitionsExchangeFuture fut,
+    public void applyAffinityFromFullMessage(final 
GridDhtPartitionsExchangeFuture fut,
         final GridDhtPartitionsFullMessage msg) {
         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
@@ -1396,7 +1400,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @return Computed difference with ideal affinity.
      * @throws IgniteCheckedException If failed.
      */
-    public  Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocol(
+    public Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocol(
         final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
     {
         final ExchangeDiscoveryEvents evts = fut.context().events();
@@ -1404,6 +1408,36 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert fut.context().mergeExchanges();
         assert evts.hasServerLeft();
 
+        return onReassignmentEnforced(fut);
+    }
+
+    /**
+     * Calculates affinity on coordinator for custom event types that require 
centralized assignment.
+     *
+     * @param fut Current exchange future.
+     * @return Computed difference with ideal affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Map<Integer, CacheGroupAffinityMessage> 
onCustomEventWithEnforcedAffinityReassignment(
+        final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
+    {
+        assert 
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());
+
+        return onReassignmentEnforced(fut);
+    }
+
+    /**
+     * Calculates new affinity assignment on coordinator and creates affinity 
diff messages for other nodes.
+     *
+     * @param fut Current exchange future.
+     * @return Computed difference with ideal affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
+        final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
+    {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
         forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 AffinityTopologyVersion topVer = evts.topologyVersion();
@@ -1418,7 +1452,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             }
         });
 
-        Map<Integer, Map<Integer, List<Long>>> diff = 
initAffinityOnNodeLeft0(evts.topologyVersion(),
+        Map<Integer, Map<Integer, List<Long>>> diff = 
initAffinityBasedOnPartitionsAvailability(evts.topologyVersion(),
             fut,
             NODE_TO_ORDER,
             true);
@@ -1642,17 +1676,16 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * Called on exchange initiated by server node leave.
+     * Called on exchange initiated by server node leave or custom event with 
centralized affinity assignment.
      *
      * @param fut Exchange future.
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if affinity should be assigned by coordinator.
      */
-    public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, 
boolean crd) throws IgniteCheckedException {
-        ClusterNode leftNode = fut.firstEvent().eventNode();
-
-        assert !leftNode.isClient() : leftNode;
+    public boolean onCentralizedAffinityChange(final 
GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException 
{
+        assert (fut.events().hasServerLeft() && 
!fut.firstEvent().eventNode().isClient()) ||
+            
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()) : 
fut.firstEvent();
 
         if (crd) {
             // Need initialize CacheGroupHolders if this node become 
coordinator on this exchange.
@@ -2066,7 +2099,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> initFut) {
                     try {
-                        
resFut.onDone(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, 
false));
+                        
resFut.onDone(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), 
fut, NODE_TO_ID, false));
                     }
                     catch (IgniteCheckedException e) {
                         resFut.onDone(e);
@@ -2077,10 +2110,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             return resFut;
         }
         else
-            return new 
GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.initialVersion(), fut, 
NODE_TO_ID, false));
+            return new 
GridFinishedFuture<>(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(),
 fut, NODE_TO_ID, false));
     }
 
     /**
+     * Initializes current affinity assignment based on partitions 
availability.
+     * Nodes that have most recent data will be considered affinity nodes.
+     *
      * @param topVer Topology version.
      * @param fut Exchange future.
      * @param c Closure converting affinity diff.
@@ -2088,12 +2124,17 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */
-    private <T> Map<Integer, Map<Integer, List<T>>> 
initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer,
+    private <T> Map<Integer, Map<Integer, List<T>>> 
initAffinityBasedOnPartitionsAvailability(final AffinityTopologyVersion topVer,
         final GridDhtPartitionsExchangeFuture fut,
         final IgniteClosure<ClusterNode, T> c,
         final boolean initAff)
         throws IgniteCheckedException {
-        final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
+        final boolean enforcedCentralizedAssignment =
+            
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());
+
+        final WaitRebalanceInfo waitRebalanceInfo = 
enforcedCentralizedAssignment ?
+            new WaitRebalanceInfo(fut.exchangeId().topologyVersion()) :
+            new 
WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
 
         final Collection<ClusterNode> aliveNodes = 
fut.context().events().discoveryCache().serverNodes();
 
@@ -2103,13 +2144,14 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
-                if (!grpHolder.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
+                if (!grpHolder.rebalanceEnabled ||
+                    (fut.cacheGroupAddedOnExchange(desc.groupId(), 
desc.receivedFrom()) && !enforcedCentralizedAssignment))
                     return;
 
                 AffinityTopologyVersion affTopVer = 
grpHolder.affinity().lastVersion();
 
-                assert affTopVer.topologyVersion() > 0 && 
!affTopVer.equals(topVer) : "Invalid affinity version " +
-                    "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + 
desc.cacheOrGroupName() + ']';
+                assert (affTopVer.topologyVersion() > 0 && 
!affTopVer.equals(topVer)) || enforcedCentralizedAssignment :
+                    "Invalid affinity version [last=" + affTopVer + ", 
futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']';
 
                 List<List<ClusterNode>> curAssignment = 
grpHolder.affinity().assignments(affTopVer);
                 List<List<ClusterNode>> newAssignment = 
grpHolder.affinity().idealAssignment();
@@ -2141,6 +2183,12 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         ", node=" + newPrimary +
                         ", topVer=" + topVer + ']';
 
+                    List<ClusterNode> owners = top.owners(p);
+
+                    // It is essential that curPrimary node has partition in 
OWNING state.
+                    if (!owners.isEmpty() && !owners.contains(curPrimary))
+                        curPrimary = owners.get(0);
+
                     if (curPrimary != null && newPrimary != null && 
!curPrimary.equals(newPrimary)) {
                         if (aliveNodes.contains(curPrimary)) {
                             GridDhtPartitionState state = 
top.partitionState(newPrimary.id(), p);
@@ -2173,8 +2221,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                                 }
 
                                 if (newNodes0 == null) {
-                                    List<ClusterNode> owners = top.owners(p);
-
                                     for (ClusterNode owner : owners) {
                                         if (aliveNodes.contains(owner)) {
                                             newNodes0 = 
latePrimaryAssignment(grpHolder.affinity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 08a910b..2b2fb55 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -1186,6 +1186,7 @@ class ClusterCachesInfo {
     /**
      * @param msg Message.
      * @param topVer Current topology version.
+     * @param curState Current cluster state.
      * @return Exchange action.
      * @throws IgniteCheckedException If configuration validation failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 9b9284f..8aa6db9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -965,7 +965,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * For coordinator causes {@link GridDhtPartitionsFullMessage 
FullMessages} send,
      * for non coordinator -  {@link GridDhtPartitionsSingleMessage 
SingleMessages} send
      */
-    private void refreshPartitions() {
+    public void refreshPartitions() {
         // TODO https://issues.apache.org/jira/browse/IGNITE-6857
         if (cctx.snapshot().snapshotOperationInProgress()) {
             scheduleResendPartitions();

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e994113..def00f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -256,9 +256,9 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void 
initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+    @Override public boolean 
initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
         GridDhtPartitionsExchangeFuture exchFut) {
-        // No-op.
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -383,6 +383,11 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void afterStateRestored(AffinityTopologyVersion topVer) {
+        // no-op
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture 
exchFut) throws IgniteCheckedException {
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index e1f1d6f..e63aab6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -213,9 +213,6 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             // TODO ignite-db
             throw new IgniteException(e);
         }
-
-        // Todo log moving state
-        casState(state.get(), MOVING);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4ae68ef..13564c2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -117,12 +117,20 @@ public interface GridDhtPartitionTopology {
     /**
      * @param affVer Affinity version.
      * @param exchFut Exchange future.
+     * @return {@code True} if partitions must be refreshed.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public void initPartitionsWhenAffinityReady(AffinityTopologyVersion 
affVer, GridDhtPartitionsExchangeFuture exchFut)
+    public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion 
affVer, GridDhtPartitionsExchangeFuture exchFut)
         throws IgniteInterruptedCheckedException;
 
     /**
+     * Initializes local data structures after partitions are restored from 
persistence.
+     *
+     * @param topVer Topology version.
+     */
+    public void afterStateRestored(AffinityTopologyVersion topVer);
+
+    /**
      * Post-initializes this topology.
      *
      * @param exchFut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0a2c154..020c3e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -299,10 +299,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void 
initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+    @Override public boolean 
initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
         GridDhtPartitionsExchangeFuture exchFut)
         throws IgniteInterruptedCheckedException
     {
+        boolean needRefresh;
+
         ctx.database().checkpointReadLock();
 
         try {
@@ -310,11 +312,11 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             try {
                 if (stopping)
-                    return;
+                    return false;
 
                 long updateSeq = this.updateSeq.incrementAndGet();
 
-                initPartitions0(affVer, exchFut, updateSeq);
+                needRefresh = initPartitions0(affVer, exchFut, updateSeq);
 
                 consistencyCheck();
             }
@@ -325,16 +327,21 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         finally {
             ctx.database().checkpointReadUnlock();
         }
+
+        return needRefresh;
     }
 
     /**
      * @param affVer Affinity version to use.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
+     * @return {@code True} if partitions must be refreshed.
      */
-    private void initPartitions0(AffinityTopologyVersion affVer, 
GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+    private boolean initPartitions0(AffinityTopologyVersion affVer, 
GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         List<List<ClusterNode>> aff = grp.affinity().readyAssignments(affVer);
 
+        boolean needRefresh = false;
+
         if (grp.affinityNode()) {
             ClusterNode loc = ctx.localNode();
 
@@ -378,6 +385,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                     ", part=" + locPart + ']');
                             }
 
+                            needRefresh = true;
+
                             updateSeq = updateLocal(p, locPart.state(), 
updateSeq, affVer);
                         }
                     }
@@ -423,6 +432,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         }
 
         updateRebalanceVersion(aff);
+
+        return needRefresh;
     }
 
     /**
@@ -617,6 +628,30 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void afterStateRestored(AffinityTopologyVersion topVer) {
+        lock.writeLock().lock();
+
+        try {
+            if (node2part == null)
+                return;
+
+            long updateSeq = this.updateSeq.incrementAndGet();
+
+            for (int p = 0; p < grp.affinity().partitions(); p++) {
+                GridDhtLocalPartition locPart = locParts.get(p);
+
+                if (locPart == null)
+                    updateLocal(p, EVICTED, updateSeq, topVer);
+                else
+                    updateLocal(p, locPart.state(), updateSeq, topVer);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture 
exchFut) {
         boolean changed = false;
 
@@ -996,9 +1031,11 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 map.put(i, part.state());
             }
 
+            GridDhtPartitionMap locPartMap = node2part != null ? 
node2part.get(ctx.localNodeId()) : null;
+
             return new GridDhtPartitionMap(ctx.localNodeId(),
                 updateSeq.get(),
-                readyTopVer,
+                locPartMap != null ? locPartMap.topologyVersion() : 
readyTopVer,
                 map,
                 true);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a45c9b9..695c840 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -131,6 +131,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         
IgniteSystemProperties.getInteger(IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD,
 0);
 
     /** */
+    private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = 
IgniteProductVersion.fromString("2.4.3");
+
+    /** */
     @GridToStringExclude
     private final Object mux = new Object();
 
@@ -231,6 +234,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private boolean centralizedAff;
 
+    /**
+     * Enforce affinity reassignment based on actual partition distribution. 
This mode should be used when partitions
+     * might be distributed not according to affinity assignment.
+     */
+    private boolean forceAffReassignment;
+
     /** Change global state exception. */
     private Exception changeGlobalStateE;
 
@@ -615,6 +624,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 DiscoveryCustomMessage msg = 
((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
 
+                forceAffReassignment = 
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(msg)
+                    && 
firstEventCache().minimumNodeVersion().compareToIgnoreTimestamp(FORCE_AFF_REASSIGNMENT_SINCE)
 >= 0;
+
                 if (msg instanceof ChangeGlobalStateMessage) {
                     assert exchActions != null && !exchActions.empty();
 
@@ -636,6 +648,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     exchange = onAffinityChangeRequest(crdNode);
                 }
 
+                if (forceAffReassignment)
+                    cctx.affinity().onCentralizedAffinityChange(this, crdNode);
+
                 initCoordinatorCaches(newCrd);
             }
             else {
@@ -781,7 +796,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     if (grp.isLocal())
                         continue;
 
-                    grp.topology().beforeExchange(this, !centralizedAff, 
false);
+                    grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
                 }
             }
         }
@@ -924,8 +939,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         else if (req.activate()) {
             // TODO: BLT changes on inactive cluster can't be handled easily 
because persistent storage hasn't been initialized yet.
             try {
-                cctx.affinity().onBaselineTopologyChanged(this, crd);
-
                 if (CU.isPersistenceEnabled(cctx.kernalContext().config()) && 
!cctx.kernalContext().clientNode())
                     
cctx.kernalContext().state().onBaselineTopologyChanged(req.baselineTopology(),
                         req.prevBaselineTopologyHistoryItem());
@@ -962,7 +975,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @return Exchange type.
      */
     private ExchangeType onCustomMessageNoAffinityChange(boolean crd) {
-        cctx.affinity().onCustomMessageNoAffinityChange(this, crd, 
exchActions);
+        if (!forceAffReassignment)
+            cctx.affinity().onCustomMessageNoAffinityChange(this, crd, 
exchActions);
 
         return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : 
ExchangeType.ALL;
     }
@@ -1017,7 +1031,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             exchCtx.events().warnNoAffinityNodes(cctx);
 
-            centralizedAff = cctx.affinity().onServerLeft(this, crd);
+            centralizedAff = cctx.affinity().onCentralizedAffinityChange(this, 
crd);
         }
         else
             cctx.affinity().onServerJoin(this, crd);
@@ -1088,8 +1102,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
         }
 
-        cctx.database().beforeExchange(this);
-
         if (!exchCtx.mergeExchanges()) {
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
@@ -1097,10 +1109,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 // It is possible affinity is not initialized yet if node 
joins to cluster.
                 if (grp.affinity().lastVersion().topologyVersion() > 0)
-                    grp.topology().beforeExchange(this, !centralizedAff, 
false);
+                    grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
             }
         }
 
+        // It is necessary to run database callback after all topology 
callbacks, so partition states could be
+        // correctly restored from the persistent store.
+        cctx.database().beforeExchange(this);
+
         changeWalModeIfNeeded();
 
         if (crd.isLocal()) {
@@ -1534,19 +1550,24 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
 
         if (err == null) {
-            if (centralizedAff) {
+            if (centralizedAff || forceAffReassignment) {
                 assert !exchCtx.mergeExchanges();
 
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
                         continue;
 
+                    boolean needRefresh = false;
+
                     try {
-                        grp.topology().initPartitionsWhenAffinityReady(res, 
this);
+                        needRefresh = 
grp.topology().initPartitionsWhenAffinityReady(res, this);
                     }
                     catch (IgniteInterruptedCheckedException e) {
                         U.error(log, "Failed to initialize partitions.", e);
                     }
+
+                    if (needRefresh)
+                        cctx.exchange().refreshPartitions();
                 }
             }
 
@@ -2342,7 +2363,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (!exchCtx.mergeExchanges() && 
!crd.equals(events().discoveryCache().serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (!grp.isLocal())
-                        grp.topology().beforeExchange(this, !centralizedAff, 
false);
+                        grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
                 }
             }
 
@@ -2474,6 +2495,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     detectLostPartitions(resTopVer);
             }
 
+            if (!exchCtx.mergeExchanges() && forceAffReassignment)
+                idealAffDiff = 
cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this);
+
             for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
                 if (!grpCtx.isLocal())
                     grpCtx.topology().applyUpdateCounters();
@@ -2496,6 +2520,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (exchCtx.events().hasServerLeft())
                     msg.idealAffinityDiff(idealAffDiff);
             }
+            else if (forceAffReassignment)
+                msg.idealAffinityDiff(idealAffDiff);
 
             msg.prepareMarshal(cctx);
 
@@ -2549,65 +2575,69 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         nodes.addAll(sndResNodes);
                 }
 
-                IgniteCheckedException err = null;
+                if (!nodes.isEmpty())
+                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, 
joinedNodeAff);
 
-                if (stateChangeExchange()) {
-                    StateChangeRequest req = exchActions.stateChangeRequest();
+                if (!stateChangeExchange())
+                    onDone(exchCtx.events().topologyVersion(), null);
 
-                    assert req != null : exchActions;
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
pendingSingleMsgs.entrySet()) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Process pending message on coordinator 
[node=" + e.getKey() +
+                            ", ver=" + initialVersion() +
+                            ", resVer=" + resTopVer + ']');
+                    }
 
-                    boolean stateChangeErr = false;
+                    processSingleMessage(e.getKey(), e.getValue());
+                }
+            }
 
-                    if (!F.isEmpty(changeGlobalStateExceptions)) {
-                        stateChangeErr = true;
+            if (stateChangeExchange()) {
+                IgniteCheckedException err = null;
 
-                        err = new IgniteCheckedException("Cluster state change 
failed.");
+                StateChangeRequest req = exchActions.stateChangeRequest();
 
-                        
cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, 
req);
-                    }
-                    else {
-                        boolean hasMoving = !partsToReload.isEmpty();
+                assert req != null : exchActions;
 
-                        Set<Integer> waitGrps = cctx.affinity().waitGroups();
+                boolean stateChangeErr = false;
 
-                        if (!hasMoving) {
-                            for (CacheGroupContext grpCtx : 
cctx.cache().cacheGroups()) {
-                                if (waitGrps.contains(grpCtx.groupId()) && 
grpCtx.topology().hasMovingPartitions()) {
-                                    hasMoving = true;
+                if (!F.isEmpty(changeGlobalStateExceptions)) {
+                    stateChangeErr = true;
 
-                                    break;
-                                }
+                    err = new IgniteCheckedException("Cluster state change 
failed.");
 
-                            }
-                        }
+                    
cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, 
req);
+                }
+                else {
+                    boolean hasMoving = !partsToReload.isEmpty();
 
-                        
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
-                    }
+                    Set<Integer> waitGrps = cctx.affinity().waitGroups();
 
-                    boolean active = !stateChangeErr && req.activate();
+                    if (!hasMoving) {
+                        for (CacheGroupContext grpCtx : 
cctx.cache().cacheGroups()) {
+                            if (waitGrps.contains(grpCtx.groupId()) && 
grpCtx.topology().hasMovingPartitions()) {
+                                hasMoving = true;
 
-                    ChangeGlobalStateFinishMessage stateFinishMsg = new 
ChangeGlobalStateFinishMessage(
-                        req.requestId(),
-                        active,
-                        !stateChangeErr);
+                                break;
+                            }
 
-                    cctx.discovery().sendCustomEvent(stateFinishMsg);
+                        }
+                    }
+
+                    
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
                 }
 
-                if (!nodes.isEmpty())
-                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, 
joinedNodeAff);
+                boolean active = !stateChangeErr && req.activate();
 
-                onDone(exchCtx.events().topologyVersion(), err);
+                ChangeGlobalStateFinishMessage stateFinishMsg = new 
ChangeGlobalStateFinishMessage(
+                    req.requestId(),
+                    active,
+                    !stateChangeErr);
 
-                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
pendingSingleMsgs.entrySet()) {
-                    if (log.isInfoEnabled()) {
-                        log.info("Process pending message on coordinator 
[node=" + e.getKey() +
-                            ", ver=" + initialVersion() +
-                            ", resVer=" + resTopVer + ']');
-                    }
+                cctx.discovery().sendCustomEvent(stateFinishMsg);
 
-                    processSingleMessage(e.getKey(), e.getValue());
-                }
+                if (!centralizedAff)
+                    onDone(exchCtx.events().topologyVersion(), err);
             }
         }
         catch (IgniteCheckedException e) {
@@ -2950,7 +2980,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     cctx.affinity().onLocalJoin(this, msg, resTopVer);
                 else {
                     if (exchCtx.events().hasServerLeft())
-                        cctx.affinity().mergeExchangesOnServerLeft(this, msg);
+                        cctx.affinity().applyAffinityFromFullMessage(this, 
msg);
                     else
                         
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false);
 
@@ -2964,6 +2994,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
             else if (localJoinExchange() && !exchCtx.fetchAffinityOnJoin())
                 cctx.affinity().onLocalJoin(this, msg, resTopVer);
+            else if (forceAffReassignment)
+                cctx.affinity().applyAffinityFromFullMessage(this, msg);
 
             updatePartitionFullMap(resTopVer, msg);
 
@@ -3073,6 +3105,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             crd.isLocal(),
                             msg);
 
+                        IgniteCheckedException err = 
!F.isEmpty(msg.partitionsMessage().getErrorsMap()) ?
+                            new IgniteCheckedException("Cluster state change 
failed.") : null;
+
                         if (!crd.isLocal()) {
                             GridDhtPartitionsFullMessage partsMsg = 
msg.partitionsMessage();
 
@@ -3080,9 +3115,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             assert partsMsg.lastVersion() != null : partsMsg;
 
                             updatePartitionFullMap(resTopVer, partsMsg);
+
+                            if (exchActions != null && 
exchActions.stateChangeRequest() != null && err != null)
+                                
cctx.kernalContext().state().onStateChangeError(msg.partitionsMessage().getErrorsMap(),
 exchActions.stateChangeRequest());
                         }
 
-                        onDone(resTopVer);
+                        onDone(resTopVer, err);
                     }
                     else {
                         if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index bd80ec8..1de48f6 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -113,6 +113,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -2364,6 +2366,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     updateState(part, restore.get1());
                 }
             }
+
+            // After partition states are restored, it is necessary to update 
internal data structures in topology.
+            
grp.topology().afterStateRestored(grp.topology().lastTopologyChangeVersion());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/faf50f1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index 7b40b03..6ccb450 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -25,11 +25,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -40,8 +45,15 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestDelayingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -62,6 +74,9 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
     /** */
     private static final int NODE_COUNT = 4;
 
+    /** If {@code true}, started nodes are configured with {@link DelayRebalanceCommunicationSpi}. */
+    private static boolean delayRebalance = false;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -106,6 +121,9 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
         if (client)
             cfg.setClientMode(true);
 
+        if (delayRebalance)
+            cfg.setCommunicationSpi(new DelayRebalanceCommunicationSpi());
+
         return cfg;
     }
 
@@ -594,7 +612,10 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
         assertEquals(val2, primary.cache(CACHE_NAME).get(key));
         assertEquals(val2, backup.cache(CACHE_NAME).get(key));
 
-        primary.cache(CACHE_NAME).rebalance().get();
+        for (int i = 0; i < NODE_COUNT; i++)
+            grid(i).cache(CACHE_NAME).rebalance().get();
+
+        awaitPartitionMapExchange();
 
         affNodes = (List<ClusterNode>) 
ig.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(key);
 
@@ -697,6 +718,83 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
         
assertTrue(ig.affinity(inMemoryCache.getName()).allPartitions(newNode.cluster().localNode()).length
 > 0);
     }
 
+    /**
+     * @throws Exception if failed.
+     */
+    public void testAffinityAssignmentChangedAfterRestart() throws Exception {
+        delayRebalance = false;
+
+        int parts = 32;
+
+        final List<Integer> partMapping = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++)
+            partMapping.add(p);
+
+        final AffinityFunction affFunc = new TestAffinityFunction(new 
RendezvousAffinityFunction(false, parts));
+
+        TestAffinityFunction.partsAffMapping = partMapping;
+
+        String cacheName = CACHE_NAME + 2;
+
+        startGrids(4);
+
+        IgniteEx ig = grid(0);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ig.createCache(
+            new CacheConfiguration<Integer, Integer>()
+                .setName(cacheName)
+                .setCacheMode(PARTITIONED)
+                .setBackups(1)
+                .setPartitionLossPolicy(READ_ONLY_SAFE)
+                .setReadFromBackup(true)
+                .setWriteSynchronizationMode(FULL_SYNC)
+                .setRebalanceDelay(-1)
+                .setAffinity(affFunc));
+
+        Map<Integer, String> keyToConsId = new HashMap<>();
+
+        for (int k = 0; k < 1000; k++) {
+            cache.put(k, k);
+
+            keyToConsId.put(k, 
ig.affinity(cacheName).mapKeyToNode(k).consistentId().toString());
+        }
+
+        stopAllGrids();
+
+        Collections.shuffle(TestAffinityFunction.partsAffMapping, new 
Random(1));
+
+        delayRebalance = true;
+
+        startGrids(4);
+
+        ig = grid(0);
+
+        ig.active(true);
+
+        cache = ig.cache(cacheName);
+
+        GridDhtPartitionFullMap partMap = 
ig.cachex(cacheName).context().topology().partitionMap(false);
+
+        for (int i = 1; i < 4; i++) {
+            IgniteEx ig0 = grid(i);
+
+            for (int p = 0; p < 32; p++)
+                
assertEqualsCollections(ig.affinity(cacheName).mapPartitionToPrimaryAndBackups(p),
 ig0.affinity(cacheName).mapPartitionToPrimaryAndBackups(p));
+        }
+
+        for (Map.Entry<Integer, String> e : keyToConsId.entrySet()) {
+            int p = ig.affinity(cacheName).partition(e.getKey());
+
+            assertEquals("p=" + p, GridDhtPartitionState.OWNING, 
partMap.get(ig.affinity(cacheName).mapKeyToNode(e.getKey()).id()).get(p));
+        }
+
+        for (int k = 0; k < 1000; k++)
+            assertEquals("k=" + k, Integer.valueOf(k), cache.get(k));
+    }
+
     /** */
     private Collection<BaselineNode> baselineNodes(Collection<ClusterNode> 
clNodes) {
         Collection<BaselineNode> res = new ArrayList<>(clNodes.size());
@@ -758,4 +856,73 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
             return result;
         }
     }
+
+    /**
+     * Affinity function wrapper that permutes the delegate's assignment using {@link #partsAffMapping}.
+     */
+    private static class TestAffinityFunction implements AffinityFunction {
+        /** Wrapped affinity function that computes the base assignment. */
+        private final AffinityFunction delegate;
+
+        /** Permutation: the delegate's assignment for partition {@code p} goes to index {@code get(p)}. */
+        private static List<Integer> partsAffMapping;
+
+        /** @param delegate Affinity function to wrap. */
+        public TestAffinityFunction(AffinityFunction delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            delegate.reset();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return delegate.partitions();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return delegate.partition(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> 
assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> res0 = delegate.assignPartitions(affCtx);
+
+            List<List<ClusterNode>> res = new ArrayList<>(res0.size());
+
+            for (int p = 0; p < res0.size(); p++)
+                res.add(p, null); // Pre-fill so that set() below can target any index.
+
+            for (int p = 0; p < res0.size(); p++)
+                res.set(partsAffMapping.get(p), res0.get(p));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            delegate.removeNode(nodeId);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class DelayRebalanceCommunicationSpi extends 
TestDelayingCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override protected boolean delayMessage(Message msg, GridIoMessage 
ioMsg) {
+            if (msg != null && (msg instanceof GridDhtPartitionDemandMessage 
|| msg instanceof GridDhtPartitionSupplyMessage))
+                return true;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int delayMillis() {
+            return 1_000_000;
+        }
+    }
 }

Reply via email to