kgusakov commented on code in PR #759:
URL: https://github.com/apache/ignite-3/pull/759#discussion_r859758078


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1317,152 +1379,153 @@ private RuntimeException convertThrowable(Throwable th) {
     }
 
     /**
-     * Sets the nodes as baseline for all tables created by the manager.
+     * Prepare the listener for handling configuration changes in raft group.
      *
-     * @param nodes New baseline nodes.
-     * @throws NodeStoppingException If an implementation stopped before the 
method was invoked.
+     * @param tblName Name of the table.
+     * @param partNum Number of partition.
+     * @param partId Partition unique id.
+     * @return prepare listener.
+     *
+     * @see RaftGroupEventsListener
      */
-    public void setBaseline(Set<String> nodes) throws NodeStoppingException {
-        if (!busyLock.enterBusy()) {
-            throw new NodeStoppingException();
-        }
-        try {
-            setBaselineInternal(nodes);
-        } finally {
-            busyLock.leaveBusy();
-        }
+    private RaftGroupEventsListener raftGroupEventsListener(String tblName, 
int partNum,
+            String partId) {
+        return new RaftGroupEventsListener() {
+            @Override
+            public void onLeaderElected() {
+            }
+
+            @Override
+            public void onNewPeersConfigurationApplied(List<PeerId> peers) {
+                Map<ByteArray, Entry> keys = metaStorageMgr.getAll(
+                        Set.of(partAssignmentsPlannedKey(partId), 
partAssignmentsPendingKey(partId))).join();
+
+                Entry plannedEntry = 
keys.get(partAssignmentsPlannedKey(partId));
+                Entry pendingEntry = 
keys.get(partAssignmentsPendingKey(partId));
+
+                tablesCfg.tables().get(tblName).change(ch -> {
+                    List<List<ClusterNode>> assignments =
+                            (List<List<ClusterNode>>) 
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+                    assignments.set(partNum, ((List<ClusterNode>) 
ByteUtils.fromBytes(pendingEntry.value())));
+                    ((ExtendedTableChange) 
ch).changeAssignments(ByteUtils.toBytes(assignments));
+                });
+
+                if (plannedEntry.value() != null) {
+                    if (!metaStorageMgr.invoke(If.iif(
+                            
revision(partAssignmentsPlannedKey(partId)).eq(plannedEntry.revision()),
+                            ops(
+                                    put(partAssignmentsStableKey(partId), 
pendingEntry.value()),
+                                    put(partAssignmentsPendingKey(partId), 
plannedEntry.value()),
+                                    remove(partAssignmentsPlannedKey(partId)))
+                                    .yield(true),
+                            ops().yield(false))).join().getAsBoolean()) {
+                        onNewPeersConfigurationApplied(peers);
+                    }
+                } else {
+                    if (!metaStorageMgr.invoke(If.iif(
+                            notExists(partAssignmentsPlannedKey(partId)),
+                            ops(put(partAssignmentsStableKey(partId), 
pendingEntry.value()),
+                                
remove(partAssignmentsPendingKey(partId))).yield(true),
+                            ops().yield(false))).join().getAsBoolean()) {
+                        onNewPeersConfigurationApplied(peers);
+                    }
+                }
+            }
+
+            @Override
+            public void onReconfigurationError(Status status) {}
+        };
     }
 
     /**
-     * Internal method for setting a baseline.
-     *
-     * @param nodes Names of baseline nodes.
+     * Register the new meta storage listener for changes in pending 
partitions.
      */
-    private void setBaselineInternal(Set<String> nodes) {
-        if (nodes == null || nodes.isEmpty()) {
-            throw new IgniteException("New baseline can't be null or empty");
-        }
+    private void registerRebalanceListeners() {
+        
metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                assert evt.single();
 
-        var currClusterMembers = new HashSet<>(baselineMgr.nodes());
+                if (evt.entryEvent().newEntry().value() == null) {
+                    return true;
+                }
 
-        var currClusterMemberNames =
-                
currClusterMembers.stream().map(ClusterNode::name).collect(Collectors.toSet());
+                int part = 
extractPartitionNumber(evt.entryEvent().newEntry().key());
+                UUID tblId = extractTableId(evt.entryEvent().newEntry().key());
 
-        for (String nodeName : nodes) {
-            if (!currClusterMemberNames.contains(nodeName)) {
-                throw new IgniteException("Node '" + nodeName + "' not in 
current network cluster membership. "
-                        + " Adding not alive nodes is not supported yet.");
-            }
-        }
+                TableImpl tbl = tablesByIdVv.latest().get(tblId);
 
-        var newBaseline = currClusterMembers
-                .stream().filter(n -> 
nodes.contains(n.name())).collect(Collectors.toSet());
+                String grpId = partitionRaftGroupName(tblId, part);
 
-        updateAssignments(currClusterMembers);
+                Supplier<RaftGroupListener> raftGrpLsnrSupplier = () -> new 
PartitionListener(tblId,
+                        new VersionedRowStore(
+                                
tablesByIdVv.latest().get(tblId).internalTable().storage().getOrCreatePartition(part),
 txManager));
 
-        if (!newBaseline.equals(currClusterMembers)) {
-            updateAssignments(newBaseline);
-        }
-    }
+                Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier = () 
-> raftGroupEventsListener(
+                        tablesByIdVv.latest().get(tblId).name(),
+                        part,
+                        grpId);
 
-    /**
-     * Update assignments for all current tables according to input nodes 
list. These approach has known issues {@link
-     * Ignite#setBaseline(Set)}.
-     *
-     * @param clusterNodes Set of nodes for assignment.
-     */
-    private void updateAssignments(Set<ClusterNode> clusterNodes) {
-        var setBaselineFut = new CompletableFuture<>();
 
-        var changePeersQueue = new 
ArrayList<Supplier<CompletableFuture<Void>>>();
+                List<List<ClusterNode>> assignments = (List<List<ClusterNode>>)
+                        ByteUtils.fromBytes(((ExtendedTableConfiguration) 
tablesCfg.tables().get(tbl.name())).assignments().value());
 
-        tablesCfg.tables()
-                .change(tbls -> {
-                    changePeersQueue.clear();
+                List<ClusterNode> newPeers = ((List<ClusterNode>) 
ByteUtils.fromBytes(evt.entryEvent().newEntry().value()));
 
-                    for (int i = 0; i < tbls.size(); i++) {
-                        tbls.createOrUpdate(tbls.get(i).name(), changeX -> {
-                            ExtendedTableChange change = (ExtendedTableChange) 
changeX;
-                            byte[] currAssignments = change.assignments();
+                RaftGroupService raftGrpSvc = null;
 
-                            List<List<ClusterNode>> recalculatedAssignments = 
AffinityUtils.calculateAssignments(
-                                    clusterNodes,
-                                    change.partitions(),
-                                    change.replicas());
+                try {
+                    var deltaPeers = newPeers.stream()
+                            .filter(p -> !assignments.get(part).contains(p))
+                            .collect(Collectors.toList());
 
-                            if 
(!recalculatedAssignments.equals(ByteUtils.fromBytes(currAssignments))) {
-                                
change.changeAssignments(ByteUtils.toBytes(recalculatedAssignments));
+                    raftGrpSvc = raftMgr.updateRaftGroup(grpId, 
assignments.get(part), deltaPeers, raftGrpLsnrSupplier,
+                            raftGrpEvtsLsnrSupplier).join();
 
-                                changePeersQueue.add(() ->
-                                        updateRaftTopology(
-                                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(currAssignments),
-                                                recalculatedAssignments,
-                                                change.id()));
-                            }
-                        });
-                    }
-                })
-                .thenCompose((v) -> {
-                    CompletableFuture<?>[] changePeersFutures = new 
CompletableFuture<?>[changePeersQueue.size()];
+                    raftGrpSvc.refreshLeader().join();
 
-                    int i = 0;
+                    if (new 
Peer(raftGrpSvc.clusterService().topologyService().localMember().address()).equals(raftGrpSvc.leader()))
 {
 
-                    for (Supplier<CompletableFuture<Void>> task : 
changePeersQueue) {
-                        changePeersFutures[i++] = task.get();
-                    }
+                        var newNodes = newPeers.stream().map(n -> new 
Peer(n.address())).collect(Collectors.toList());
 
-                    return CompletableFuture.allOf(changePeersFutures);
-                })
-                .whenComplete((res, th) -> {
-                    if (th != null) {
-                        setBaselineFut.completeExceptionally(th);
-                    } else {
-                        setBaselineFut.complete(null);
+                        raftGrpSvc.changePeersAsync(newNodes).join();
                     }
-                });
+                } catch (NodeStoppingException e) {
+                    // noop
+                } finally {
+                    if (raftGrpSvc != null) {
+                        raftGrpSvc.shutdown();
+                    }
+                }
 
-        setBaselineFut.join();
+                return true;
+            }
+
+            @Override
+            public void onError(@NotNull Throwable e) {
+                LOG.error("Error while processing pending assignments event", 
e);
+            }
+        });
     }
 
     /**
-     * Update raft groups of table partitions to new peers list.
+     * Extract table id from pending key of partition.
      *
-     * @param oldAssignments Old assignment.
-     * @param newAssignments New assignment.
-     * @param tblId Table ID.
-     * @return Future, which completes, when update finished.
+     * @param key Key.
+     * @return Table id.
      */
