SammyVimes commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r957445993


##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -235,4 +288,209 @@ public static boolean recoverable(Throwable t) {
         // As long as we don't have a general failure handler, we assume that 
all errors are recoverable.
         return true;
     }
+
+    /**
+     * Starts the process of removing peer from raft group if that peer has 
in-memory storage or if its
+     * storage was cleared.
+     *
+     * @param partId Partition's raft group id.
+     * @param clusterNode Cluster node to be removed from peers.
+     * @param metaStorageMgr MetaStorage manager.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> startPeerRemoval(String partId, 
ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+        ByteArray key = switchReduceKey(partId);
+
+        return metaStorageMgr.get(key)
+                .thenCompose(retrievedAssignmentsSwitchReduce -> {
+                    byte[] prevValue = 
retrievedAssignmentsSwitchReduce.value();
+
+                    boolean prevValueEmpty = true;
+                    List<ClusterNode> calculatedAssignmentsSwitchReduce;
+
+                    if (prevValue != null) {
+                        calculatedAssignmentsSwitchReduce = 
(List<ClusterNode>) ByteUtils.fromBytes(prevValue);
+                        prevValueEmpty = false;
+                    } else {
+                        calculatedAssignmentsSwitchReduce = new ArrayList<>();
+                    }
+
+                    calculatedAssignmentsSwitchReduce.add(clusterNode);
+
+                    byte[] newValue = 
ByteUtils.toBytes(calculatedAssignmentsSwitchReduce);
+
+                    if (prevValueEmpty) {
+                        return metaStorageMgr.invoke(
+                                Conditions.notExists(key),
+                                Operations.put(key, newValue),
+                                Operations.noop()
+                        );
+                    } else {
+                        return metaStorageMgr.invoke(
+                                
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
+                                Operations.put(key, newValue),
+                                Operations.noop()
+                        );
+                    }
+                }).thenCompose(res -> {
+                    if (!res) {
+                        return startPeerRemoval(partId, clusterNode, 
metaStorageMgr);
+                    }
+
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
+    /**
+     * Handles assignments switch reduce changed.
+     *
+     * @param metaStorageMgr MetaStorage manager.
+     * @param baselineNodes Baseline nodes.
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @param partNum Number of the partition.
+     * @param partId Partition's raft group id..
+     * @param event Assignments switch reduce change event.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> 
baselineNodes,
+            int partitions, int replicas, int partNum, String partId, 
WatchEvent event) {
+        Entry entry = event.entryEvent().newEntry();
+        byte[] eventData = entry.value();
+
+        List<ClusterNode> assignments = 
AffinityUtils.calculateAssignments(baselineNodes, partitions, 
replicas).get(partNum);
+        List<ClusterNode> switchReduce = (List<ClusterNode>) 
ByteUtils.fromBytes(eventData);
+
+        ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+
+        List<ClusterNode> pendingAssignments = subtract(assignments, 
switchReduce);
+
+        byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+
+        if (switchReduce.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        ByteArray changeTriggerKey = partChangeTriggerKey(partId);
+        byte[] rev = 
ByteUtils.longToBytes(event.entryEvent().newEntry().revision());
+
+        // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < 
revision) && (notExists(pendingKey) && notExists(stableKey)) {
+        //     put(pendingKey, pending)
+        //     put(stableKey, assignments)
+        //     put(changeTriggerKey, revision)
+        // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey) 
< revision) && (notExists(pendingKey))) {
+        //     put(pendingKey, pending)
+        //     put(changeTriggerKey, revision)
+        // }
+
+        If iif = If.iif(

Review Comment:
   If we could do metastorage operations as code, this would be just an `if`. I 
don't know how else I should call it. Any other name won't grasp the meaning of 
it.



##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -235,4 +288,209 @@ public static boolean recoverable(Throwable t) {
         // As long as we don't have a general failure handler, we assume that 
all errors are recoverable.
         return true;
     }
+
+    /**
+     * Starts the process of removing peer from raft group if that peer has 
in-memory storage or if its
+     * storage was cleared.
+     *
+     * @param partId Partition's raft group id.
+     * @param clusterNode Cluster node to be removed from peers.
+     * @param metaStorageMgr MetaStorage manager.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> startPeerRemoval(String partId, 
ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+        ByteArray key = switchReduceKey(partId);
+
+        return metaStorageMgr.get(key)
+                .thenCompose(retrievedAssignmentsSwitchReduce -> {
+                    byte[] prevValue = 
retrievedAssignmentsSwitchReduce.value();
+
+                    boolean prevValueEmpty = true;
+                    List<ClusterNode> calculatedAssignmentsSwitchReduce;
+
+                    if (prevValue != null) {
+                        calculatedAssignmentsSwitchReduce = 
(List<ClusterNode>) ByteUtils.fromBytes(prevValue);
+                        prevValueEmpty = false;
+                    } else {
+                        calculatedAssignmentsSwitchReduce = new ArrayList<>();
+                    }
+
+                    calculatedAssignmentsSwitchReduce.add(clusterNode);
+
+                    byte[] newValue = 
ByteUtils.toBytes(calculatedAssignmentsSwitchReduce);
+
+                    if (prevValueEmpty) {
+                        return metaStorageMgr.invoke(
+                                Conditions.notExists(key),
+                                Operations.put(key, newValue),
+                                Operations.noop()
+                        );
+                    } else {
+                        return metaStorageMgr.invoke(
+                                
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
+                                Operations.put(key, newValue),
+                                Operations.noop()
+                        );
+                    }
+                }).thenCompose(res -> {
+                    if (!res) {
+                        return startPeerRemoval(partId, clusterNode, 
metaStorageMgr);
+                    }
+
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
+    /**
+     * Handles assignments switch reduce changed.
+     *
+     * @param metaStorageMgr MetaStorage manager.
+     * @param baselineNodes Baseline nodes.
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @param partNum Number of the partition.
+     * @param partId Partition's raft group id..
+     * @param event Assignments switch reduce change event.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> 
baselineNodes,
+            int partitions, int replicas, int partNum, String partId, 
WatchEvent event) {
+        Entry entry = event.entryEvent().newEntry();
+        byte[] eventData = entry.value();
+
+        List<ClusterNode> assignments = 
AffinityUtils.calculateAssignments(baselineNodes, partitions, 
replicas).get(partNum);
+        List<ClusterNode> switchReduce = (List<ClusterNode>) 
ByteUtils.fromBytes(eventData);
+
+        ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+
+        List<ClusterNode> pendingAssignments = subtract(assignments, 
switchReduce);
+
+        byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+
+        if (switchReduce.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        ByteArray changeTriggerKey = partChangeTriggerKey(partId);
+        byte[] rev = 
ByteUtils.longToBytes(event.entryEvent().newEntry().revision());
+
+        // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < 
revision) && (notExists(pendingKey) && notExists(stableKey)) {
+        //     put(pendingKey, pending)
+        //     put(stableKey, assignments)
+        //     put(changeTriggerKey, revision)
+        // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey) 
< revision) && (notExists(pendingKey))) {
+        //     put(pendingKey, pending)
+        //     put(changeTriggerKey, revision)
+        // }
+
+        If iif = If.iif(
+                        and(
+                                or(notExists(changeTriggerKey), 
value(changeTriggerKey).lt(rev)),
+                                and(notExists(pendingKey), 
(notExists(stablePartAssignmentsKey(partId))))
+                        ),
+                        Operations.ops(

Review Comment:
   Right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to