Exclude neighbors flag for affinity functions. This closes #80
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a180027 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a180027 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a180027 Branch: refs/heads/ignite-1753-1282 Commit: 5a180027b174c1b76ab71a789633fe5f80bc9180 Parents: a4d625d Author: agura <ag...@gridgain.com> Authored: Tue Oct 27 15:57:59 2015 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Oct 27 16:01:23 2015 +0300 ---------------------------------------------------------------------- .../ClientAbstractMultiThreadedSelfTest.java | 3 +- .../affinity/fair/FairAffinityFunction.java | 497 ++++++++++++++----- .../rendezvous/RendezvousAffinityFunction.java | 140 ++---- .../processors/cache/GridCacheProcessor.java | 13 - .../processors/cache/GridCacheUtils.java | 50 ++ .../AbstractAffinityFunctionSelfTest.java | 293 +++++++++++ .../affinity/AffinityClientNodeSelfTest.java | 194 ++++++++ ...ityFunctionBackupFilterAbstractSelfTest.java | 138 +++++ ...unctionExcludeNeighborsAbstractSelfTest.java | 182 +++++++ .../affinity/IgniteClientNodeAffinityTest.java | 194 -------- .../fair/FairAffinityDynamicCacheSelfTest.java | 97 ++++ ...airAffinityFunctionBackupFilterSelfTest.java | 35 ++ ...ffinityFunctionExcludeNeighborsSelfTest.java | 31 ++ .../fair/FairAffinityFunctionNodesSelfTest.java | 245 +++++++++ .../fair/FairAffinityFunctionSelfTest.java | 31 ++ .../GridFairAffinityFunctionNodesSelfTest.java | 245 --------- .../fair/GridFairAffinityFunctionSelfTest.java | 270 ---------- .../IgniteFairAffinityDynamicCacheSelfTest.java | 97 ---- ...ousAffinityFunctionBackupFilterSelfTest.java | 35 ++ ...ffinityFunctionExcludeNeighborsSelfTest.java | 32 ++ .../RendezvousAffinityFunctionSelfTest.java | 50 ++ .../cache/CrossCacheTxRandomOperationsTest.java | 2 +- .../GridCacheAbstractLocalStoreSelfTest.java | 5 + ...idCacheConfigurationConsistencySelfTest.java | 17 - 
...dCachePartitionedAffinityFilterSelfTest.java | 143 ------ .../dht/GridCacheDhtPreloadPutGetSelfTest.java | 3 + ...unctionExcludeNeighborsAbstractSelfTest.java | 184 ------- ...ffinityFunctionExcludeNeighborsSelfTest.java | 32 -- ...xcludeNeighborsMultiNodeFullApiSelfTest.java | 36 ++ ...tedFairAffinityMultiNodeFullApiSelfTest.java | 35 ++ ...xcludeNeighborsMultiNodeFullApiSelfTest.java | 36 ++ ...dezvousAffinityMultiNodeFullApiSelfTest.java | 36 ++ .../IgniteCacheFullApiSelfTestSuite.java | 8 + .../ignite/testsuites/IgniteCacheTestSuite.java | 16 +- .../testsuites/IgniteCacheTestSuite2.java | 12 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 4 + 36 files changed, 2021 insertions(+), 1420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java index 9dd4d83..9f6bf2b 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java @@ -254,8 +254,7 @@ public abstract class ClientAbstractMultiThreadedSelfTest extends GridCommonAbst final ConcurrentLinkedQueue<String> execQueue = new ConcurrentLinkedQueue<>(); IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override - public void run() { + @Override public void run() { long processed; while ((processed = cnt.getAndIncrement()) < taskExecutionCount()) { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java index cc04875..b42b683 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Queue; import java.util.RandomAccess; import java.util.UUID; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityCentralizedFunction; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; @@ -38,15 +39,38 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; /** * Fair affinity function which tries to ensure that all nodes get equal number of partitions with * minimum amount of reassignments between existing nodes. 
+ * This function supports the following configuration: + * <ul> + * <li> + * {@code partitions} - Number of partitions to spread across nodes. + * </li> + * <li> + * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors + * from being backups of each other. This flag can be ignored in cases when topology does not have enough nodes + * to assign backups. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * </li> + * <li> + * {@code backupFilter} - Optional filter for back up nodes. If provided, then only + * nodes that pass this filter will be selected as backup nodes. If not provided, then + * primary and backup nodes will be selected out of all nodes available for this cache. + * </li> + * </ul> * <p> - * Cache affinity can be configured for individual caches via - * {@link CacheConfiguration#setAffinity(AffinityFunction)} method. + * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method. */ @AffinityCentralizedFunction public class FairAffinityFunction implements AffinityFunction { @@ -62,21 +86,165 @@ public class FairAffinityFunction implements AffinityFunction { /** Descending comparator. */ private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP); - /** */ - private final int parts; + /** Number of partitions. */ + private int parts; + + /** Exclude neighbors flag. */ + private boolean exclNeighbors; + + /** Exclude neighbors warning. */ + private transient boolean exclNeighborsWarn; + + /** Logger instance. */ + @LoggerResource + private transient IgniteLogger log; + + /** Optional backup filter. First node is primary, second node is a node being tested. */ + private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; /** - * Creates fair affinity with default partition count. + * Empty constructor with all defaults. 
*/ public FairAffinityFunction() { - this(DFLT_PART_CNT); + this(false); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other + * and specified number of backups. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + */ + public FairAffinityFunction(boolean exclNeighbors) { + this(exclNeighbors, DFLT_PART_CNT); } /** * @param parts Number of partitions. */ public FairAffinityFunction(int parts) { + this(false, parts); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, + * and specified number of backups and partitions. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + * @param parts Total number of partitions. + */ + public FairAffinityFunction(boolean exclNeighbors, int parts) { + this(exclNeighbors, parts, null); + } + + /** + * Initializes optional counts for replicas and backups. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param parts Total number of partitions. + * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected + * from all nodes that pass this filter. First argument for this filter is primary node, and second + * argument is node being tested. + */ + public FairAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this(false, parts, backupFilter); + } + + /** + * Private constructor. + * + * @param exclNeighbors Exclude neighbors flag. + * @param parts Partitions count. + * @param backupFilter Backup filter. 
+ */ + private FairAffinityFunction(boolean exclNeighbors, int parts, + IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + A.ensure(parts > 0, "parts > 0"); + + this.exclNeighbors = exclNeighbors; this.parts = parts; + this.backupFilter = backupFilter; + } + + /** + * Gets total number of key partitions. To ensure that all partitions are + * equally distributed across all nodes, please make sure that this + * number is significantly larger than a number of nodes. Also, partition + * size should be relatively small. Try to avoid having partitions with more + * than quarter million keys. + * <p> + * Note that for fully replicated caches this method should always + * return {@code 1}. + * + * @return Total partition count. + */ + public int getPartitions() { + return parts; + } + + /** + * Sets total number of partitions. + * + * @param parts Total number of partitions. + */ + public void setPartitions(int parts) { + this.parts = parts; + } + + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { + return backupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param backupFilter Optional backup filter. 
+ */ + public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this.backupFilter = backupFilter; + } + + /** + * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; } /** {@inheritDoc} */ @@ -89,14 +257,20 @@ public class FairAffinityFunction implements AffinityFunction { return Collections.nCopies(parts, Collections.singletonList(primary)); } - List<List<ClusterNode>> assignment = createCopy(ctx); + Map<UUID, Collection<ClusterNode>> neighborhoodMap = exclNeighbors + ? GridCacheUtils.neighbors(ctx.currentTopologySnapshot()) + : null; + + List<List<ClusterNode>> assignment = createCopy(ctx, neighborhoodMap); + + int backups = ctx.backups(); - int tiers = Math.min(ctx.backups() + 1, topSnapshot.size()); + int tiers = backups == Integer.MAX_VALUE ? topSnapshot.size() : Math.min(backups + 1, topSnapshot.size()); // Per tier pending partitions. 
Map<Integer, Queue<Integer>> pendingParts = new HashMap<>(); - FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot); + FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot, neighborhoodMap); for (int tier = 0; tier < tiers; tier++) { // Check if this is a new tier and add pending partitions. @@ -104,23 +278,32 @@ public class FairAffinityFunction implements AffinityFunction { for (int part = 0; part < parts; part++) { if (fullMap.assignments.get(part).size() < tier + 1) { - if (pending == null) { - pending = new LinkedList<>(); - - pendingParts.put(tier, pending); - } + if (pending == null) + pendingParts.put(tier, pending = new LinkedList<>()); if (!pending.contains(part)) pending.add(part); - } } // Assign pending partitions, if any. - assignPending(tier, pendingParts, fullMap, topSnapshot); + assignPending(tier, pendingParts, fullMap, topSnapshot, false); // Balance assignments. - balance(tier, pendingParts, fullMap, topSnapshot); + boolean balanced = balance(tier, pendingParts, fullMap, topSnapshot, false); + + if (!balanced && exclNeighbors) { + assignPending(tier, pendingParts, fullMap, topSnapshot, true); + + balance(tier, pendingParts, fullMap, topSnapshot, true); + + if (!exclNeighborsWarn) { + LT.warn(log, null, "Affinity function excludeNeighbors property is ignored " + + "because topology does not have enough nodes to assign backups."); + + exclNeighborsWarn = true; + } + } } return fullMap.assignments; @@ -153,9 +336,14 @@ public class FairAffinityFunction implements AffinityFunction { * @param pendingMap Pending partitions per tier. * @param fullMap Full assignment map to modify. * @param topSnapshot Topology snapshot. + * @param allowNeighbors Allow neighbors nodes for partition. 
*/ - private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap, - List<ClusterNode> topSnapshot) { + private void assignPending(int tier, + Map<Integer, Queue<Integer>> pendingMap, + FullAssignmentMap fullMap, + List<ClusterNode> topSnapshot, + boolean allowNeighbors) + { Queue<Integer> pending = pendingMap.get(tier); if (F.isEmpty(pending)) @@ -168,19 +356,18 @@ public class FairAffinityFunction implements AffinityFunction { PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false); // First iterate over underloaded nodes. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false); + assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false, allowNeighbors); if (!pending.isEmpty() && !underloadedNodes.isEmpty()) { // Same, forcing updates. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true); + assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true, allowNeighbors); } if (!pending.isEmpty()) - assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot); - - assert pending.isEmpty(); + assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot, allowNeighbors); - pendingMap.remove(tier); + if (pending.isEmpty()) + pendingMap.remove(tier); } /** @@ -192,6 +379,7 @@ public class FairAffinityFunction implements AffinityFunction { * @param underloadedNodes Underloaded nodes. * @param topSnapshot Topology snapshot. * @param force {@code True} if partitions should be moved. + * @param allowNeighbors Allow neighbors nodes for partition. 
*/ private void assignPendingToUnderloaded( int tier, @@ -199,7 +387,8 @@ public class FairAffinityFunction implements AffinityFunction { FullAssignmentMap fullMap, PrioritizedPartitionMap underloadedNodes, Collection<ClusterNode> topSnapshot, - boolean force) { + boolean force, + boolean allowNeighbors) { Iterator<Integer> it = pendingMap.get(tier).iterator(); int ideal = parts / topSnapshot.size(); @@ -212,7 +401,7 @@ public class FairAffinityFunction implements AffinityFunction { assert node != null; - if (fullMap.assign(part, tier, node, force, pendingMap)) { + if (fullMap.assign(part, tier, node, pendingMap, force, allowNeighbors)) { // We could add partition to partition map without forcing, remove partition from pending. it.remove(); @@ -237,9 +426,10 @@ public class FairAffinityFunction implements AffinityFunction { * @param pendingMap Pending partitions per tier. * @param fullMap Full assignment map to modify. * @param topSnapshot Topology snapshot. + * @param allowNeighbors Allow neighbors nodes for partition. 
*/ private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) { + FullAssignmentMap fullMap, List<ClusterNode> topSnapshot, boolean allowNeighbors) { Iterator<Integer> it = pendingMap.get(tier).iterator(); int idx = 0; @@ -254,7 +444,7 @@ public class FairAffinityFunction implements AffinityFunction { do { ClusterNode node = topSnapshot.get(i); - if (fullMap.assign(part, tier, node, false, pendingMap)) { + if (fullMap.assign(part, tier, node, pendingMap, false, allowNeighbors)) { it.remove(); assigned = true; @@ -270,7 +460,7 @@ public class FairAffinityFunction implements AffinityFunction { do { ClusterNode node = topSnapshot.get(i); - if (fullMap.assign(part, tier, node, true, pendingMap)) { + if (fullMap.assign(part, tier, node, pendingMap, true, allowNeighbors)) { it.remove(); assigned = true; @@ -283,7 +473,7 @@ public class FairAffinityFunction implements AffinityFunction { } while (i != idx); } - if (!assigned) + if (!assigned && (!exclNeighbors || exclNeighbors && allowNeighbors)) throw new IllegalStateException("Failed to find assignable node for partition."); } } @@ -295,9 +485,10 @@ public class FairAffinityFunction implements AffinityFunction { * @param pendingParts Pending partitions per tier. * @param fullMap Full assignment map to modify. * @param topSnapshot Topology snapshot. + * @param allowNeighbors Allow neighbors nodes for partition. 
*/ - private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap, - Collection<ClusterNode> topSnapshot) { + private boolean balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap, + Collection<ClusterNode> topSnapshot, boolean allowNeighbors) { int idealPartCnt = parts / topSnapshot.size(); Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier); @@ -313,7 +504,7 @@ public class FairAffinityFunction implements AffinityFunction { boolean assigned = false; for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) { + if (fullMap.assign(part, tier, underloaded.node(), pendingParts, false, allowNeighbors)) { // Size of partition sets has changed. if (overloaded.size() <= idealPartCnt) overloadedNodes.remove(overloaded.nodeId()); @@ -335,7 +526,7 @@ public class FairAffinityFunction implements AffinityFunction { if (!assigned) { for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) { + if (fullMap.assign(part, tier, underloaded.node(), pendingParts, true, allowNeighbors)) { // Size of partition sets has changed. if (overloaded.size() <= idealPartCnt) overloadedNodes.remove(overloaded.nodeId()); @@ -366,6 +557,8 @@ public class FairAffinityFunction implements AffinityFunction { break; } while (true); + + return underloadedNodes.isEmpty(); } /** @@ -393,9 +586,12 @@ public class FairAffinityFunction implements AffinityFunction { * Creates copy of previous partition assignment. * * @param ctx Affinity function context. + * @param neighborhoodMap Neighbors nodes grouped by target node. * @return Assignment copy and per node partition map. 
*/ - private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx) { + private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx, + Map<UUID, Collection<ClusterNode>> neighborhoodMap) + { DiscoveryEvent discoEvt = ctx.discoveryEvent(); UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED) @@ -411,26 +607,42 @@ public class FairAffinityFunction implements AffinityFunction { if (partNodes == null) partNodesCp = new ArrayList<>(); - else { - if (leftNodeId == null) { - partNodesCp = new ArrayList<>(partNodes.size() + 1); // Node joined. + else + partNodesCp = copyAssigments(neighborhoodMap, partNodes, leftNodeId); - partNodesCp.addAll(partNodes); - } - else { - partNodesCp = new ArrayList<>(partNodes.size()); + cp.add(partNodesCp); + } + + return cp; + } + + /** + * @param neighborhoodMap Neighbors nodes grouped by target node. + * @param partNodes Partition nodes. + * @param leftNodeId Left node id. + */ + private List<ClusterNode> copyAssigments(Map<UUID, Collection<ClusterNode>> neighborhoodMap, + List<ClusterNode> partNodes, UUID leftNodeId) { + final List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size()); + + for (ClusterNode node : partNodes) { + if (node.id().equals(leftNodeId)) + continue; + + boolean containsNeighbor = false; - for (ClusterNode affNode : partNodes) { - if (!affNode.id().equals(leftNodeId)) - partNodesCp.add(affNode); + if (neighborhoodMap != null) + containsNeighbor = F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return partNodesCp.contains(node); } - } - } + }); - cp.add(partNodesCp); + if (!containsNeighbor) + partNodesCp.add(node); } - return cp; + return partNodesCp; } /** @@ -512,59 +724,11 @@ public class FairAffinityFunction implements AffinityFunction { } /** - * Constructs assignment map for specified tier. - * - * @param tier Tier number, -1 for all tiers altogether. 
- * @param assignment Assignment to construct map from. - * @param topSnapshot Topology snapshot. - * @return Assignment map. - */ - private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment, - Collection<ClusterNode> topSnapshot) { - Map<UUID, PartitionSet> tmp = new LinkedHashMap<>(); - - for (int part = 0; part < assignment.size(); part++) { - List<ClusterNode> nodes = assignment.get(part); - - assert nodes instanceof RandomAccess; - - if (nodes.size() <= tier) - continue; - - int start = tier < 0 ? 0 : tier; - int end = tier < 0 ? nodes.size() : tier + 1; - - for (int i = start; i < end; i++) { - ClusterNode n = nodes.get(i); - - PartitionSet set = tmp.get(n.id()); - - if (set == null) { - set = new PartitionSet(n); - - tmp.put(n.id(), set); - } - - set.add(part); - } - } - - if (tmp.size() < topSnapshot.size()) { - for (ClusterNode node : topSnapshot) { - if (!tmp.containsKey(node.id())) - tmp.put(node.id(), new PartitionSet(node)); - } - } - - return tmp; - } - - /** * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary * maps consistent. */ @SuppressWarnings("unchecked") - private static class FullAssignmentMap { + private class FullAssignmentMap { /** Per-tier assignment maps. */ private Map<UUID, PartitionSet>[] tierMaps; @@ -574,20 +738,28 @@ public class FairAffinityFunction implements AffinityFunction { /** Resulting assignment. */ private List<List<ClusterNode>> assignments; + /** Neighborhood map. */ + private final Map<UUID, Collection<ClusterNode>> neighborhoodMap; + /** * @param tiers Number of tiers. * @param assignments Assignments to modify. * @param topSnapshot Topology snapshot. + * @param neighborhoodMap Neighbors nodes grouped by target node. 
*/ - private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) { + private FullAssignmentMap(int tiers, + List<List<ClusterNode>> assignments, + Collection<ClusterNode> topSnapshot, + Map<UUID, Collection<ClusterNode>> neighborhoodMap) + { this.assignments = assignments; - - tierMaps = new Map[tiers]; + this.neighborhoodMap = neighborhoodMap; + this.tierMaps = new Map[tiers]; for (int tier = 0; tier < tiers; tier++) - tierMaps[tier] = assignments(tier, assignments, topSnapshot); + tierMaps[tier] = assignments(tier, topSnapshot); - fullMap = assignments(-1, assignments, topSnapshot); + fullMap = assignments(-1, topSnapshot); } /** @@ -599,14 +771,20 @@ public class FairAffinityFunction implements AffinityFunction { * @param part Partition to assign. * @param tier Tier number to assign. * @param node Node to move partition to. - * @param force Force flag. * @param pendingParts per tier pending partitions map. + * @param force Force flag. + * @param allowNeighbors Allow neighbors nodes for partition. * @return {@code True} if assignment succeeded. 
*/ - boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) { + boolean assign(int part, + int tier, + ClusterNode node, + Map<Integer, Queue<Integer>> pendingParts, boolean force, + boolean allowNeighbors) + { UUID nodeId = node.id(); - if (!fullMap.get(nodeId).contains(part)) { + if (isAssignable(part, tier, node, allowNeighbors)) { tierMaps[tier].get(nodeId).add(part); fullMap.get(nodeId).add(part); @@ -656,11 +834,8 @@ public class FairAffinityFunction implements AffinityFunction { Queue<Integer> pending = pendingParts.get(t); - if (pending == null) { - pending = new LinkedList<>(); - - pendingParts.put(t, pending); - } + if (pending == null) + pendingParts.put(t, pending = new LinkedList<>()); pending.add(part); @@ -668,7 +843,7 @@ public class FairAffinityFunction implements AffinityFunction { } } - throw new IllegalStateException("Unable to assign partition to node while force is true."); + return false; } // !force. @@ -684,6 +859,102 @@ public class FairAffinityFunction implements AffinityFunction { public Map<UUID, PartitionSet> tierMapping(int tier) { return tierMaps[tier]; } + + /** + * @param part Partition. + * @param tier Tier. + * @param node Node. + * @param allowNeighbors Allow neighbors. 
+ */ + private boolean isAssignable(int part, int tier, final ClusterNode node, boolean allowNeighbors) { + if (containsPartition(part, node)) + return false; + + if (exclNeighbors) + return allowNeighbors || !neighborsContainPartition(node, part); + else if (backupFilter == null) + return true; + else { + if (tier == 0) { + List<ClusterNode> assigment = assignments.get(part); + + assert assigment.size() > 0; + + List<ClusterNode> backups = assigment.subList(1, assigment.size()); + + return !F.exist(backups, new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + return !backupFilter.apply(node, n); + } + }); + } + else + return (backupFilter.apply(assignments.get(part).get(0), node)); + } + } + + /** + * @param part Partition. + * @param node Node. + */ + private boolean containsPartition(int part, ClusterNode node) { + return fullMap.get(node.id()).contains(part); + } + + /** + * @param node Node. + * @param part Partition. + */ + private boolean neighborsContainPartition(ClusterNode node, final int part) { + return F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + return fullMap.get(n.id()).contains(part); + } + }); + } + + /** + * Constructs assignments map for specified tier. + * + * @param tier Tier number, -1 for all tiers altogether. + * @param topSnapshot Topology snapshot. + * @return Assignment map. + */ + private Map<UUID, PartitionSet> assignments(int tier, Collection<ClusterNode> topSnapshot) { + Map<UUID, PartitionSet> tmp = new LinkedHashMap<>(); + + for (int part = 0; part < assignments.size(); part++) { + List<ClusterNode> nodes = assignments.get(part); + + assert nodes instanceof RandomAccess; + + if (nodes.size() <= tier) + continue; + + int start = tier < 0 ? 0 : tier; + int end = tier < 0 ? 
nodes.size() : tier + 1; + + for (int i = start; i < end; i++) { + ClusterNode n = nodes.get(i); + + PartitionSet set = tmp.get(n.id()); + + if (set == null) + tmp.put(n.id(), set = new PartitionSet(n)); + + set.add(part); + } + } + + if (tmp.size() < topSnapshot.size()) { + for (ClusterNode node : topSnapshot) { + if (!tmp.containsKey(node.id())) + tmp.put(node.id(), new PartitionSet(node)); + } + } + + return tmp; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index fd07eb9..61a21d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -29,26 +29,28 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.affinity.AffinityNodeHashResolver; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteNodeAttributes; +import 
org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; /** @@ -60,8 +62,9 @@ import org.jetbrains.annotations.Nullable; * </li> * <li> * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors - * from being backups of each other. Note that {@code backupFilter} is ignored if - * {@code excludeNeighbors} is set to {@code true}. + * from being backups of each other. This flag can be ignored in cases when topology does not have enough nodes + * to assign backups. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * </li> * <li> * {@code backupFilter} - Optional filter for back up nodes. If provided, then only @@ -70,7 +73,7 @@ import org.jetbrains.annotations.Nullable; * </li> * </ul> * <p> - * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. + * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method. */ public class RendezvousAffinityFunction implements AffinityFunction, Externalizable { /** */ @@ -80,8 +83,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza public static final int DFLT_PARTITION_COUNT = 1024; /** Comparator. 
*/ - private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = - new HashComparator(); + private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator(); /** Thread local message digest. */ private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { @@ -92,8 +94,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza catch (NoSuchAlgorithmException e) { assert false : "Should have failed in constructor"; - throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", - e); + throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e); } } }; @@ -104,6 +105,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** Exclude neighbors flag. */ private boolean exclNeighbors; + /** Exclude neighbors warning. */ + private transient boolean exclNeighborsWarn; + /** Optional backup filter. First node is primary, second node is a node being tested. */ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; @@ -114,6 +118,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza @IgniteInstanceResource private Ignite ignite; + /** Logger instance. */ + @LoggerResource + private transient IgniteLogger log; + /** * Empty constructor with all defaults. */ @@ -125,7 +133,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other * and specified number of backups. * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups * of each other. 
@@ -138,7 +146,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, * and specified number of backups and partitions. * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups * of each other. @@ -151,14 +159,14 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Initializes optional counts for replicas and backups. * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param parts Total number of partitions. * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected * from all nodes that pass this filter. First argument for this filter is primary node, and second * argument is node being tested. * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. 
*/ public RendezvousAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { this(false, parts, backupFilter); @@ -173,7 +181,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza */ private RendezvousAffinityFunction(boolean exclNeighbors, int parts, IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - A.ensure(parts != 0, "parts != 0"); + A.ensure(parts > 0, "parts > 0"); this.exclNeighbors = exclNeighbors; this.parts = parts; @@ -253,7 +261,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * from all nodes that pass this filter. First node passed to this filter is primary node, * and second node is a node being tested. * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @return Optional backup filter. */ @@ -266,7 +274,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * nodes that pass this filter. First node being passed to this filter is primary node, * and second node is a node being tested. * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param backupFilter Optional backup filter. */ @@ -277,7 +285,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @return {@code True} if nodes residing on the same host may not act as backups of each other. 
*/ @@ -288,7 +296,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. */ @@ -355,20 +363,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza Collections.sort(lst, COMPARATOR); - int primaryAndBackups; - - List<ClusterNode> res; - - if (backups == Integer.MAX_VALUE) { - primaryAndBackups = Integer.MAX_VALUE; - - res = new ArrayList<>(); - } - else { - primaryAndBackups = backups + 1; + int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); - res = new ArrayList<>(primaryAndBackups); - } + List<ClusterNode> res = new ArrayList<>(primaryAndBackups); ClusterNode primary = lst.get(0).get2(); @@ -376,39 +373,38 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza // Select backups. 
if (backups > 0) { - for (int i = 1; i < lst.size(); i++) { + for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { IgniteBiTuple<Long, ClusterNode> next = lst.get(i); ClusterNode node = next.get2(); if (exclNeighbors) { - Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res); + Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res); if (!allNeighbors.contains(node)) res.add(node); } - else { - if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node))) - res.add(next.get2()); - } - - if (res.size() == primaryAndBackups) - break; + else if (backupFilter == null || backupFilter.apply(primary, node)) + res.add(next.get2()); } } if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { - // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria. - for (int i = 1; i < lst.size(); i++) { + // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria. + for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { IgniteBiTuple<Long, ClusterNode> next = lst.get(i); ClusterNode node = next.get2(); if (!res.contains(node)) res.add(next.get2()); + } + + if (!exclNeighborsWarn) { + LT.warn(log, null, "Affinity function excludeNeighbors property is ignored " + + "because topology has no enough nodes to assign backups."); - if (res.size() == primaryAndBackups) - break; + exclNeighborsWarn = true; } } @@ -437,7 +433,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza List<List<ClusterNode>> assignments = new ArrayList<>(parts); Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? 
- neighbors(affCtx.currentTopologySnapshot()) : null; + GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; for (int i = 0; i < parts; i++) { List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), @@ -463,6 +459,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { parts = in.readInt(); exclNeighbors = in.readBoolean(); @@ -471,57 +468,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza } /** - * Builds neighborhood map for all nodes in snapshot. - * - * @param topSnapshot Topology snapshot. - * @return Neighbors map. - */ - private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { - Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); - - // Group by mac addresses. - for (ClusterNode node : topSnapshot) { - String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS); - - Collection<ClusterNode> nodes = macMap.get(macs); - - if (nodes == null) { - nodes = new HashSet<>(); - - macMap.put(macs, nodes); - } - - nodes.add(node); - } - - Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); - - for (Collection<ClusterNode> group : macMap.values()) { - for (ClusterNode node : group) - neighbors.put(node.id(), group); - } - - return neighbors; - } - - /** - * @param neighborhoodCache Neighborhood cache. - * @param nodes Nodes. - * @return All neighbors for given nodes. 
- */ - private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache, - Iterable<ClusterNode> nodes) { - Collection<ClusterNode> res = new HashSet<>(); - - for (ClusterNode node : nodes) { - if (!res.contains(node)) - res.addAll(neighborhoodCache.get(node.id())); - } - - return res; - } - - /** * */ private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 722e570..578ad6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -53,7 +53,6 @@ import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSessionListener; @@ -367,18 +366,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheType cacheType, @Nullable CacheStore cfgStore) throws IgniteCheckedException { if (cc.getCacheMode() == REPLICATED) { - if (cc.getAffinity() instanceof FairAffinityFunction) - throw new IgniteCheckedException("REPLICATED cache can not be started with FairAffinityFunction" + - 
" [cacheName=" + U.maskName(cc.getName()) + ']'); - - if (cc.getAffinity() instanceof RendezvousAffinityFunction) { - RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cc.getAffinity(); - - if (aff.isExcludeNeighbors()) - throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " + - "RendezvousAffinityFunction cannot be set [cacheName=" + U.maskName(cc.getName()) + ']'); - } - if (cc.getNearConfiguration() != null && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) { U.warn(log, "Near cache cannot be used with REPLICATED cache, " + http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index ee1f4a1..f7d115f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1811,4 +1812,53 @@ public class GridCacheUtils { } }; } + + /** + * Builds neighborhood map for all nodes in snapshot. + * + * @param topSnapshot Topology snapshot. + * @return Neighbors map. + */ + public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { + Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); + + // Group by mac addresses. 
+ for (ClusterNode node : topSnapshot) { + String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS); + + Collection<ClusterNode> nodes = macMap.get(macs); + + if (nodes == null) + macMap.put(macs, nodes = new HashSet<>()); + + nodes.add(node); + } + + Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); + + for (Collection<ClusterNode> group : macMap.values()) + for (ClusterNode node : group) + neighbors.put(node.id(), group); + + return neighbors; + } + + /** + * Returns neighbors for all {@code nodes}. + * + * @param neighborhood Neighborhood cache. + * @param nodes Nodes. + * @return All neighbors for given nodes. + */ + public static Collection<ClusterNode> neighborsForNodes(Map<UUID, Collection<ClusterNode>> neighborhood, + Iterable<ClusterNode> nodes) { + Collection<ClusterNode> res = new HashSet<>(); + + for (ClusterNode node : nodes) { + if (!res.contains(node)) + res.addAll(neighborhood.get(node.id())); + } + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java new file mode 100644 index 0000000..878d7d1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.testframework.GridTestNode; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstractTest { + /** MAC prefix. */ + private static final String MAC_PREF = "MAC"; + + /** + * Returns affinity function. + * + * @return Affinity function. + */ + protected abstract AffinityFunction affinityFunction(); + + /** + * @throws Exception If failed. + */ + public void testNodeRemovedNoBackups() throws Exception { + checkNodeRemoved(0); + } + + /** + * @throws Exception If failed. + */ + public void testNodeRemovedOneBackup() throws Exception { + checkNodeRemoved(1); + } + + /** + * @throws Exception If failed. + */ + public void testNodeRemovedTwoBackups() throws Exception { + checkNodeRemoved(2); + } + + /** + * @throws Exception If failed. 
+ */ + public void testNodeRemovedThreeBackups() throws Exception { + checkNodeRemoved(3); + } + + /** + * @throws Exception If failed. + */ + public void testRandomReassignmentNoBackups() throws Exception { + checkRandomReassignment(0); + } + + /** + * @throws Exception If failed. + */ + public void testRandomReassignmentOneBackup() throws Exception { + checkRandomReassignment(1); + } + + /** + * @throws Exception If failed. + */ + public void testRandomReassignmentTwoBackups() throws Exception { + checkRandomReassignment(2); + } + + /** + * @throws Exception If failed. + */ + public void testRandomReassignmentThreeBackups() throws Exception { + checkRandomReassignment(3); + } + + /** + * @throws Exception If failed. + */ + protected void checkNodeRemoved(int backups) throws Exception { + checkNodeRemoved(backups, 1, 1); + } + + /** + * @throws Exception If failed. + */ + protected void checkNodeRemoved(int backups, int neighborsPerHost, int neighborsPeriod) throws Exception { + + AffinityFunction aff = affinityFunction(); + + int nodesCnt = 50; + + List<ClusterNode> nodes = new ArrayList<>(nodesCnt); + + List<List<ClusterNode>> prev = null; + + for (int i = 0; i < nodesCnt; i++) { + info("======================================"); + info("Assigning partitions: " + i); + info("======================================"); + + ClusterNode node = new GridTestNode(UUID.randomUUID()); + + if (neighborsPerHost > 0) + node.attribute(MAC_PREF + ((i / neighborsPeriod) % (nodesCnt / neighborsPerHost))); + + nodes.add(node); + + DiscoveryEvent discoEvt = new DiscoveryEvent(node, "", EventType.EVT_NODE_JOINED, node); + + GridAffinityFunctionContextImpl ctx = + new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + info("Assigned."); + + verifyAssignment(assignment, backups, aff.partitions(), nodes.size()); + + prev = assignment; + } + + 
info("======================================"); + info("Will remove nodes."); + info("======================================"); + + for (int i = 0; i < nodesCnt - 1; i++) { + info("======================================"); + info("Assigning partitions: " + i); + info("======================================"); + + ClusterNode rmv = nodes.remove(nodes.size() - 1); + + DiscoveryEvent discoEvt = new DiscoveryEvent(rmv, "", EventType.EVT_NODE_LEFT, rmv); + + List<List<ClusterNode>> assignment = aff.assignPartitions( + new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), + backups)); + + info("Assigned."); + + verifyAssignment(assignment, backups, aff.partitions(), nodes.size()); + + prev = assignment; + } + } + + /** + * @param backups Backups. + */ + protected void checkRandomReassignment(int backups) { + AffinityFunction aff = affinityFunction(); + + Random rnd = new Random(); + + int maxNodes = 50; + + List<ClusterNode> nodes = new ArrayList<>(maxNodes); + + List<List<ClusterNode>> prev = null; + + int state = 0; + + int i = 0; + + while (true) { + boolean add; + + if (nodes.size() < 2) { + // Returned back to one node? + if (state == 1) + return; + + add = true; + } + else if (nodes.size() == maxNodes) { + if (state == 0) + state = 1; + + add = false; + } + else { + // Nodes size in [2, maxNodes - 1]. + if (state == 0) + add = rnd.nextInt(3) != 0; // 66% to add, 33% to remove. + else + add = rnd.nextInt(3) == 0; // 33% to add, 66% to remove. 
+ } + + DiscoveryEvent discoEvt; + + if (add) { + ClusterNode addedNode = new GridTestNode(UUID.randomUUID()); + + nodes.add(addedNode); + + discoEvt = new DiscoveryEvent(addedNode, "", EventType.EVT_NODE_JOINED, addedNode); + } + else { + ClusterNode rmvNode = nodes.remove(rnd.nextInt(nodes.size())); + + discoEvt = new DiscoveryEvent(rmvNode, "", EventType.EVT_NODE_LEFT, rmvNode); + } + + info("======================================"); + info("Assigning partitions [iter=" + i + ", discoEvt=" + discoEvt + ", nodesSize=" + nodes.size() + ']'); + info("======================================"); + + List<List<ClusterNode>> assignment = aff.assignPartitions( + new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), + backups)); + + verifyAssignment(assignment, backups, aff.partitions(), nodes.size()); + + prev = assignment; + + i++; + } + } + + + /** + * @param assignment Assignment to verify. + */ + private void verifyAssignment(List<List<ClusterNode>> assignment, int keyBackups, int partsCnt, int topSize) { + Map<UUID, Collection<Integer>> mapping = new HashMap<>(); + + int ideal = Math.round((float)partsCnt / topSize * Math.min(keyBackups + 1, topSize)); + + for (int part = 0; part < assignment.size(); part++) { + for (ClusterNode node : assignment.get(part)) { + assert node != null; + + Collection<Integer> parts = mapping.get(node.id()); + + if (parts == null) { + parts = new HashSet<>(); + + mapping.put(node.id(), parts); + } + + assertTrue(parts.add(part)); + } + } + + int max = -1, min = Integer.MAX_VALUE; + + for (Collection<Integer> parts : mapping.values()) { + max = Math.max(max, parts.size()); + min = Math.min(min, parts.size()); + } + + log().warning("max=" + max + ", min=" + min + ", ideal=" + ideal + ", minDev=" + deviation(min, ideal) + "%, " + + "maxDev=" + deviation(max, ideal) + "%"); + } + + /** + * @param val Value. + * @param ideal Ideal. 
+ */ + private static int deviation(int val, int ideal) { + return Math.round(Math.abs(((float)val - ideal) / ideal * 100)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java new file mode 100644 index 0000000..24704ed --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.affinity; + +import java.util.Collection; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODE_CNT = 4; + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final String CACHE3 = "cache3"; + + /** */ + private static final String CACHE4 = "cache4"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + if (gridName.equals(getTestGridName(NODE_CNT - 1))) + cfg.setClientMode(true); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setBackups(1); + ccfg1.setName(CACHE1); + ccfg1.setAffinity(new RendezvousAffinityFunction()); + ccfg1.setNodeFilter(new 
TestNodesFilter()); + + CacheConfiguration ccfg2 = new CacheConfiguration(); + + ccfg2.setBackups(1); + ccfg2.setName(CACHE2); + ccfg2.setAffinity(new RendezvousAffinityFunction()); + + CacheConfiguration ccfg3 = new CacheConfiguration(); + + ccfg3.setBackups(1); + ccfg3.setName(CACHE3); + ccfg3.setAffinity(new FairAffinityFunction()); + ccfg3.setNodeFilter(new TestNodesFilter()); + + CacheConfiguration ccfg4 = new CacheConfiguration(); + + ccfg4.setCacheMode(REPLICATED); + ccfg4.setName(CACHE4); + ccfg4.setNodeFilter(new TestNodesFilter()); + + cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODE_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeNotInAffinity() throws Exception { + checkCache(CACHE1, 2); + + checkCache(CACHE2, 2); + + checkCache(CACHE3, 2); + + checkCache(CACHE4, 3); + + Ignite client = ignite(NODE_CNT - 1); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setBackups(0); + + ccfg.setNodeFilter(new TestNodesFilter()); + + IgniteCache<Integer, Integer> cache = client.createCache(ccfg); + + try { + checkCache(null, 1); + } + finally { + cache.destroy(); + } + + cache = client.createCache(ccfg, new NearCacheConfiguration()); + + try { + checkCache(null, 1); + } + finally { + cache.destroy(); + } + } + + /** + * @param cacheName Cache name. + * @param expNodes Expected number of nodes per partition. 
+ */ + private void checkCache(String cacheName, int expNodes) { + log.info("Test cache: " + cacheName); + + Ignite client = ignite(NODE_CNT - 1); + + assertTrue(client.configuration().isClientMode()); + + ClusterNode clientNode = client.cluster().localNode(); + + for (int i = 0; i < NODE_CNT; i++) { + Ignite ignite = ignite(i); + + Affinity<Integer> aff = ignite.affinity(cacheName); + + for (int part = 0; part < aff.partitions(); part++) { + Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part); + + assertEquals(expNodes, nodes.size()); + + assertFalse(nodes.contains(clientNode)); + } + } + } + + /** + * + */ + private static class TestNodesFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + + assertNotNull(attr); + + assertFalse(attr); + + return true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java new file mode 100644 index 0000000..3bf41c1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cache.affinity;

import java.util.Collection;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;

/**
 * Base tests of {@link AffinityFunction} implementations with a user provided backup filter.
 * Starts nodes in two groups distinguished by a user attribute and verifies that the
 * primary and backup copies of every partition never end up in the same group.
 */
public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridCommonAbstractTest {
    /** Ip finder shared by all grids started by this test. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Backup count. */
    private static final int BACKUPS = 1;

    /** Split attribute name. */
    private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";

    /** Split attribute value assigned to the next grid to be started. */
    private String splitAttrVal;

    /** Test backup filter: a backup is acceptable only if its split attribute differs from the primary's. */
    protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
        new IgniteBiPredicate<ClusterNode, ClusterNode>() {
            @Override public boolean apply(ClusterNode primary, ClusterNode backup) {
                assert primary != null : "primary is null";
                assert backup != null : "backup is null";

                return !F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME));
            }
        };

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setBackups(BACKUPS);
        cacheCfg.setAffinity(affinityFunction());
        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        cacheCfg.setRebalanceMode(SYNC);
        cacheCfg.setAtomicityMode(TRANSACTIONAL);

        TcpDiscoverySpi spi = new TcpDiscoverySpi();

        spi.setIpFinder(IP_FINDER);

        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setCacheConfiguration(cacheCfg);
        cfg.setDiscoverySpi(spi);

        // Tag the node with the current split group so the backup filter can separate groups.
        cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));

        return cfg;
    }

    /**
     * @return Affinity function for test.
     */
    protected abstract AffinityFunction affinityFunction();

    /**
     * Starts three pairs of nodes (one node from each split group per pair) and checks
     * the partition distribution after every pair joins the topology.
     *
     * @throws Exception If failed.
     */
    public void testPartitionDistribution() throws Exception {
        try {
            for (int i = 0; i < 3; i++) {
                splitAttrVal = "A";

                startGrid(2 * i);

                splitAttrVal = "B";

                startGrid(2 * i + 1);

                awaitPartitionMapExchange();

                checkPartitions();
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Verifies that for every partition exactly {@code BACKUPS + 1} copies are assigned and
     * that the primary and backup nodes carry different split attribute values.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings("ConstantConditions")
    private void checkPartitions() throws Exception {
        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity();

        int partCnt = aff.partitions();

        IgniteCache<Object, Object> cache = grid(0).cache(null);

        for (int i = 0; i < partCnt; i++) {
            Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);

            // One primary plus BACKUPS backup copies (was hard-coded to 2, which would
            // silently break if the BACKUPS constant changed).
            assertEquals(BACKUPS + 1, nodes.size());

            ClusterNode primary = F.first(nodes);
            ClusterNode backup = F.last(nodes);

            assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME)));
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cache.affinity;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;

/**
 * Partitioned affinity test for the exclude-neighbors flag: nodes are grouped into
 * "hosts" of three by mocking identical MAC addresses, and the test verifies that
 * all copies of a key are mapped to distinct hosts.
 */
@SuppressWarnings({"PointlessArithmeticExpression", "FieldCanBeLocal"})
public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends GridCommonAbstractTest {
    /** Number of backups. */
    private int backups = 2;

    /** Number of grids started so far; used to assign mock MAC groups. */
    private int gridInstanceNum;

    /** Ip finder. */
    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(gridName);

        // Override node attributes in discovery spi.
        TcpDiscoverySpi spi = new TcpDiscoverySpi() {
            @Override public void setNodeAttributes(Map<String, Object> attrs,
                IgniteProductVersion ver) {
                super.setNodeAttributes(attrs, ver);

                // Set unique mac addresses for every group of three nodes,
                // so every three consecutive grids look co-located on one host.
                String macAddrs = "MOCK_MACS_" + (gridInstanceNum / 3);

                attrs.put(IgniteNodeAttributes.ATTR_MACS, macAddrs);

                gridInstanceNum++;
            }
        };

        spi.setIpFinder(ipFinder);

        c.setDiscoverySpi(spi);

        CacheConfiguration cc = defaultCacheConfiguration();

        cc.setCacheMode(PARTITIONED);

        cc.setBackups(backups);

        cc.setAffinity(affinityFunction());

        cc.setRebalanceMode(NONE);

        c.setCacheConfiguration(cc);

        return c;
    }

    /**
     * @return Affinity function for test.
     */
    protected abstract AffinityFunction affinityFunction();

    /**
     * @param aff Affinity.
     * @param key Key.
     * @return Primary and backup nodes for the key.
     */
    private static Collection<? extends ClusterNode> nodes(Affinity<Object> aff, Object key) {
        return aff.mapKeyToPrimaryAndBackups(key);
    }

    /**
     * Starts nine grids (three mocked hosts) and checks from every node that the
     * key's copies are placed on nodes with pairwise distinct MAC groups.
     *
     * @throws Exception If failed.
     */
    public void testAffinityMultiNode() throws Exception {
        int grids = 9;

        startGrids(grids);

        try {
            Object key = 12345;

            int copies = backups + 1;

            for (int i = 0; i < grids; i++) {
                final Ignite g = grid(i);

                Affinity<Object> aff = g.affinity(null);

                List<TcpDiscoveryNode> top = new ArrayList<>();

                for (ClusterNode node : g.cluster().nodes())
                    top.add((TcpDiscoveryNode)node);

                Collections.sort(top);

                assertEquals(grids, top.size());

                int idx = 1;

                for (ClusterNode n : top) {
                    assertEquals(idx, n.order());

                    idx++;
                }

                Collection<? extends ClusterNode> affNodes = nodes(aff, key);

                // Append the closing bracket that was missing from the log message.
                info("Affinity picture for grid [i=" + i + ", aff=" + U.toShortString(affNodes) + ']');

                assertEquals(copies, affNodes.size());

                Set<String> macs = new HashSet<>();

                for (ClusterNode node : affNodes)
                    macs.add((String)node.attribute(IgniteNodeAttributes.ATTR_MACS));

                // Distinct MAC groups prove neighbors were excluded.
                assertEquals(copies, macs.size());
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * On a single-node topology only one copy of the key can exist, regardless of
     * the configured backup count.
     *
     * @throws Exception If failed.
     */
    public void testAffinitySingleNode() throws Exception {
        Ignite g = startGrid();

        try {
            Object key = 12345;

            Collection<? extends ClusterNode> affNodes = nodes(g.affinity(null), key);

            info("Affinity picture for grid: " + U.toShortString(affNodes));

            assertEquals(1, affNodes.size());
        }
        finally {
            stopAllGrids();
        }
    }
}