kgusakov commented on code in PR #759:
URL: https://github.com/apache/ignite-3/pull/759#discussion_r861302440


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1317,152 +1370,77 @@ private RuntimeException convertThrowable(Throwable 
th) {
     }
 
     /**
-     * Sets the nodes as baseline for all tables created by the manager.
-     *
-     * @param nodes New baseline nodes.
-     * @throws NodeStoppingException If an implementation stopped before the 
method was invoked.
+     * Register the new meta storage listener for changes in pending 
partitions.
      */
-    public void setBaseline(Set<String> nodes) throws NodeStoppingException {
-        if (!busyLock.enterBusy()) {
-            throw new NodeStoppingException();
-        }
-        try {
-            setBaselineInternal(nodes);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
+    private void registerRebalanceListeners() {
+        
metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                assert evt.single();
 
-    /**
-     * Internal method for setting a baseline.
-     *
-     * @param nodes Names of baseline nodes.
-     */
-    private void setBaselineInternal(Set<String> nodes) {
-        if (nodes == null || nodes.isEmpty()) {
-            throw new IgniteException("New baseline can't be null or empty");
-        }
+                if (evt.entryEvent().newEntry().value() == null) {
+                    return true;
+                }
 
-        var currClusterMembers = new HashSet<>(baselineMgr.nodes());
+                int part = 
extractPartitionNumber(evt.entryEvent().newEntry().key());
+                UUID tblId = extractTableId(evt.entryEvent().newEntry().key());
 
-        var currClusterMemberNames =
-                
currClusterMembers.stream().map(ClusterNode::name).collect(Collectors.toSet());
+                TableImpl tbl = tablesByIdVv.latest().get(tblId);
 
-        for (String nodeName : nodes) {
-            if (!currClusterMemberNames.contains(nodeName)) {
-                throw new IgniteException("Node '" + nodeName + "' not in 
current network cluster membership. "
-                        + " Adding not alive nodes is not supported yet.");
-            }
-        }
+                ExtendedTableConfiguration tblCfg = 
(ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
 
-        var newBaseline = currClusterMembers
-                .stream().filter(n -> 
nodes.contains(n.name())).collect(Collectors.toSet());
+                String grpId = partitionRaftGroupName(tblId, part);
 
-        updateAssignments(currClusterMembers);
+                Supplier<RaftGroupListener> raftGrpLsnrSupplier = () -> new 
PartitionListener(tblId,
+                        new VersionedRowStore(
+                                
tbl.internalTable().storage().getOrCreatePartition(part), txManager));
 
-        if (!newBaseline.equals(currClusterMembers)) {
-            updateAssignments(newBaseline);
-        }
-    }
+                Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier = () 
-> new RebalanceRaftGroupEventsListener(
+                        metaStorageMgr,
+                        tblCfg,
+                        grpId,
+                        part);
 
-    /**
-     * Update assignments for all current tables according to input nodes 
list. These approach has known issues {@link
-     * Ignite#setBaseline(Set)}.
-     *
-     * @param clusterNodes Set of nodes for assignment.
-     */
-    private void updateAssignments(Set<ClusterNode> clusterNodes) {
-        var setBaselineFut = new CompletableFuture<>();
 
-        var changePeersQueue = new 
ArrayList<Supplier<CompletableFuture<Void>>>();
+                List<List<ClusterNode>> assignments = (List<List<ClusterNode>>)
+                        ByteUtils.fromBytes(tblCfg.assignments().value());
 
-        tablesCfg.tables()
-                .change(tbls -> {
-                    changePeersQueue.clear();
+                List<ClusterNode> newPeers = ((List<ClusterNode>) 
ByteUtils.fromBytes(evt.entryEvent().newEntry().value()));
 
-                    for (int i = 0; i < tbls.size(); i++) {
-                        tbls.createOrUpdate(tbls.get(i).name(), changeX -> {
-                            ExtendedTableChange change = (ExtendedTableChange) 
changeX;
-                            byte[] currAssignments = change.assignments();
+                RaftGroupService raftGrpSvc = null;
 
-                            List<List<ClusterNode>> recalculatedAssignments = 
AffinityUtils.calculateAssignments(
-                                    clusterNodes,
-                                    change.partitions(),
-                                    change.replicas());
+                try {
+                    var deltaPeers = newPeers.stream()
+                            .filter(p -> !assignments.get(part).contains(p))
+                            .collect(Collectors.toList());
 
-                            if 
(!recalculatedAssignments.equals(ByteUtils.fromBytes(currAssignments))) {
-                                
change.changeAssignments(ByteUtils.toBytes(recalculatedAssignments));
+                    raftGrpSvc = raftMgr.updateRaftGroup(grpId, 
assignments.get(part), deltaPeers, raftGrpLsnrSupplier,
+                            raftGrpEvtsLsnrSupplier).join();
 
-                                changePeersQueue.add(() ->
-                                        updateRaftTopology(
-                                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(currAssignments),
-                                                recalculatedAssignments,
-                                                change.id()));
-                            }
-                        });
-                    }
-                })
-                .thenCompose((v) -> {
-                    CompletableFuture<?>[] changePeersFutures = new 
CompletableFuture<?>[changePeersQueue.size()];
+                    raftGrpSvc.refreshLeader().join();
 
-                    int i = 0;
+                    if (new 
Peer(raftGrpSvc.clusterService().topologyService().localMember().address()).equals(raftGrpSvc.leader()))
 {
 
-                    for (Supplier<CompletableFuture<Void>> task : 
changePeersQueue) {
-                        changePeersFutures[i++] = task.get();
-                    }
+                        var newNodes = newPeers.stream().map(n -> new 
Peer(n.address())).collect(Collectors.toList());
 
-                    return CompletableFuture.allOf(changePeersFutures);
-                })
-                .whenComplete((res, th) -> {
-                    if (th != null) {
-                        setBaselineFut.completeExceptionally(th);
-                    } else {
-                        setBaselineFut.complete(null);
+                        raftGrpSvc.changePeersAsync(newNodes).join();

Review Comment:
   Updated according to our discussion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to