ignite-4154 affinity

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87b09dba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87b09dba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87b09dba

Branch: refs/heads/ignite-4154-3
Commit: 87b09dba85b5ed7996ba93a10ef4f28eb398c4a8
Parents: f74c9f4
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Nov 2 11:44:24 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Nov 2 14:00:33 2016 +0300

----------------------------------------------------------------------
 .../processors/affinity/AffinityAssignment.java |  88 ++++++++++
 .../affinity/GridAffinityAssignment.java        | 120 ++++++-------
 .../affinity/GridAffinityAssignmentCache.java   |  83 +++++++--
 .../affinity/GridAffinityProcessor.java         |   8 +-
 .../processors/affinity/GridAffinityUtils.java  |   8 +-
 .../affinity/HistoryAffinityAssignment.java     | 169 +++++++++++++++++++
 .../cache/CacheAffinitySharedManager.java       |  38 ++++-
 .../cache/GridCacheAffinityManager.java         |   4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   4 +-
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 10 files changed, 433 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
new file mode 100644
index 0000000..06207d3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Cached affinity calculations.
+ */
+public interface AffinityAssignment {
+    /**
+     * @return {@code True} if related discovery event did not not cause 
affinity assignment change and
+     *    this assignment is just reference to the previous one.
+     */
+    public boolean clientEventChange();
+
+    /**
+     * @return Affinity assignment computed by affinity function.
+     */
+    public List<List<ClusterNode>> idealAssignment();
+
+    /**
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> assignment();
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion();
+
+    /**
+     * Get affinity nodes for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes.
+     */
+    public List<ClusterNode> get(int part);
+
+    /**
+     * Get affinity node IDs for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes IDs.
+     */
+    public HashSet<UUID> getIds(int part);
+
+    /**
+     * @return Nodes having primary partitions assignments.
+     */
+    public Set<ClusterNode> primaryPartitionNodes();
+
+    /**
+     * Get primary partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get primary partitions for.
+     * @return Primary partitions for specified node ID.
+     */
+    public Set<Integer> primaryPartitions(UUID nodeId);
+
+    /**
+     * Get backup partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get backup partitions for.
+     * @return Backup partitions for specified node ID.
+     */
+    public Set<Integer> backupPartitions(UUID nodeId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 568e4e8..2940d92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,12 +27,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Cached affinity calculations.
  */
-public class GridAffinityAssignment implements Serializable {
+public class GridAffinityAssignment implements AffinityAssignment, 
Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -86,7 +88,7 @@ public class GridAffinityAssignment implements Serializable {
 
         this.topVer = topVer;
         this.assignment = assignment;
-        this.idealAssignment = idealAssignment;
+        this.idealAssignment = idealAssignment.equals(assignment) ? assignment 
: idealAssignment;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -139,96 +141,76 @@ public class GridAffinityAssignment implements 
Serializable {
         return topVer;
     }
 
-    /**
-     * Get affinity nodes for partition.
-     *
-     * @param part Partition.
-     * @return Affinity nodes.
-     */
-    public List<ClusterNode> get(int part) {
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> get(int part) {
         assert part >= 0 && part < assignment.size() : "Affinity partition is 
out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
+                " [part=" + part + ", partitions=" + assignment.size() + ']';
 
         return assignment.get(part);
     }
 
-    /**
-     * Get affinity node IDs for partition.
-     *
-     * @param part Partition.
-     * @return Affinity nodes IDs.
-     */
-    public HashSet<UUID> getIds(int part) {
+    /** {@inheritDoc} */
+    @Override public HashSet<UUID> getIds(int part) {
         assert part >= 0 && part < assignment.size() : "Affinity partition is 
out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
+                " [part=" + part + ", partitions=" + assignment.size() + ']';
 
-        List<HashSet<UUID>> assignmentIds0 = assignmentIds;
+        List<ClusterNode> nodes = assignment.get(part);
 
-        if (assignmentIds0 == null) {
-            assignmentIds0 = new ArrayList<>();
+        HashSet<UUID> ids = U.newHashSet(nodes.size());
 
-            for (List<ClusterNode> assignmentPart : assignment) {
-                HashSet<UUID> partIds = new HashSet<>();
+        for (int i = 0; i < nodes.size(); i++)
+            ids.add(nodes.get(i).id());
 
-                for (ClusterNode node : assignmentPart)
-                    partIds.add(node.id());
+        return ids;
+    }
 
-                assignmentIds0.add(partIds);
-            }
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
 
-            assignmentIds = assignmentIds0;
+            if (!F.isEmpty(nodes))
+                res.add(nodes.get(0));
         }
 
-        return assignmentIds0.get(part);
+        return res;
     }
 
-    /**
-     * @return Nodes having primary partitions assignments.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    public Set<ClusterNode> primaryPartitionNodes() {
-        Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes;
-
-        if (primaryPartsNodes0 == null) {
-            int parts = assignment.size();
-
-            primaryPartsNodes0 = new HashSet<>();
-
-            for (int p = 0; p < parts; p++) {
-                List<ClusterNode> nodes = assignment.get(p);
+    /** {@inheritDoc} */
+    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
 
-                if (nodes.size() > 0)
-                    primaryPartsNodes0.add(nodes.get(0));
-            }
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
 
-            primaryPartsNodes = primaryPartsNodes0;
+            if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+                res.add(p);
         }
 
-        return primaryPartsNodes0;
+        return res;
     }
 
-    /**
-     * Get primary partitions for specified node ID.
-     *
-     * @param nodeId Node ID to get primary partitions for.
-     * @return Primary partitions for specified node ID.
-     */
-    public Set<Integer> primaryPartitions(UUID nodeId) {
-        Set<Integer> set = primary.get(nodeId);
+    /** {@inheritDoc} */
+    @Override public Set<Integer> backupPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
 
-        return set == null ? Collections.<Integer>emptySet() : set;
-    }
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
 
-    /**
-     * Get backup partitions for specified node ID.
-     *
-     * @param nodeId Node ID to get backup partitions for.
-     * @return Backup partitions for specified node ID.
-     */
-    public Set<Integer> backupPartitions(UUID nodeId) {
-        Set<Integer> set = backup.get(nodeId);
+            for (int i = 1; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+
+                if (node.id().equals(nodeId)) {
+                    res.add(p);
+
+                    break;
+                }
+            }
+        }
 
-        return set == null ? Collections.<Integer>emptySet() : set;
+        return res;
     }
 
     /**
@@ -274,10 +256,10 @@ public class GridAffinityAssignment implements 
Serializable {
         if (o == this)
             return true;
 
-        if (o == null || getClass() != o.getClass())
+        if (o == null || !(o instanceof AffinityAssignment))
             return false;
 
-        return topVer.equals(((GridAffinityAssignment)o).topVer);
+        return topVer.equals(((AffinityAssignment)o).topologyVersion());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a81b34d..9166b31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache {
     private final int partsCnt;
 
     /** Affinity calculation results cache: topology version => partition => 
nodes. */
-    private final ConcurrentNavigableMap<AffinityTopologyVersion, 
GridAffinityAssignment> affCache;
+    private final ConcurrentNavigableMap<AffinityTopologyVersion, 
HistoryAffinityAssignment> affCache;
 
     /** */
     private List<List<ClusterNode>> idealAssignment;
@@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache {
     /** Full history size. */
     private final AtomicInteger fullHistSize = new AtomicInteger();
 
+    /** */
+    private final SimilarAffinityKey similarAffKey;
+
     /**
      * Constructs affinity cached calculations.
      *
@@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache {
     {
         assert ctx != null;
         assert aff != null;
+        assert nodeFilter != null;
 
         this.ctx = ctx;
         this.aff = aff;
@@ -142,6 +146,12 @@ public class GridAffinityAssignmentCache {
         partsCnt = aff.partitions();
         affCache = new ConcurrentSkipListMap<>();
         head = new AtomicReference<>(new 
GridAffinityAssignment(AffinityTopologyVersion.NONE));
+
+        similarAffKey = new SimilarAffinityKey(aff.getClass(), 
nodeFilter.getClass(), backups, partsCnt);
+    }
+
+    public Object similarAffinityKey() {
+        return similarAffKey;
     }
 
     /**
@@ -170,7 +180,7 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, 
affAssignment, idealAssignment);
 
-        affCache.put(topVer, assignment);
+        affCache.put(topVer, new HistoryAffinityAssignment(assignment));
         head.set(assignment);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : 
readyFuts.entrySet()) {
@@ -300,7 +310,7 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment assignmentCpy = new 
GridAffinityAssignment(topVer, aff);
 
-        affCache.put(topVer, assignmentCpy);
+        affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
         head.set(assignmentCpy);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : 
readyFuts.entrySet()) {
@@ -328,7 +338,7 @@ public class GridAffinityAssignmentCache {
      * @return Affinity assignment.
      */
     public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) 
{
-        GridAffinityAssignment aff = cachedAffinity(topVer);
+        AffinityAssignment aff = cachedAffinity(topVer);
 
         return aff.assignment();
     }
@@ -427,7 +437,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Cached affinity.
      */
-    public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion 
topVer) {
+    public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
         if (topVer.equals(AffinityTopologyVersion.NONE))
             topVer = lastVersion();
         else
@@ -435,7 +445,7 @@ public class GridAffinityAssignmentCache {
 
         assert topVer.topologyVersion() >= 0 : topVer;
 
-        GridAffinityAssignment cache = head.get();
+        AffinityAssignment cache = head.get();
 
         if (!cache.topologyVersion().equals(topVer)) {
             cache = affCache.get(topVer);
@@ -463,7 +473,7 @@ public class GridAffinityAssignmentCache {
      * @return {@code True} if primary changed or required affinity version 
not found in history.
      */
     public boolean primaryChanged(int part, AffinityTopologyVersion startVer, 
AffinityTopologyVersion endVer) {
-        GridAffinityAssignment aff = affCache.get(startVer);
+        AffinityAssignment aff = affCache.get(startVer);
 
         if (aff == null)
             return false;
@@ -475,7 +485,7 @@ public class GridAffinityAssignmentCache {
 
         ClusterNode primary = nodes.get(0);
 
-        for (GridAffinityAssignment assignment : affCache.tailMap(startVer, 
false).values()) {
+        for (AffinityAssignment assignment : affCache.tailMap(startVer, 
false).values()) {
             List<ClusterNode> nodes0 = assignment.assignment().get(part);
 
             if (nodes0.isEmpty())
@@ -549,10 +559,10 @@ public class GridAffinityAssignmentCache {
         }
 
         if (rmvCnt > 0) {
-            Iterator<GridAffinityAssignment> it = affCache.values().iterator();
+            Iterator<HistoryAffinityAssignment> it = 
affCache.values().iterator();
 
             while (it.hasNext() && rmvCnt > 0) {
-                GridAffinityAssignment aff0 = it.next();
+                AffinityAssignment aff0 = it.next();
 
                 it.remove();
 
@@ -602,4 +612,57 @@ public class GridAffinityAssignmentCache {
             return S.toString(AffinityReadyFuture.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private static class SimilarAffinityKey {
+        /** */
+        private final int backups;
+
+        /** */
+        private final Class<?> affFuncCls;
+
+        /** */
+        private final Class<?> filterCls;
+
+        /** */
+        private final int partsCnt;
+
+        /** */
+        private final int hash;
+
+        public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int 
backups, int partsCnt) {
+            this.backups = backups;
+            this.affFuncCls = affFuncCls;
+            this.filterCls = filterCls;
+            this.partsCnt = partsCnt;
+
+            int hash = backups;
+            hash = 31 * hash + affFuncCls.hashCode();
+            hash = 31 * hash + filterCls.hashCode();
+            hash= 31 * hash + partsCnt;
+
+            this.hash = hash;
+        }
+
+        @Override public int hashCode() {
+            return hash;
+        }
+
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+            return backups == key.backups &&
+                affFuncCls == key.affFuncCls &&
+                filterCls == key.filterCls &&
+                partsCnt == key.partsCnt;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 1726d02..7c22ef5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -385,10 +385,16 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             }
 
             try {
+                AffinityAssignment assign0 = 
cctx.affinity().assignment(topVer);
+
+                GridAffinityAssignment assign = assign0 instanceof 
GridAffinityAssignment ?
+                        (GridAffinityAssignment)assign0 :
+                        new GridAffinityAssignment(topVer, 
assign0.assignment(), assign0.idealAssignment());
+
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),
                     cctx.config().getAffinityMapper(),
-                    new GridAffinityAssignment(topVer, 
cctx.affinity().assignment(topVer)),
+                    assign,
                     cctx.cacheObjectContext());
 
                 IgniteInternalFuture<AffinityInfo> old = 
affMap.putIfAbsent(key, new GridFinishedFuture<>(info));

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index c24dd2d..abd5292 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -180,10 +180,16 @@ class GridAffinityUtils {
 
             cctx.affinity().affinityReadyFuture(topVer).get();
 
+            AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+            GridAffinityAssignment assign = assign0 instanceof 
GridAffinityAssignment ?
+                (GridAffinityAssignment)assign0 :
+                new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment());
+
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),
                 affinityMessage(ctx, cctx.config().getAffinityMapper()),
-                new GridAffinityAssignment(topVer, 
cctx.affinity().assignment(topVer)));
+                assign);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
new file mode 100644
index 0000000..e502dd5
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class HistoryAffinityAssignment implements AffinityAssignment {
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final List<List<ClusterNode>> assignment;
+
+    /** */
+    private final List<List<ClusterNode>> idealAssignment;
+
+    /** */
+    private final boolean clientEvtChange;
+
+    /**
+     * @param assign Assignment.
+     */
+    public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+        this.topVer = assign.topologyVersion();
+        this.assignment = assign.assignment();
+        this.idealAssignment = assign.idealAssignment();
+        this.clientEvtChange = assign.clientEventChange();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientEventChange() {
+        return clientEvtChange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> idealAssignment() {
+        return idealAssignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> get(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is 
out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        return assignment.get(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HashSet<UUID> getIds(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is 
out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        List<ClusterNode> nodes = assignment.get(part);
+
+        HashSet<UUID> ids = U.newHashSet(nodes.size());
+
+        for (int i = 0; i < nodes.size(); i++)
+            ids.add(nodes.get(i).id());
+
+        return ids;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes))
+                res.add(nodes.get(0));
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+                res.add(p);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> backupPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            for (int i = 1; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+
+                if (node.id().equals(nodeId)) {
+                    res.add(p);
+
+                    break;
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return topVer.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || !(o instanceof AffinityAssignment))
+            return false;
+
+        return topVer.equals(((AffinityAssignment)o).topologyVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HistoryAffinityAssignment.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1aedf4e..88f1f97 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         assert assignment != null;
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 List<List<ClusterNode>> idealAssignment = 
aff.idealAssignment();
@@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 else
                     newAssignment = idealAssignment;
 
-                aff.initialize(topVer, newAssignment);
+                aff.initialize(topVer, cachedAssignment(aff, newAssignment, 
affCache));
             }
         });
     }
@@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         final Map<Integer, IgniteUuid> deploymentIds = 
msg.cacheDeploymentIds();
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 AffinityTopologyVersion affTopVer = aff.lastVersion();
@@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         assignment.set(part, nodes);
                     }
 
-                    aff.initialize(topVer, assignment);
+                    aff.initialize(topVer, cachedAssignment(aff, assignment, 
affCache));
                 }
                 else
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), 
topVer);
@@ -1206,6 +1210,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         if (!crd) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
@@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 boolean latePrimary = cacheCtx.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, 
cacheCtx.affinity().affinityCache(), null, latePrimary);
+                initAffinityOnNodeJoin(fut, 
cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
             }
 
             return null;
@@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(fut, cache.affinity(), 
waitRebalanceInfo, latePrimary);
+                    initAffinityOnNodeJoin(fut, cache.affinity(), 
waitRebalanceInfo, latePrimary, affCache);
                 }
             });
 
@@ -1245,7 +1251,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
-        boolean latePrimary)
+        boolean latePrimary,
+        Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
         assert lateAffAssign;
