Fixed update sequence.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a857c5ba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a857c5ba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a857c5ba Branch: refs/heads/ignite-6181-1 Commit: a857c5ba5a24f41b5ebfaef0a15fde2906a7e0fd Parents: 8c249b7 Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Sat Aug 26 17:21:44 2017 +0300 Committer: Ilya Lantukh <ilant...@gridgain.com> Committed: Sat Aug 26 17:21:44 2017 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 33 ++++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a857c5ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index a881130..e0f54b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -521,7 +521,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { DiscoveryEvent evt = evts0.get(i); if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) - removeNode(evt.eventNode().id()); + updateSeq = removeNode(evt.eventNode().id(), updateSeq); } } @@ -1379,6 +1379,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); + node2part.newUpdateSequence(updateSeq); + if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); @@ -1535,8 +1537,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionMap cur = node2part.get(parts.nodeId()); if (force) { - if (cur != null && cur.topologyVersion().initialized()) + if (cur != null && cur.topologyVersion().initialized()) { parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); + + this.updateSeq.setIfGreater(cur.updateSequence()); + } } else if (isStaleUpdate(cur, parts)) { U.warn(log, "Stale update for single partition map update (will ignore) [exchId=" + exchId + @@ -1546,10 +1551,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return false; } - long updateSeq = this.updateSeq.incrementAndGet(); - - node2part.newUpdateSequence(updateSeq); - boolean changed = false; if (cur == null || !cur.equals(parts)) @@ -1557,6 +1558,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part.put(parts.nodeId(), parts); + this.updateSeq.setIfGreater(parts.updateSequence()); + + long updateSeq = this.updateSeq.incrementAndGet(); + // During exchange diff is calculated after all messages are received and affinity initialized. if (exchId == null && !grp.isReplicated()) { if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { @@ -2110,8 +2115,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param nodeId Node to remove. + * @param updateSeq Update sequence. + * + * @return Update sequence. */ - private void removeNode(UUID nodeId) { + private long removeNode(UUID nodeId, long updateSeq) { assert nodeId != null; assert lock.isWriteLockedByCurrentThread(); @@ -2122,11 +2130,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterNode loc = ctx.localNode(); if (node2part != null) { - updateSeq.setIfGreater(node2part.updateSequence()); + assert updateSeq >= node2part.updateSequence(); - if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) - node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(), + if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) { + updateSeq++; + + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); + } else node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); @@ -2145,6 +2156,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { consistencyCheck(); } + + return updateSeq; } /** {@inheritDoc} */