ignite-4154 affinity
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87b09dba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87b09dba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87b09dba Branch: refs/heads/ignite-4154-3 Commit: 87b09dba85b5ed7996ba93a10ef4f28eb398c4a8 Parents: f74c9f4 Author: sboikov <sboi...@gridgain.com> Authored: Wed Nov 2 11:44:24 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Nov 2 14:00:33 2016 +0300 ---------------------------------------------------------------------- .../processors/affinity/AffinityAssignment.java | 88 ++++++++++ .../affinity/GridAffinityAssignment.java | 120 ++++++------- .../affinity/GridAffinityAssignmentCache.java | 83 +++++++-- .../affinity/GridAffinityProcessor.java | 8 +- .../processors/affinity/GridAffinityUtils.java | 8 +- .../affinity/HistoryAffinityAssignment.java | 169 +++++++++++++++++++ .../cache/CacheAffinitySharedManager.java | 38 ++++- .../cache/GridCacheAffinityManager.java | 4 +- .../dht/GridDhtPartitionTopologyImpl.java | 4 +- .../dht/preloader/GridDhtPreloader.java | 4 +- 10 files changed, 433 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java new file mode 100644 index 0000000..06207d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cluster.ClusterNode; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Cached affinity calculations. + */ +public interface AffinityAssignment { + /** + * @return {@code True} if related discovery event did not cause affinity assignment change and + * this assignment is just a reference to the previous one. + */ + public boolean clientEventChange(); + + /** + * @return Affinity assignment computed by affinity function. + */ + public List<List<ClusterNode>> idealAssignment(); + + /** + * @return Affinity assignment. + */ + public List<List<ClusterNode>> assignment(); + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion(); + + /** + * Get affinity nodes for partition. + * + * @param part Partition. + * @return Affinity nodes. + */ + public List<ClusterNode> get(int part); + + /** + * Get affinity node IDs for partition. + * + * @param part Partition. + * @return Affinity nodes IDs. + */ + public HashSet<UUID> getIds(int part); + + /** + * @return Nodes having primary partitions assignments. 
+ */ + public Set<ClusterNode> primaryPartitionNodes(); + + /** + * Get primary partitions for specified node ID. + * + * @param nodeId Node ID to get primary partitions for. + * @return Primary partitions for specified node ID. + */ + public Set<Integer> primaryPartitions(UUID nodeId); + + /** + * Get backup partitions for specified node ID. + * + * @param nodeId Node ID to get backup partitions for. + * @return Backup partitions for specified node ID. + */ + public Set<Integer> backupPartitions(UUID nodeId); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 568e4e8..2940d92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -27,12 +27,14 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Cached affinity calculations. 
*/ -public class GridAffinityAssignment implements Serializable { +public class GridAffinityAssignment implements AffinityAssignment, Serializable { /** */ private static final long serialVersionUID = 0L; @@ -86,7 +88,7 @@ public class GridAffinityAssignment implements Serializable { this.topVer = topVer; this.assignment = assignment; - this.idealAssignment = idealAssignment; + this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment; primary = new HashMap<>(); backup = new HashMap<>(); @@ -139,96 +141,76 @@ public class GridAffinityAssignment implements Serializable { return topVer; } - /** - * Get affinity nodes for partition. - * - * @param part Partition. - * @return Affinity nodes. - */ - public List<ClusterNode> get(int part) { + /** {@inheritDoc} */ + @Override public List<ClusterNode> get(int part) { assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + - " [part=" + part + ", partitions=" + assignment.size() + ']'; + " [part=" + part + ", partitions=" + assignment.size() + ']'; return assignment.get(part); } - /** - * Get affinity node IDs for partition. - * - * @param part Partition. - * @return Affinity nodes IDs. 
- */ - public HashSet<UUID> getIds(int part) { + /** {@inheritDoc} */ + @Override public HashSet<UUID> getIds(int part) { assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + - " [part=" + part + ", partitions=" + assignment.size() + ']'; + " [part=" + part + ", partitions=" + assignment.size() + ']'; - List<HashSet<UUID>> assignmentIds0 = assignmentIds; + List<ClusterNode> nodes = assignment.get(part); - if (assignmentIds0 == null) { - assignmentIds0 = new ArrayList<>(); + HashSet<UUID> ids = U.newHashSet(nodes.size()); - for (List<ClusterNode> assignmentPart : assignment) { - HashSet<UUID> partIds = new HashSet<>(); + for (int i = 0; i < nodes.size(); i++) + ids.add(nodes.get(i).id()); - for (ClusterNode node : assignmentPart) - partIds.add(node.id()); + return ids; + } - assignmentIds0.add(partIds); - } + /** {@inheritDoc} */ + @Override public Set<ClusterNode> primaryPartitionNodes() { + Set<ClusterNode> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); - assignmentIds = assignmentIds0; + if (!F.isEmpty(nodes)) + res.add(nodes.get(0)); } - return assignmentIds0.get(part); + return res; } - /** - * @return Nodes having primary partitions assignments. 
- */ - @SuppressWarnings("ForLoopReplaceableByForEach") - public Set<ClusterNode> primaryPartitionNodes() { - Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes; - - if (primaryPartsNodes0 == null) { - int parts = assignment.size(); - - primaryPartsNodes0 = new HashSet<>(); - - for (int p = 0; p < parts; p++) { - List<ClusterNode> nodes = assignment.get(p); + /** {@inheritDoc} */ + @Override public Set<Integer> primaryPartitions(UUID nodeId) { + Set<Integer> res = new HashSet<>(); - if (nodes.size() > 0) - primaryPartsNodes0.add(nodes.get(0)); - } + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); - primaryPartsNodes = primaryPartsNodes0; + if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId)) + res.add(p); } - return primaryPartsNodes0; + return res; } - /** - * Get primary partitions for specified node ID. - * - * @param nodeId Node ID to get primary partitions for. - * @return Primary partitions for specified node ID. - */ - public Set<Integer> primaryPartitions(UUID nodeId) { - Set<Integer> set = primary.get(nodeId); + /** {@inheritDoc} */ + @Override public Set<Integer> backupPartitions(UUID nodeId) { + Set<Integer> res = new HashSet<>(); - return set == null ? Collections.<Integer>emptySet() : set; - } + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); - /** - * Get backup partitions for specified node ID. - * - * @param nodeId Node ID to get backup partitions for. - * @return Backup partitions for specified node ID. - */ - public Set<Integer> backupPartitions(UUID nodeId) { - Set<Integer> set = backup.get(nodeId); + for (int i = 1; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (node.id().equals(nodeId)) { + res.add(p); + + break; + } + } + } - return set == null ? 
Collections.<Integer>emptySet() : set; + return res; } /** @@ -274,10 +256,10 @@ public class GridAffinityAssignment implements Serializable { if (o == this) return true; - if (o == null || getClass() != o.getClass()) + if (o == null || !(o instanceof AffinityAssignment)) return false; - return topVer.equals(((GridAffinityAssignment)o).topVer); + return topVer.equals(((AffinityAssignment)o).topologyVersion()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a81b34d..9166b31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache { private final int partsCnt; /** Affinity calculation results cache: topology version => partition => nodes. */ - private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache; + private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache; /** */ private List<List<ClusterNode>> idealAssignment; @@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache { /** Full history size. */ private final AtomicInteger fullHistSize = new AtomicInteger(); + /** */ + private final SimilarAffinityKey similarAffKey; + /** * Constructs affinity cached calculations. 
* @@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache { { assert ctx != null; assert aff != null; + assert nodeFilter != null; this.ctx = ctx; this.aff = aff; @@ -142,6 +146,12 @@ public class GridAffinityAssignmentCache { partsCnt = aff.partitions(); affCache = new ConcurrentSkipListMap<>(); head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); + + similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt); + } + + public Object similarAffinityKey() { + return similarAffKey; } /** @@ -170,7 +180,7 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment); - affCache.put(topVer, assignment); + affCache.put(topVer, new HistoryAffinityAssignment(assignment)); head.set(assignment); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { @@ -300,7 +310,7 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff); - affCache.put(topVer, assignmentCpy); + affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy)); head.set(assignmentCpy); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { @@ -328,7 +338,7 @@ public class GridAffinityAssignmentCache { * @return Affinity assignment. */ public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) { - GridAffinityAssignment aff = cachedAffinity(topVer); + AffinityAssignment aff = cachedAffinity(topVer); return aff.assignment(); } @@ -427,7 +437,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Cached affinity. 
*/ - public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) { + public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) { if (topVer.equals(AffinityTopologyVersion.NONE)) topVer = lastVersion(); else @@ -435,7 +445,7 @@ public class GridAffinityAssignmentCache { assert topVer.topologyVersion() >= 0 : topVer; - GridAffinityAssignment cache = head.get(); + AffinityAssignment cache = head.get(); if (!cache.topologyVersion().equals(topVer)) { cache = affCache.get(topVer); @@ -463,7 +473,7 @@ public class GridAffinityAssignmentCache { * @return {@code True} if primary changed or required affinity version not found in history. */ public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) { - GridAffinityAssignment aff = affCache.get(startVer); + AffinityAssignment aff = affCache.get(startVer); if (aff == null) return false; @@ -475,7 +485,7 @@ public class GridAffinityAssignmentCache { ClusterNode primary = nodes.get(0); - for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) { + for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) { List<ClusterNode> nodes0 = assignment.assignment().get(part); if (nodes0.isEmpty()) @@ -549,10 +559,10 @@ public class GridAffinityAssignmentCache { } if (rmvCnt > 0) { - Iterator<GridAffinityAssignment> it = affCache.values().iterator(); + Iterator<HistoryAffinityAssignment> it = affCache.values().iterator(); while (it.hasNext() && rmvCnt > 0) { - GridAffinityAssignment aff0 = it.next(); + AffinityAssignment aff0 = it.next(); it.remove(); @@ -602,4 +612,57 @@ public class GridAffinityAssignmentCache { return S.toString(AffinityReadyFuture.class, this); } } + + /** + * + */ + private static class SimilarAffinityKey { + /** */ + private final int backups; + + /** */ + private final Class<?> affFuncCls; + + /** */ + private final Class<?> filterCls; + + /** */ + private final int partsCnt; + + /** */ 
+ private final int hash; + + public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) { + this.backups = backups; + this.affFuncCls = affFuncCls; + this.filterCls = filterCls; + this.partsCnt = partsCnt; + + int hash = backups; + hash = 31 * hash + affFuncCls.hashCode(); + hash = 31 * hash + filterCls.hashCode(); + hash= 31 * hash + partsCnt; + + this.hash = hash; + } + + @Override public int hashCode() { + return hash; + } + + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + SimilarAffinityKey key = (SimilarAffinityKey)o; + + return backups == key.backups && + affFuncCls == key.affFuncCls && + filterCls == key.filterCls && + partsCnt == key.partsCnt; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 1726d02..7c22ef5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -385,10 +385,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } try { + AffinityAssignment assign0 = cctx.affinity().assignment(topVer); + + GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? 
+ (GridAffinityAssignment)assign0 : + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + AffinityInfo info = new AffinityInfo( cctx.config().getAffinity(), cctx.config().getAffinityMapper(), - new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)), + assign, cctx.cacheObjectContext()); IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info)); http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java index c24dd2d..abd5292 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -180,10 +180,16 @@ class GridAffinityUtils { cctx.affinity().affinityReadyFuture(topVer).get(); + AffinityAssignment assign0 = cctx.affinity().assignment(topVer); + + GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? 
+ (GridAffinityAssignment)assign0 : + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + return F.t( affinityMessage(ctx, cctx.config().getAffinity()), affinityMessage(ctx, cctx.config().getAffinityMapper()), - new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer))); + assign); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java new file mode 100644 index 0000000..e502dd5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * + */ +public class HistoryAffinityAssignment implements AffinityAssignment { + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final List<List<ClusterNode>> assignment; + + /** */ + private final List<List<ClusterNode>> idealAssignment; + + /** */ + private final boolean clientEvtChange; + + /** + * @param assign Assignment. + */ + public HistoryAffinityAssignment(GridAffinityAssignment assign) { + this.topVer = assign.topologyVersion(); + this.assignment = assign.assignment(); + this.idealAssignment = assign.idealAssignment(); + this.clientEvtChange = assign.clientEventChange(); + } + + /** {@inheritDoc} */ + @Override public boolean clientEventChange() { + return clientEvtChange; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> idealAssignment() { + return idealAssignment; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignment() { + return assignment; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> get(int part) { + assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + + " [part=" + part + ", partitions=" + assignment.size() + ']'; + + return assignment.get(part); + } + + /** {@inheritDoc} */ + @Override public HashSet<UUID> getIds(int part) { + assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + + " [part=" + part + ", partitions=" + assignment.size() + ']'; + + List<ClusterNode> nodes = 
assignment.get(part); + + HashSet<UUID> ids = U.newHashSet(nodes.size()); + + for (int i = 0; i < nodes.size(); i++) + ids.add(nodes.get(i).id()); + + return ids; + } + + /** {@inheritDoc} */ + @Override public Set<ClusterNode> primaryPartitionNodes() { + Set<ClusterNode> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); + + if (!F.isEmpty(nodes)) + res.add(nodes.get(0)); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public Set<Integer> primaryPartitions(UUID nodeId) { + Set<Integer> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); + + if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId)) + res.add(p); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public Set<Integer> backupPartitions(UUID nodeId) { + Set<Integer> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); + + for (int i = 1; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (node.id().equals(nodeId)) { + res.add(p); + + break; + } + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return topVer.hashCode(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || !(o instanceof AffinityAssignment)) + return false; + + return topVer.equals(((AffinityAssignment)o).topologyVersion()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HistoryAffinityAssignment.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 1aedf4e..88f1f97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert assignment != null; + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { List<List<ClusterNode>> idealAssignment = aff.idealAssignment(); @@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else newAssignment = idealAssignment; - aff.initialize(topVer, newAssignment); + aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache)); } }); } @@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Integer, IgniteUuid> deploymentIds = msg.cacheDeploymentIds(); + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion affTopVer = aff.lastVersion(); @@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assignment.set(part, nodes); } - aff.initialize(topVer, assignment); + aff.initialize(topVer, cachedAssignment(aff, assignment, affCache)); } else aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer); @@ -1206,6 +1210,8 @@ public class 
CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + if (!crd) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) @@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean latePrimary = cacheCtx.rebalanceEnabled(); - initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary); + initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); } return null; @@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean latePrimary = cache.rebalanceEnabled; - initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary); + initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache); } }); @@ -1245,7 +1251,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, - boolean latePrimary) + boolean latePrimary, + Map<Object, List<List<ClusterNode>>> affCache) throws IgniteCheckedException { assert lateAffAssign; @@ -1292,7 +1299,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (newAssignment == null) newAssignment = idealAssignment; - aff.initialize(fut.topologyVersion(), newAssignment); + aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + } + + /** + * @param aff + * @param assign + * @param affCache + * @return + */ + private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff, + List<List<ClusterNode>> assign, + Map<Object, List<List<ClusterNode>>> affCache) { + List<List<ClusterNode>> assign0 = 
affCache.get(aff.similarAffinityKey()); + + if (assign0 != null && assign0.equals(assign)) + assign = assign0; + else + affCache.put(aff.similarAffinityKey(), assign); + + return assign; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 71ae5c9..6e5a28e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Affinity assignment. 
*/ - public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) { + public AffinityAssignment assignment(AffinityTopologyVersion topVer) { if (cctx.isLocal()) topVer = LOC_CACHE_TOP_VER; http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 50f7f0f..871a084 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -859,7 +859,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { - GridAffinityAssignment affAssignment = 
cctx.affinity().assignment(topVer); + AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); List<ClusterNode> affNodes = affAssignment.get(p); http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 09aec81..d6865c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -600,7 +600,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + ", node=" + node + ']'); - GridAffinityAssignment assignment = cctx.affinity().assignment(topVer); + AffinityAssignment assignment = 
cctx.affinity().assignment(topVer); boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;