sanpwc commented on a change in pull request #379:
URL: https://github.com/apache/ignite-3/pull/379#discussion_r724090239



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -824,4 +885,141 @@ private void dropTableLocally(String name, IgniteUuid 
tblId, List<List<ClusterNo
     private boolean isTableConfigured(String name) {
         return tableNamesConfigured().contains(name);
     }
+
+    public void setBaseline(Set<String> nodes) {
+        var newAssignments = baselineMgr
+            .nodes().stream().filter(n -> 
nodes.contains(n.name())).collect(Collectors.toSet());
+
+        var currentBaseline = new HashSet<>(baselineMgr.nodes());
+
+        var unionBaseline = new HashSet<>(newAssignments);
+        unionBaseline.addAll(currentBaseline);
+
+        doUpdateBaseline(unionBaseline);
+
+        if (!newAssignments.equals(unionBaseline))
+            doUpdateBaseline(newAssignments);
+    }
+
+    private void doUpdateBaseline(Set<ClusterNode> clusterNodes) {
+        var changePeersQueue = new ArrayList<Runnable>();
+
+        tablesCfg.tables().change(
+            tbls -> {
+                for (int i = 0; i < tbls.size(); i++) {
+                    tbls.createOrUpdate(tbls.get(i).name(), changeX -> {
+                        ExtendedTableChange change = 
(ExtendedTableChange)changeX;
+                        byte[] currAssignments = change.assignments();
+
+                        List<List<ClusterNode>> recalculatedAssignments = 
AffinityUtils.calculateAssignments(
+                            clusterNodes,
+                            change.partitions(),
+                            change.replicas());
+
+                        if 
(!recalculatedAssignments.equals(ByteUtils.fromBytes(currAssignments))) {
+                            
change.changeAssignments(ByteUtils.toBytes(recalculatedAssignments));
+                            changePeersQueue.add(() ->
+                                updateRaftTopology(
+                                    
(List<List<ClusterNode>>)ByteUtils.fromBytes(currAssignments),
+                                    recalculatedAssignments,
+                                    IgniteUuid.fromString(change.id())));
+                        }
+                    });
+                }
+            }).join();
+
+        for (Runnable task: changePeersQueue) {
+            task.run();
+        }
+    }
+
+    private void updateRaftTopology(List<List<ClusterNode>> oldAssignments, 
List<List<ClusterNode>> newAssignments, IgniteUuid tblId) {
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(oldAssignments.size());
+
+        // TODO: IGNITE-15554 Add logic for assignment recalculation in case 
of partitions or replicas changes
+        // TODO: Until IGNITE-15554 is implemented it's safe to iterate over 
partitions and replicas cause there will
+        // TODO: be exact same amount of partitions and replicas for both old 
and new assignments
+        for (int i = 0; i < oldAssignments.size(); i++) {
+            final int p = i;
+
+            List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+            List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+            var toAdd = new HashSet<>(newPartitionAssignment);
+
+            toAdd.removeAll(oldPartitionAssignment);
+
+            futures.add(raftMgr.prepareRaftGroup(
+                raftGroupName(tblId, p),
+                oldPartitionAssignment,
+                () -> new RaftGroupListener() {
+                    @Override public void 
onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+
+                    }
+
+                    @Override public void 
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+
+                    }
+
+                    @Override public void onSnapshotSave(Path path, 
Consumer<Throwable> doneClo) {
+
+                    }
+
+                    @Override public boolean onSnapshotLoad(Path path) {
+                        return false;
+                    }
+
+                    @Override public void onShutdown() {
+
+                    }
+                },
+                60000,
+                10000
+            )
+                .thenCompose(
+                    updatedRaftGroupService -> {
+                        return
+                            updatedRaftGroupService.
+                                changePeers(
+                                    newPartitionAssignment.stream().map(n -> 
new Peer(n.address())).collect(Collectors.toList()));
+                    }
+                ).exceptionally(th -> {
+                        LOG.error("Failed to update raft peers for group " + 
raftGroupName(tblId, p) +
+                            "from " + oldPartitionAssignment + " to " + 
newPartitionAssignment, th);
+                        return null;
+                    }
+                ));
+        }
+
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()])).join();

Review comment:
       Are you sure that join here and in other places won't lead to a 
deadlock? I mean the known issue with blocking sendWithRetry response processing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to