-    private CompletableFuture<Void> updateRaftTopology(
-            List<List<ClusterNode>> oldAssignments,
-            List<List<ClusterNode>> newAssignments,
-            UUID tblId) {
-        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[oldAssignments.size()];
-
-        // TODO: IGNITE-15554 Add logic for assignment recalculation in case 
of partitions or replicas changes
-        // TODO: Until IGNITE-15554 is implemented it's safe to iterate over 
partitions and replicas cause there will
-        // TODO: be exact same amount of partitions and replicas for both old 
and new assignments
-        for (int i = 0; i < oldAssignments.size(); i++) {
-            final int p = i;
-
-            List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
-            List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
-
-            try {
-                futures[i] = raftMgr.changePeers(
-                        raftGroupName(tblId, p),
-                        oldPartitionAssignment,
-                        newPartitionAssignment
-                ).exceptionally(th -> {
-                    LOG.error("Failed to update raft peers for group " + 
raftGroupName(tblId, p)
-                            + "from " + oldPartitionAssignment + " to " + 
newPartitionAssignment, th);
-                    return null;
-                });
-            } catch (NodeStoppingException e) {
-                throw new AssertionError("Loza was stopped before Table 
manager", e);
-            }
-        }
+    private UUID extractTableId(ByteArray key) {
+        return 
UUID.fromString(key.toString().substring(PENDING_ASSIGNMENTS_PREFIX.length(), 
PENDING_ASSIGNMENTS_PREFIX.length() + 36));
+    }
 
-        return CompletableFuture.allOf(futures);
+    /**
+     * Extract partition number from the pending key of partition.
+     *
+     * @param key Key.
+     * @return Partition number.
+     */
+    private int extractPartitionNumber(ByteArray key) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to