ascherbakoff commented on a change in pull request #8668:
URL: https://github.com/apache/ignite/pull/8668#discussion_r564338368



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * This class can be used as a {@link RendezvousAffinityFunction#affinityBackupFilter } to create
+ * cache templates in Spring that force each partition's primary and backup to be co-located on nodes with the same
+ * attribute value.
+ * <p>
+ *
+ * Co-locating partition copies can help group nodes into cells when a fixed baseline topology is used. If all
+ * copies of each partition are located inside a single cell, then when {@code backups + 1} nodes leave the cluster,
+ * data is lost only if all of the leaving nodes belong to the same cell. Without co-locating partition copies
+ * within a cell, data is likely to be lost if any {@code backups + 1} nodes leave the cluster.
+ *
+ * Note: A baseline topology change can lead to inter-cell partition migration, i.e. rebalancing can affect all copies
+ * of some partitions even if only one node is changed in the baseline topology.
+ * <p>
+ *
+ * This implementation will discard backups rather than place copies on nodes with different attribute values. This
+ * avoids trying to cram more data onto the remaining nodes when some have failed.
+ * <p>
+ * A node attribute to compare is provided on construction.
+ *
+ * Note: All cluster nodes, on startup, automatically register all the environment and system properties as node
+ * attributes.
+ *
+ * Note: Node attributes are persisted in the baseline topology at the time of a baseline topology change. If the
+ * co-location attribute of some node was updated but the baseline topology wasn't changed, the outdated attribute
+ * value can be used by the backup filter when this node leaves the cluster. To avoid this, the baseline topology
+ * should be updated after changing the co-location attribute.
+ * <p>
+ * This class is constructed with a node attribute name, and a candidate node will be rejected if previously selected
+ * nodes for a partition have a different value for the attribute than the candidate node.
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template with 1 backup, where the backup will be placed in the same cell
+ * as the primary. Note: This example requires that the environment variable "CELL" be set appropriately on
+ * each node via some means external to Ignite.
+ * <pre name="code" class="xml">
+ * &lt;property name="cacheConfiguration"&gt;
+ *     &lt;list&gt;
+ *         &lt;bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"&gt;
+ *             &lt;property name="name" value="JobcaseDefaultCacheConfig*"/&gt;
+ *             &lt;property name="cacheMode" value="PARTITIONED" /&gt;
+ *             &lt;property name="backups" value="1" /&gt;
+ *             &lt;property name="affinity"&gt;
+ *                 &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"&gt;
+ *                     &lt;property name="affinityBackupFilter"&gt;
+ *                         &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter"&gt;
+ *                             &lt;!-- Backups must go to the same CELL as primary --&gt;
+ *                             &lt;constructor-arg value="CELL" /&gt;
+ *                         &lt;/bean&gt;
+ *                     &lt;/property&gt;
+ *                 &lt;/bean&gt;
+ *             &lt;/property&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/list&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ */
+public class ClusterNodeAttributeColocatedBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    /** Attribute name. */
+    private final String attrName;
+
+    /**
+     * @param attrName The attribute name for the attribute to compare.
+     */
+    public ClusterNodeAttributeColocatedBackupFilter(String attrName) {
+        this.attrName = attrName;
+    }
+
+    /**
+     * Defines a predicate which returns {@code true} if a node is acceptable for a backup
+     * or {@code false} otherwise. An acceptable node is one whose attribute value
+     * exactly matches that of the previously selected nodes.  If an attribute does not
+     * exist on exactly one node of a pair, then the attribute does not match.  If the attribute
+     * does not exist on both nodes of a pair, then the attribute matches.
+     *
+     * @param candidate          A node that is a candidate for becoming a backup node for a partition.
+     * @param previouslySelected A list of primary/backup nodes already chosen for a partition.
+     *                           The primary is first.
+     */
+    @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
+        for (ClusterNode node : previouslySelected)
+            return Objects.equals(candidate.attribute(attrName), node.attribute(attrName));
+
+        return true;
+    }

Review comment:
       This can be simplified to:
   `return Objects.equals(candidate.attribute(attrName), previouslySelected.get(0).attribute(attrName));`
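A minimal, self-contained sketch of the suggested simplification. It uses a hypothetical `Node` stand-in for `ClusterNode` and `java.util.function.BiPredicate` instead of `IgniteBiPredicate`, so it runs without Ignite on the classpath. It assumes `previouslySelected` is never empty (the Javadoc says the primary is first); with an empty list `get(0)` would throw, whereas the original loop returns `true`.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiPredicate;

public class ColocatedBackupFilterSketch {
    /** Hypothetical stand-in for ClusterNode: exposes only attribute lookup. */
    static final class Node {
        private final Map<String, Object> attrs;

        Node(Map<String, Object> attrs) {
            this.attrs = attrs;
        }

        Object attribute(String name) {
            return attrs.get(name);
        }
    }

    /**
     * Accepts a candidate only if its attribute value equals the primary's.
     * Assumes previouslySelected is never empty (the primary is always at index 0).
     */
    static BiPredicate<Node, List<Node>> colocated(String attrName) {
        return (candidate, previouslySelected) ->
            Objects.equals(candidate.attribute(attrName), previouslySelected.get(0).attribute(attrName));
    }

    public static void main(String[] args) {
        BiPredicate<Node, List<Node>> filter = colocated("CELL");
        List<Node> selected = List.of(new Node(Map.of("CELL", "A")));

        System.out.println(filter.test(new Node(Map.of("CELL", "A")), selected)); // true
        System.out.println(filter.test(new Node(Map.of("CELL", "B")), selected)); // false
        System.out.println(filter.test(new Node(Map.of()), selected));            // false: attribute missing
    }
}
```

The two forms are equivalent for non-empty lists because the original loop returns on its first iteration, so it only ever compares against the first (primary) node.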

##########
File path: modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Partitioned affinity test.
+ */
+public class ClusterNodeAttributeColocatedBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunction() {
+        return affinityFunctionWithAffinityBackupFilter(SPLIT_ATTRIBUTE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter(String attrName) {
+        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false);
+
+        aff.setAffinityBackupFilter(new ClusterNodeAttributeColocatedBackupFilter(attrName));
+
+        return aff;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPartitionsWithAffinityBackupFilter() {
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+        int partCnt = aff.partitions();
+
+        int iter = grid(0).cluster().nodes().size() / 4;
+
+        IgniteCache<Object, Object> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < partCnt; i++) {
+            Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
+
+            Map<String, Integer> stat = getAttributeStatistic(nodes);
+
+            if (stat.get(FIRST_NODE_GROUP) > 0) {
+                assertEquals((Integer)Math.min(backups + 1, iter * 2), stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)0, stat.get("B"));
+                assertEquals((Integer)0, stat.get("C"));
+            }
+            else if (stat.get("B") > 0) {
+                assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)iter, stat.get("B"));
+                assertEquals((Integer)0, stat.get("C"));
+            }
+            else if (stat.get("C") > 0) {
+                assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)0, stat.get("B"));
+                assertEquals((Integer)iter, stat.get("C"));
+            }
+            else
+                fail("Unexpected partition assignment");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPartitions() throws Exception {
+        int iter = grid(0).cluster().nodes().size() / 2;
+
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+        Map<Integer, String> partToAttr = partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+        assertTrue(F.exist(partToAttr.values(), "A"::equals));
+        assertTrue(F.exist(partToAttr.values(), "B"::equals));
+        assertFalse(F.exist(partToAttr.values(), v -> !"A".equals(v) && !"B".equals(v)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
+        backups = 2;
+
+        super.testPartitionDistributionWithAffinityBackupFilter();
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterWithBaseline() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "B");
+            startGrid(2, "C");
+
+            startGrid(3, "A");
+            startGrid(4, "B");
+            startGrid(5, "C");
+
+            awaitPartitionMapExchange();
+
+            AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+            Map<Integer, String> partToAttr = partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+            // Check that we have the same distribution if some BLT nodes are offline.
+            stopGrid(3);
+            stopGrid(4);
+            stopGrid(5);
+
+            awaitPartitionMapExchange();
+
+            assertEquals(partToAttr, partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+            // Check that non-BLT nodes do not affect distribution.
+            startGrid(6, "D");
+
+            awaitPartitionMapExchange();
+
+            assertEquals(partToAttr, partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+            // Check that distribution is recalculated after BLT change.
+            long topVer = grid(0).cluster().topologyVersion();
+
+            grid(0).cluster().setBaselineTopology(topVer);
+
+            // Wait for rebalance and assignment change to ideal assignment.
+            assertTrue(GridTestUtils.waitForCondition(() -> F.eq(grid(0).context().discovery().topologyVersionEx(),
+                    new AffinityTopologyVersion(topVer, 2)), 5_000L));
+
+            assertNotEquals(partToAttr, partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterWithNullAttribute() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "A");
+            startGrid(2, (String)null);
+            startGrid(3, (String)null);
+
+            awaitPartitionMapExchange();
+
+            AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+            Map<Integer, String> partToAttr = partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+            assertTrue(F.exist(partToAttr.values(), Objects::isNull));
+            assertTrue(F.exist(partToAttr.values(), "A"::equals));
+            assertFalse(F.exist(partToAttr.values(), v -> !(v == null) && !"A".equals(v)));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Determine split attribute value for each partition and check that this value is the same for all nodes for
+     * this partition.
+     */
+    private Map<Integer, String> partToAttr(IgniteCache<Object, Object> cache, int partCnt) {

Review comment:
       The code style does not allow abbreviations in method names.
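For illustration only, a self-contained sketch of what the helper could look like with unabbreviated names. The name `partitionToAttributeValue` and its input shape (a map from partition ID to the attribute values of that partition's owners) are hypothetical, since the real helper's body is not shown in this diff. It follows the helper's Javadoc: determine the attribute value for each partition and check that it is the same on all owners.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PartitionAttributeSketch {
    /**
     * Hypothetical helper: maps each partition to the attribute value shared by all of its owners.
     *
     * @param ownerAttributeValues Attribute values of the primary and backup owners, keyed by partition ID.
     * @return Attribute value per partition (null if the owners lack the attribute).
     * @throws IllegalStateException If the owners of some partition have different attribute values.
     */
    static Map<Integer, String> partitionToAttributeValue(Map<Integer, List<String>> ownerAttributeValues) {
        Map<Integer, String> result = new HashMap<>();

        for (Map.Entry<Integer, List<String>> entry : ownerAttributeValues.entrySet()) {
            List<String> values = entry.getValue();
            String first = values.get(0);

            // Every owner of the partition must carry the same value as the first (primary) owner.
            for (String value : values) {
                if (!Objects.equals(first, value))
                    throw new IllegalStateException("Partition " + entry.getKey() + " spans values " + values);
            }

            result.put(entry.getKey(), first);
        }

        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> owners = new HashMap<>();
        owners.put(0, List.of("A", "A"));

        // List.of rejects nulls, so owners without the attribute are modelled with an ArrayList.
        List<String> noAttr = new ArrayList<>();
        noAttr.add(null);
        noAttr.add(null);
        owners.put(1, noAttr);

        System.out.println(partitionToAttributeValue(owners)); // {0=A, 1=null}
    }
}
```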

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
##########
@@ -0,0 +1,114 @@
+    /**
+     * Defines a predicate which returns {@code true} if a node is acceptable for a backup
+     * or {@code false} otherwise. An acceptable node is one whose attribute value
+     * exactly matches that of the previously selected nodes.  If an attribute does not

Review comment:
       I strongly suggest denying null attributes in this filter and validating that the attribute value exists; otherwise, data can be lost through loss of partition owners if a node is restarted without the attribute.
   
   Checking that the value is not null and throwing an error to trigger the failure handler if it is absent would be enough.
   
   The Javadoc should be updated to state that the attribute is required.
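A minimal sketch of the suggested check, assuming a plain `java.util.function.BiPredicate` and a hypothetical `Node` stand-in for `ClusterNode`. `IllegalStateException` is an illustrative choice; which error type actually triggers Ignite's failure handler is not established here.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class StrictColocatedFilterSketch {
    /** Hypothetical stand-in for ClusterNode. */
    static final class Node {
        final String id;
        private final Map<String, Object> attrs;

        Node(String id, Map<String, Object> attrs) {
            this.id = id;
            this.attrs = attrs;
        }

        Object attribute(String name) {
            return attrs.get(name);
        }
    }

    /** Returns the attribute value, failing fast instead of treating a missing value as a match. */
    static Object requireAttribute(Node node, String attrName) {
        Object val = node.attribute(attrName);

        if (val == null)
            throw new IllegalStateException("Node " + node.id + " has no required attribute: " + attrName);

        return val;
    }

    /** Co-location filter that rejects missing (null) attributes instead of matching them. */
    static BiPredicate<Node, List<Node>> strictColocated(String attrName) {
        return (candidate, previouslySelected) ->
            requireAttribute(candidate, attrName).equals(requireAttribute(previouslySelected.get(0), attrName));
    }

    public static void main(String[] args) {
        BiPredicate<Node, List<Node>> filter = strictColocated("CELL");
        List<Node> selected = List.of(new Node("n0", Map.of("CELL", "A")));

        System.out.println(filter.test(new Node("n1", Map.of("CELL", "A")), selected)); // true

        try {
            filter.test(new Node("n2", Map.of()), selected);
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Node n2 has no required attribute: CELL
        }
    }
}
```

Unlike `Objects.equals`, which treats two missing attributes as equal, this version never lets a node without the attribute join a cell silently.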
   
   Even better would be to replace the attribute filter closure with something like the following, for proper attribute validation:
   ```
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import org.apache.ignite.cluster.ClusterNode;
   import org.apache.ignite.internal.util.typedef.internal.A;
   import org.apache.ignite.lang.IgniteBiPredicate;

   /**
    * A basic class for node attribute aware backup filter implementations.
    * Listed attributes are required to be configured when a new cluster node joins or on first activation.
    */
   public abstract class NodeAttributeAwareBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
       /** Attribute names. */
       protected final Set<String> attributeNames;
   
       /**
        * @param attributeNames The list of attribute names for the set of attributes to compare. Must be at least one.
        */
       protected NodeAttributeAwareBackupFilter(String... attributeNames) {
           A.ensure(attributeNames.length > 0, "attributeNames.length > 0");
   
           this.attributeNames = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(attributeNames)));
       }
   
       /**
        * @return The set of used cluster node attributes for this filter.
        */
       public Set<String> attributeNames() {
           return attributeNames;
       }
   }
   ```
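A hypothetical concrete filter built on the suggested base class, sketched without Ignite dependencies: `BiPredicate` replaces `IgniteBiPredicate`, a `Map<String, Object>` stands in for a node's attributes, and an `IllegalArgumentException` replaces `A.ensure`. The subclass name is illustrative. It accepts a candidate only if every listed attribute is present and equal to the primary's value, and fails fast on a missing attribute.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

public class AttributeAwareFilterSketch {
    /** Mirrors the suggested base class; node attributes are modelled as plain maps. */
    abstract static class NodeAttributeAwareBackupFilter
        implements BiPredicate<Map<String, Object>, List<Map<String, Object>>> {
        protected final Set<String> attributeNames;

        protected NodeAttributeAwareBackupFilter(String... attributeNames) {
            if (attributeNames.length == 0)
                throw new IllegalArgumentException("attributeNames.length > 0");

            this.attributeNames = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(attributeNames)));
        }

        public Set<String> attributeNames() {
            return attributeNames;
        }
    }

    /** Hypothetical subclass: all listed attributes must be non-null and equal to the primary's. */
    static final class MultiAttributeColocatedFilter extends NodeAttributeAwareBackupFilter {
        MultiAttributeColocatedFilter(String... attributeNames) {
            super(attributeNames);
        }

        @Override public boolean test(Map<String, Object> candidate, List<Map<String, Object>> previouslySelected) {
            Map<String, Object> primary = previouslySelected.get(0);

            for (String name : attributeNames) {
                Object val = candidate.get(name);

                // A missing attribute is treated as a configuration error, not as a match.
                if (val == null || primary.get(name) == null)
                    throw new IllegalStateException("Missing required attribute: " + name);

                if (!val.equals(primary.get(name)))
                    return false;
            }

            return true;
        }
    }

    public static void main(String[] args) {
        MultiAttributeColocatedFilter filter = new MultiAttributeColocatedFilter("DC", "CELL");
        Map<String, Object> primary = Map.of("DC", "dc1", "CELL", "A");
        List<Map<String, Object>> selected = List.of(primary);

        System.out.println(filter.test(Map.of("DC", "dc1", "CELL", "A"), selected)); // true
        System.out.println(filter.test(Map.of("DC", "dc1", "CELL", "B"), selected)); // false
    }
}
```

Keeping the attribute names in the base class is what would let a join or activation check ask any such filter which attributes it requires, as the comment suggests.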
   
   Ideally, the attribute values used by the backup filter should be persisted in the node's metastore to avoid accidental changes (without a direct user request), or perhaps in the cache configuration after the cache is created.
   
   These improvements can be done in separate tickets.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

