ibessonov commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r957271853


##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -235,4 +288,209 @@ public static boolean recoverable(Throwable t) {
         // As long as we don't have a general failure handler, we assume that 
all errors are recoverable.
         return true;
     }
+
+    /**
+     * Starts the process of removing peer from raft group if that peer has 
in-memory storage or if its
+     * storage was cleared.
+     *
+     * @param partId Partition's raft group id.
+     * @param clusterNode Cluster node to be removed from peers.
+     * @param metaStorageMgr MetaStorage manager.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> startPeerRemoval(String partId, 
ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+        ByteArray key = switchReduceKey(partId);
+
+        return metaStorageMgr.get(key)
+                .thenCompose(retrievedAssignmentsSwitchReduce -> {
+                    byte[] prevValue = 
retrievedAssignmentsSwitchReduce.value();
+
+                    boolean prevValueEmpty = true;
+                    List<ClusterNode> calculatedAssignmentsSwitchReduce;
+
+                    if (prevValue != null) {
+                        calculatedAssignmentsSwitchReduce = 
(List<ClusterNode>) ByteUtils.fromBytes(prevValue);
+                        prevValueEmpty = false;
+                    } else {
+                        calculatedAssignmentsSwitchReduce = new ArrayList<>();
+                    }
+
+                    calculatedAssignmentsSwitchReduce.add(clusterNode);
+
+                    byte[] newValue = 
ByteUtils.toBytes(calculatedAssignmentsSwitchReduce);
+
+                    if (prevValueEmpty) {
+                        return metaStorageMgr.invoke(
+                                Conditions.notExists(key),
+                                Operations.put(key, newValue),
+                                Operations.noop()
+                        );
+                    } else {
+                        return metaStorageMgr.invoke(
+                                
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
+                                Operations.put(key, newValue),
+                                Operations.noop()
+                        );
+                    }
+                }).thenCompose(res -> {
+                    if (!res) {
+                        return startPeerRemoval(partId, clusterNode, 
metaStorageMgr);
+                    }
+
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
+    /**
+     * Handles assignments switch reduce changed.
+     *
+     * @param metaStorageMgr MetaStorage manager.
+     * @param baselineNodes Baseline nodes.
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @param partNum Number of the partition.
+     * @param partId Partition's raft group id.
+     * @param event Assignments switch reduce change event.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> 
baselineNodes,
+            int partitions, int replicas, int partNum, String partId, 
WatchEvent event) {
+        Entry entry = event.entryEvent().newEntry();
+        byte[] eventData = entry.value();
+
+        List<ClusterNode> assignments = 
AffinityUtils.calculateAssignments(baselineNodes, partitions, 
replicas).get(partNum);
+        List<ClusterNode> switchReduce = (List<ClusterNode>) 
ByteUtils.fromBytes(eventData);
+
+        ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+
+        List<ClusterNode> pendingAssignments = subtract(assignments, 
switchReduce);
+
+        byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+
+        if (switchReduce.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        ByteArray changeTriggerKey = partChangeTriggerKey(partId);
+        byte[] rev = 
ByteUtils.longToBytes(event.entryEvent().newEntry().revision());
+
+        // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < 
revision) && (notExists(pendingKey) && notExists(stableKey)) {
+        //     put(pendingKey, pending)
+        //     put(stableKey, assignments)
+        //     put(changeTriggerKey, revision)
+        // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey) 
< revision) && (notExists(pendingKey))) {
+        //     put(pendingKey, pending)
+        //     put(changeTriggerKey, revision)
+        // }
+
+        If iif = If.iif(
+                        and(
+                                or(notExists(changeTriggerKey), 
value(changeTriggerKey).lt(rev)),
+                                and(notExists(pendingKey), 
(notExists(stablePartAssignmentsKey(partId))))
+                        ),
+                        Operations.ops(
+                                put(pendingKey, pendingByteArray),
+                                put(stablePartAssignmentsKey(partId), 
assignmentsByteArray),
+                                put(changeTriggerKey, rev)
+                        ).yield(),
+                        If.iif(
+                            and(
+                                    or(notExists(changeTriggerKey), 
value(changeTriggerKey).lt(rev)),
+                                    notExists(pendingKey)
+                            ),
+                            Operations.ops(
+                                    put(pendingKey, pendingByteArray),
+                                    put(changeTriggerKey, rev)
+                            ).yield(),
+                            ops().yield()
+                        )
+        );
+
+        return metaStorageMgr.invoke(iif).thenApply(unused -> null);
+    }
+
+    /**
+     * Builds a list of cluster nodes based on a list of peers, pending and 
stable assignments.
+     * A peer will be added to the result list iff peer's address is present 
in pending or stable assignments.
+     *
+     * @param peers List of peers.
+     * @param pendingAssignments Byte array that contains serialized list of 
pending assignments.
+     * @param stableAssignments Byte array that contains serialized list of 
stable assignments.
+     * @return Resolved cluster nodes.
+     */
+    public static List<ClusterNode> resolveClusterNodes(List<PeerId> peers, 
byte[] pendingAssignments, byte[] stableAssignments) {
+        Map<NetworkAddress, ClusterNode> resolveRegistry = new HashMap<>();
+
+        if (pendingAssignments != null) {
+            List<ClusterNode> pending = 
ByteUtils.fromBytes(pendingAssignments);
+            pending.forEach(n -> resolveRegistry.put(n.address(), n));
+        }
+
+        if (stableAssignments != null) {
+            List<ClusterNode> stable = ByteUtils.fromBytes(stableAssignments);
+            stable.forEach(n -> resolveRegistry.put(n.address(), n));
+        }
+
+        List<ClusterNode> resolvedNodes = new ArrayList<>(peers.size());
+
+        for (PeerId p : peers) {
+            var addr = NetworkAddress.from(p.getEndpoint().getIp() + ":" + 
p.getEndpoint().getPort());
+
+            if (resolveRegistry.containsKey(addr)) {
+                resolvedNodes.add(resolveRegistry.get(addr));
+            } else {
+                throw new IgniteInternalException("Can't find appropriate 
cluster node for raft group peer: " + p);
+            }
+        }
+
+        return resolvedNodes;
+    }
+
+    /**
+     * Reads a list of cluster nodes from a MetaStorage entry.
+     *
+     * @param entry MetaStorage entry.
+     * @return List of cluster nodes.
+     */
+    public static List<ClusterNode> readClusterNodes(Entry entry) {
+        if (entry.empty()) {
+            return Collections.emptyList();
+        }
+
+        return ByteUtils.fromBytes(entry.value());
+    }
+
+    /**
+     * Removes nodes from collection of nodes.
+     *
+     * @param minuend Collection to remove nodes from.
+     * @param subtrahend Collection of nodes to be removed.
+     * @return Result of the subtraction.
+     */
+    public static List<ClusterNode> subtract(Collection<ClusterNode> minuend, 
Collection<ClusterNode> subtrahend) {
+        return minuend.stream().filter(v -> 
!subtrahend.contains(v)).collect(Collectors.toList());
+    }
+
+    /**
+     * Adds nodes to the collection of nodes.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return Result of the addition.
+     */
+    public static List<ClusterNode> union(Collection<ClusterNode> op1, 
Collection<ClusterNode> op2) {

Review Comment:
   How slow can this be, by the way? How big are these collections?
   I know that `contains` is "usually" invoked on the smaller collection.
   Should we apply that optimization here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to