@@ -1292,7 +1299,26 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), newAssignment);
+        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, 
newAssignment, affCache));
+    }
+
+    /**
+     * @param aff
+     * @param assign
+     * @param affCache
+     * @return
+     */
+    private List<List<ClusterNode>> 
cachedAssignment(GridAffinityAssignmentCache aff,
+        List<List<ClusterNode>> assign,
+        Map<Object, List<List<ClusterNode>>> affCache) {
+        List<List<ClusterNode>> assign0 = 
affCache.get(aff.similarAffinityKey());
+
+        if (assign0 != null && assign0.equals(assign))
+            assign = assign0;
+        else
+            affCache.put(aff.similarAffinityKey(), assign);
+
+        return assign;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 71ae5c9..6e5a28e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends 
GridCacheManagerAdapter {
      * @param topVer Topology version.
      * @return Affinity assignment.
      */
-    public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) {
+    public AffinityAssignment assignment(AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
             topVer = LOC_CACHE_TOP_VER;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 50f7f0f..871a084 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -859,7 +859,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion 
topVer) {
-        GridAffinityAssignment affAssignment = 
cctx.affinity().assignment(topVer);
+        AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 09aec81..d6865c1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -600,7 +600,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                     log.debug("Affinity is ready for topology version, will 
send response [topVer=" + topVer +
                         ", node=" + node + ']');
 
-                GridAffinityAssignment assignment = 
cctx.affinity().assignment(topVer);
+                AffinityAssignment assignment = 
cctx.affinity().assignment(topVer);
 
                 boolean newAffMode = 
node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
 

Reply via email to