Increment GridDhtPartitionMap update sequence when assign new state on coordinator.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d31c43c1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d31c43c1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d31c43c1 Branch: refs/heads/ignite-6181-1 Commit: d31c43c1465ec33f9a1be81dedb958296ecc5068 Parents: 6f7011a Author: sboikov <sboi...@gridgain.com> Authored: Fri Aug 25 17:50:01 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Aug 25 17:50:01 2017 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 18 ++++++++++++------ .../IgnitePdsCacheRebalancingAbstractTest.java | 2 ++ 2 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d31c43c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f25ae21..87b3670 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1186,7 +1186,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - if (stopping || !lastTopChangeVer.initialized()) + if (stopping || !lastTopChangeVer.initialized() || + // Ignore message not-related to exchange if exchange is in progress. + (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer))) return false; if (incomeCntrMap != null) { @@ -1909,20 +1911,24 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - if (!e.getValue().containsKey(p)) + GridDhtPartitionMap partMap = e.getValue(); + + if (!partMap.containsKey(p)) continue; - if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) { + if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) { if (haveHistory) - e.getValue().put(p, MOVING); + partMap.put(p, MOVING); else { - e.getValue().put(p, RENTING); + partMap.put(p, RENTING); result.add(e.getKey()); } + partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); + U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + - "[nodeId=" + ctx.localNodeId() + ", cacheOrGroupName=" + grp.cacheOrGroupName() + + "[nodeId=" + e.getKey() + ", cacheOrGroupName=" + grp.cacheOrGroupName() + ", partId=" + p + ", haveHistory=" + haveHistory + "]"); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d31c43c1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index 91838fc..7b047f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -512,6 +512,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb Integer val = 0; for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + Integer key = primaryKey(ignite(3).cache(cacheName)); c.put(key, val);