vldpyatkov commented on a change in pull request #379:
URL: https://github.com/apache/ignite-3/pull/379#discussion_r723616035
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -213,6 +221,59 @@ public TableManager(
}
});
+ // Performs following logic for each partition during
iteration:
+ // - Creates, previously absent, raft nodes according to new
assignments.
+ // - Trigger rebalance by changing peers from old assignment
to (old assignment union new assignment).
+ // - After change peers completion trigger one more change
peers from
+ // (old assignment union new assignment) to new assignment
to eliminate non-new-assignment peers from
+ // raft service.
+ // - Update table with new raft group services with changed
peers.
+ // - Stop raft nodes from old assignment that doesn't fit
into new one.
+
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
+ listen(assignmentsCtx -> {
+ List<List<ClusterNode>> oldAssignments =
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
+
+ List<List<ClusterNode>> newAssignments =
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+ var futures = new
ArrayList<CompletableFuture<Void>>(oldAssignments.size());
+
+ // TODO: IGNITE-15554 Add logic for assignment
recalculation in case of partitions or replicas changes
+ // TODO: Until IGNITE-15554 is implemented it's safe
to iterate over partitions and replicas cause there will
+ // TODO: be exact same amount of partitions and
replicas for both old and new assignments
+ for (int i = 0; i < oldAssignments.size(); i++) {
+ final int p = i;
+
+ List<ClusterNode> oldPartitionAssignment =
oldAssignments.get(p);
+ List<ClusterNode> newPartitionAssignment =
newAssignments.get(p);
+
+ var toAdd = new HashSet<>(newPartitionAssignment);
+ var toRemove = new
HashSet<>(oldPartitionAssignment);
+
+ toAdd.removeAll(oldPartitionAssignment);
+ toRemove.removeAll(newPartitionAssignment);
+
+ // Create new raft nodes according to new
assignments.
+ futures.add(raftMgr.updateRaftGroup(
+ raftGroupName(tblId, p),
+ newPartitionAssignment,
+ toAdd,
+ prepareRaftGroupListenerSupplier(p,
ctx.newValue().name())
+ )
+ .thenAccept(
+ updatedRaftGroupService ->
tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p,
updatedRaftGroupService)
+ ).thenRun(() ->
raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft
groups one the node");
+ return null;
+ }
+ ));
+ }
+
+ return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[futures.size()]));
Review comment:
Use a plain array instead of building an ArrayList and converting it to an array.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -824,4 +885,141 @@ private void dropTableLocally(String name, IgniteUuid
tblId, List<List<ClusterNo
private boolean isTableConfigured(String name) {
return tableNamesConfigured().contains(name);
}
+
+ public void setBaseline(Set<String> nodes) {
+ var newAssignments = baselineMgr
+ .nodes().stream().filter(n ->
nodes.contains(n.name())).collect(Collectors.toSet());
+
+ var currentBaseline = new HashSet<>(baselineMgr.nodes());
+
+ var unionBaseline = new HashSet<>(newAssignments);
+ unionBaseline.addAll(currentBaseline);
+
+ doUpdateBaseline(unionBaseline);
+
+ if (!newAssignments.equals(unionBaseline))
+ doUpdateBaseline(newAssignments);
+ }
+
+ private void doUpdateBaseline(Set<ClusterNode> clusterNodes) {
+ var changePeersQueue = new ArrayList<Runnable>();
+
+ tablesCfg.tables().change(
+ tbls -> {
+ for (int i = 0; i < tbls.size(); i++) {
+ tbls.createOrUpdate(tbls.get(i).name(), changeX -> {
+ ExtendedTableChange change =
(ExtendedTableChange)changeX;
+ byte[] currAssignments = change.assignments();
+
+ List<List<ClusterNode>> recalculatedAssignments =
AffinityUtils.calculateAssignments(
+ clusterNodes,
+ change.partitions(),
+ change.replicas());
+
+ if
(!recalculatedAssignments.equals(ByteUtils.fromBytes(currAssignments))) {
+
change.changeAssignments(ByteUtils.toBytes(recalculatedAssignments));
+ changePeersQueue.add(() ->
+ updateRaftTopology(
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(currAssignments),
+ recalculatedAssignments,
+ IgniteUuid.fromString(change.id())));
+ }
+ });
+ }
+ }).join();
+
+ for (Runnable task: changePeersQueue) {
+ task.run();
+ }
+ }
+
+ private void updateRaftTopology(List<List<ClusterNode>> oldAssignments,
List<List<ClusterNode>> newAssignments, IgniteUuid tblId) {
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(oldAssignments.size());
+
+ // TODO: IGNITE-15554 Add logic for assignment recalculation in case
of partitions or replicas changes
+ // TODO: Until IGNITE-15554 is implemented it's safe to iterate over
partitions and replicas cause there will
+ // TODO: be exact same amount of partitions and replicas for both old
and new assignments
+ for (int i = 0; i < oldAssignments.size(); i++) {
+ final int p = i;
+
+ List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+ List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+ var toAdd = new HashSet<>(newPartitionAssignment);
+
+ toAdd.removeAll(oldPartitionAssignment);
+
+ futures.add(raftMgr.prepareRaftGroup(
+ raftGroupName(tblId, p),
+ oldPartitionAssignment,
+ () -> new RaftGroupListener() {
+ @Override public void
onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+
+ }
+
+ @Override public void
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+
+ }
+
+ @Override public void onSnapshotSave(Path path,
Consumer<Throwable> doneClo) {
+
+ }
+
+ @Override public boolean onSnapshotLoad(Path path) {
+ return false;
+ }
+
+ @Override public void onShutdown() {
+
+ }
+ },
+ 60000,
+ 10000
+ )
+ .thenCompose(
+ updatedRaftGroupService -> {
+ return
+ updatedRaftGroupService.
+ changePeers(
+ newPartitionAssignment.stream().map(n ->
new Peer(n.address())).collect(Collectors.toList()));
+ }
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft peers for group " +
raftGroupName(tblId, p) +
+ "from " + oldPartitionAssignment + " to " +
newPartitionAssignment, th);
+ return null;
+ }
+ ));
+ }
+
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[futures.size()])).join();
Review comment:
Use a plain array here too, as suggested for the assignments listener above.
##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -125,14 +135,23 @@ public Loza(ClusterService clusterNetSvc, Path dataPath) {
groupId,
clusterNetSvc,
FACTORY,
- TIMEOUT,
+ clientTimeout,
+ networkTimeout,
peers,
true,
DELAY,
executor
);
}
+ public CompletableFuture<RaftGroupService> prepareRaftGroup(
+ String groupId,
+ List<ClusterNode> nodes,
+ Supplier<RaftGroupListener> lsnrSupplier) {
+
+ return prepareRaftGroup(groupId, nodes, lsnrSupplier, TIMEOUT,
NETWORK_TIMEOUT);
Review comment:
Why are you passing constants through parameters?
##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -147,4 +166,44 @@ public void stopRaftGroup(String groupId,
List<ClusterNode> nodes) {
if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
raftServer.stopRaftGroup(groupId);
}
+
+ /**
+ * Creates a raft group service providing operations on a raft group.
+ * If {@code deltaNodes} contains the current node, then raft group starts
on the current node.
+ * @param groupId Raft group id.
+ * @param nodes Full set of raft group nodes.
+ * @param deltaNodes New raft group nodes.
+ * @param lsnrSupplier Raft group listener supplier.
+ * @return Future representing pending completion of the operation.
+ * @return
+ */
+ public CompletableFuture<RaftGroupService> updateRaftGroup(
Review comment:
Method prepareRaftGroup is the same as updateRaftGroup when deltaNodes
and nodes are equal, isn't it?
Please reuse the code.
##########
File path: modules/api/src/main/java/org/apache/ignite/Ignite.java
##########
@@ -44,4 +45,11 @@
* @return Ignite transactions.
*/
IgniteTransactions transactions();
+
+ /**
+ * Set new baseline nodes for table assignments.
+ *
+ * @param baselineNodes Names of baseline nodes.
+ */
+ void setBaseline(Set<String> baselineNodes);
Review comment:
It should be a set of cluster nodes (ClusterNode), not node names.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -213,6 +221,59 @@ public TableManager(
}
});
+ // Performs following logic for each partition during
iteration:
+ // - Creates, previously absent, raft nodes according to new
assignments.
+ // - Trigger rebalance by changing peers from old assignment
to (old assignment union new assignment).
+ // - After change peers completion trigger one more change
peers from
+ // (old assignment union new assignment) to new assignment
to eliminate non-new-assignment peers from
+ // raft service.
+ // - Update table with new raft group services with changed
peers.
+ // - Stop raft nodes from old assignment that doesn't fit
into new one.
+
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
+ listen(assignmentsCtx -> {
+ List<List<ClusterNode>> oldAssignments =
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
+
+ List<List<ClusterNode>> newAssignments =
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+ var futures = new
ArrayList<CompletableFuture<Void>>(oldAssignments.size());
+
+ // TODO: IGNITE-15554 Add logic for assignment
recalculation in case of partitions or replicas changes
+ // TODO: Until IGNITE-15554 is implemented it's safe
to iterate over partitions and replicas cause there will
+ // TODO: be exact same amount of partitions and
replicas for both old and new assignments
+ for (int i = 0; i < oldAssignments.size(); i++) {
+ final int p = i;
+
+ List<ClusterNode> oldPartitionAssignment =
oldAssignments.get(p);
+ List<ClusterNode> newPartitionAssignment =
newAssignments.get(p);
+
+ var toAdd = new HashSet<>(newPartitionAssignment);
+ var toRemove = new
HashSet<>(oldPartitionAssignment);
+
+ toAdd.removeAll(oldPartitionAssignment);
+ toRemove.removeAll(newPartitionAssignment);
+
+ // Create new raft nodes according to new
assignments.
+ futures.add(raftMgr.updateRaftGroup(
+ raftGroupName(tblId, p),
+ newPartitionAssignment,
+ toAdd,
+ prepareRaftGroupListenerSupplier(p,
ctx.newValue().name())
+ )
+ .thenAccept(
+ updatedRaftGroupService ->
tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p,
updatedRaftGroupService)
+ ).thenRun(() ->
raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft
groups one the node");
Review comment:
Why are you not including the error in the log message? Please pass the exception (th) to LOG.error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]