vldpyatkov commented on a change in pull request #379:
URL: https://github.com/apache/ignite-3/pull/379#discussion_r723616035



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -213,6 +221,59 @@ public TableManager(
                         }
                     });
 
+                // Performs following logic for each partition during 
iteration:
+                //  - Creates, previously absent, raft nodes according to new 
assignments.
+                //  - Trigger rebalance by changing peers from old assignment 
to (old assignment union new assignment).
+                //  - After change peers completion trigger one more change 
peers from
+                //    (old assignment union new assignment) to new assignment 
to eliminate non-new-assignment peers from
+                //    raft service.
+                //  - Update table with new raft group services with changed 
peers.
+                //  - Stop raft nodes from old assignment that doesn't fit 
into new one.
+                
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
+                    listen(assignmentsCtx -> {
+                        List<List<ClusterNode>> oldAssignments =
+                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
+
+                        List<List<ClusterNode>> newAssignments =
+                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+                        var futures = new 
ArrayList<CompletableFuture<Void>>(oldAssignments.size());
+
+                        // TODO: IGNITE-15554 Add logic for assignment 
recalculation in case of partitions or replicas changes
+                        // TODO: Until IGNITE-15554 is implemented it's safe 
to iterate over partitions and replicas cause there will
+                        // TODO: be exact same amount of partitions and 
replicas for both old and new assignments
+                        for (int i = 0; i < oldAssignments.size(); i++) {
+                            final int p = i;
+
+                            List<ClusterNode> oldPartitionAssignment = 
oldAssignments.get(p);
+                            List<ClusterNode> newPartitionAssignment = 
newAssignments.get(p);
+
+                            var toAdd = new HashSet<>(newPartitionAssignment);
+                            var toRemove = new 
HashSet<>(oldPartitionAssignment);
+
+                            toAdd.removeAll(oldPartitionAssignment);
+                            toRemove.removeAll(newPartitionAssignment);
+
+                            // Create new raft nodes according to new 
assignments.
+                            futures.add(raftMgr.updateRaftGroup(
+                                raftGroupName(tblId, p),
+                                newPartitionAssignment,
+                                toAdd,
+                                prepareRaftGroupListenerSupplier(p, 
ctx.newValue().name())
+                            )
+                                .thenAccept(
+                                    updatedRaftGroupService -> 
tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p, 
updatedRaftGroupService)
+                                ).thenRun(() -> 
raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+                                ).exceptionally(th -> {
+                                        LOG.error("Failed to update raft 
groups one the node");
+                                        return null;
+                                    }
+                                ));
+                        }
+
+                        return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()]));

Review comment:
       Use a plain array instead of converting an array list.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -824,4 +885,141 @@ private void dropTableLocally(String name, IgniteUuid 
tblId, List<List<ClusterNo
     private boolean isTableConfigured(String name) {
         return tableNamesConfigured().contains(name);
     }
+
+    public void setBaseline(Set<String> nodes) {
+        var newAssignments = baselineMgr
+            .nodes().stream().filter(n -> 
nodes.contains(n.name())).collect(Collectors.toSet());
+
+        var currentBaseline = new HashSet<>(baselineMgr.nodes());
+
+        var unionBaseline = new HashSet<>(newAssignments);
+        unionBaseline.addAll(currentBaseline);
+
+        doUpdateBaseline(unionBaseline);
+
+        if (!newAssignments.equals(unionBaseline))
+            doUpdateBaseline(newAssignments);
+    }
+
+    private void doUpdateBaseline(Set<ClusterNode> clusterNodes) {
+        var changePeersQueue = new ArrayList<Runnable>();
+
+        tablesCfg.tables().change(
+            tbls -> {
+                for (int i = 0; i < tbls.size(); i++) {
+                    tbls.createOrUpdate(tbls.get(i).name(), changeX -> {
+                        ExtendedTableChange change = 
(ExtendedTableChange)changeX;
+                        byte[] currAssignments = change.assignments();
+
+                        List<List<ClusterNode>> recalculatedAssignments = 
AffinityUtils.calculateAssignments(
+                            clusterNodes,
+                            change.partitions(),
+                            change.replicas());
+
+                        if 
(!recalculatedAssignments.equals(ByteUtils.fromBytes(currAssignments))) {
+                            
change.changeAssignments(ByteUtils.toBytes(recalculatedAssignments));
+                            changePeersQueue.add(() ->
+                                updateRaftTopology(
+                                    
(List<List<ClusterNode>>)ByteUtils.fromBytes(currAssignments),
+                                    recalculatedAssignments,
+                                    IgniteUuid.fromString(change.id())));
+                        }
+                    });
+                }
+            }).join();
+
+        for (Runnable task: changePeersQueue) {
+            task.run();
+        }
+    }
+
+    private void updateRaftTopology(List<List<ClusterNode>> oldAssignments, 
List<List<ClusterNode>> newAssignments, IgniteUuid tblId) {
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(oldAssignments.size());
+
+        // TODO: IGNITE-15554 Add logic for assignment recalculation in case 
of partitions or replicas changes
+        // TODO: Until IGNITE-15554 is implemented it's safe to iterate over 
partitions and replicas cause there will
+        // TODO: be exact same amount of partitions and replicas for both old 
and new assignments
+        for (int i = 0; i < oldAssignments.size(); i++) {
+            final int p = i;
+
+            List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+            List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+            var toAdd = new HashSet<>(newPartitionAssignment);
+
+            toAdd.removeAll(oldPartitionAssignment);
+
+            futures.add(raftMgr.prepareRaftGroup(
+                raftGroupName(tblId, p),
+                oldPartitionAssignment,
+                () -> new RaftGroupListener() {
+                    @Override public void 
onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+
+                    }
+
+                    @Override public void 
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+
+                    }
+
+                    @Override public void onSnapshotSave(Path path, 
Consumer<Throwable> doneClo) {
+
+                    }
+
+                    @Override public boolean onSnapshotLoad(Path path) {
+                        return false;
+                    }
+
+                    @Override public void onShutdown() {
+
+                    }
+                },
+                60000,
+                10000
+            )
+                .thenCompose(
+                    updatedRaftGroupService -> {
+                        return
+                            updatedRaftGroupService.
+                                changePeers(
+                                    newPartitionAssignment.stream().map(n -> 
new Peer(n.address())).collect(Collectors.toList()));
+                    }
+                ).exceptionally(th -> {
+                        LOG.error("Failed to update raft peers for group " + 
raftGroupName(tblId, p) +
+                            "from " + oldPartitionAssignment + " to " + 
newPartitionAssignment, th);
+                        return null;
+                    }
+                ));
+        }
+
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()])).join();

Review comment:
       Use a plain array, as before.

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -125,14 +135,23 @@ public Loza(ClusterService clusterNetSvc, Path dataPath) {
             groupId,
             clusterNetSvc,
             FACTORY,
-            TIMEOUT,
+            clientTimeout,
+            networkTimeout,
             peers,
             true,
             DELAY,
             executor
         );
     }
 
+    public CompletableFuture<RaftGroupService> prepareRaftGroup(
+        String groupId,
+        List<ClusterNode> nodes,
+        Supplier<RaftGroupListener> lsnrSupplier) {
+
+        return prepareRaftGroup(groupId, nodes, lsnrSupplier, TIMEOUT, 
NETWORK_TIMEOUT);

Review comment:
       Why are you passing constants through parameters?

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -147,4 +166,44 @@ public void stopRaftGroup(String groupId, 
List<ClusterNode> nodes) {
         if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
             raftServer.stopRaftGroup(groupId);
     }
+
+    /**
+     * Creates a raft group service providing operations on a raft group.
+     * If {@code deltaNodes} contains the current node, then raft group starts 
on the current node.
+     * @param groupId Raft group id.
+     * @param nodes Full set of raft group nodes.
+     * @param deltaNodes New raft group nodes.
+     * @param lsnrSupplier Raft group listener supplier.
+     * @return Future representing pending completion of the operation.
+     * @return
+     */
+    public CompletableFuture<RaftGroupService> updateRaftGroup(

Review comment:
       Method prepareRaftGroup is the same as updateRaftGroup when deltaNodes 
and nodes are equivalent, isn't it?
   Please reuse the code.

##########
File path: modules/api/src/main/java/org/apache/ignite/Ignite.java
##########
@@ -44,4 +45,11 @@
      * @return Ignite transactions.
      */
     IgniteTransactions transactions();
+
+    /**
+     * Set new baseline nodes for table assignments.
+     *
+     * @param baselineNodes Names of baseline nodes.
+     */
+    void setBaseline(Set<String> baselineNodes);

Review comment:
       It should be a set of Cluster nodes.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -213,6 +221,59 @@ public TableManager(
                         }
                     });
 
+                // Performs following logic for each partition during 
iteration:
+                //  - Creates, previously absent, raft nodes according to new 
assignments.
+                //  - Trigger rebalance by changing peers from old assignment 
to (old assignment union new assignment).
+                //  - After change peers completion trigger one more change 
peers from
+                //    (old assignment union new assignment) to new assignment 
to eliminate non-new-assignment peers from
+                //    raft service.
+                //  - Update table with new raft group services with changed 
peers.
+                //  - Stop raft nodes from old assignment that doesn't fit 
into new one.
+                
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
+                    listen(assignmentsCtx -> {
+                        List<List<ClusterNode>> oldAssignments =
+                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
+
+                        List<List<ClusterNode>> newAssignments =
+                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+                        var futures = new 
ArrayList<CompletableFuture<Void>>(oldAssignments.size());
+
+                        // TODO: IGNITE-15554 Add logic for assignment 
recalculation in case of partitions or replicas changes
+                        // TODO: Until IGNITE-15554 is implemented it's safe 
to iterate over partitions and replicas cause there will
+                        // TODO: be exact same amount of partitions and 
replicas for both old and new assignments
+                        for (int i = 0; i < oldAssignments.size(); i++) {
+                            final int p = i;
+
+                            List<ClusterNode> oldPartitionAssignment = 
oldAssignments.get(p);
+                            List<ClusterNode> newPartitionAssignment = 
newAssignments.get(p);
+
+                            var toAdd = new HashSet<>(newPartitionAssignment);
+                            var toRemove = new 
HashSet<>(oldPartitionAssignment);
+
+                            toAdd.removeAll(oldPartitionAssignment);
+                            toRemove.removeAll(newPartitionAssignment);
+
+                            // Create new raft nodes according to new 
assignments.
+                            futures.add(raftMgr.updateRaftGroup(
+                                raftGroupName(tblId, p),
+                                newPartitionAssignment,
+                                toAdd,
+                                prepareRaftGroupListenerSupplier(p, 
ctx.newValue().name())
+                            )
+                                .thenAccept(
+                                    updatedRaftGroupService -> 
tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p, 
updatedRaftGroupService)
+                                ).thenRun(() -> 
raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+                                ).exceptionally(th -> {
+                                        LOG.error("Failed to update raft 
groups one the node");

Review comment:
       Why are you not printing the error itself in the message?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to