sashapolo commented on code in PR #2698:
URL: https://github.com/apache/ignite-3/pull/2698#discussion_r1370558536
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2157,6 +2157,16 @@ protected CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent e
return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), stableAssignmentsWatchEvent.revision())
.thenComposeAsync(pendingAssignmentsEntry -> {
+ CompletableFuture<Void> raftClientUpdateFuture = completedFuture(null);
+
+ if (!stableAssignments.equals(ByteUtils.fromBytes(evt.entryEvent().oldEntry().value()))) {
+ raftClientUpdateFuture = tablesById(evt.revision()).thenAccept(t -> {
Review Comment:
Please add some comments about the purpose of this code
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2157,6 +2157,16 @@ protected CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent e
return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), stableAssignmentsWatchEvent.revision())
.thenComposeAsync(pendingAssignmentsEntry -> {
+ CompletableFuture<Void> raftClientUpdateFuture = completedFuture(null);
Review Comment:
Small optimization: `completedFuture(null)` assignment can be done in the
`else` branch below
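A minimal sketch of the shape I mean, not the actual `TableManager` code: the `assignmentsChanged` flag and the `updateRaftClient` helper are hypothetical stand-ins for the condition and the `tablesById(...)` call in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class RaftClientUpdateSketch {
    /** Hypothetical stand-in for the asynchronous raft client update from the diff. */
    static CompletableFuture<Void> updateRaftClient(AtomicBoolean updated) {
        return CompletableFuture.runAsync(() -> updated.set(true));
    }

    /** Picks the future in an if/else instead of pre-assigning a completed one. */
    static CompletableFuture<Void> chooseFuture(boolean assignmentsChanged, AtomicBoolean updated) {
        CompletableFuture<Void> raftClientUpdateFuture;

        if (assignmentsChanged) {
            raftClientUpdateFuture = updateRaftClient(updated);
        } else {
            // The completed future is created only when there is nothing to update.
            raftClientUpdateFuture = CompletableFuture.completedFuture(null);
        }

        return raftClientUpdateFuture;
    }
}
```

Since the variable is assigned exactly once on each path, it also stays effectively final and can be captured by a later lambda.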
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -587,6 +587,59 @@ void testRebalanceWithTheSameNodes() throws Exception {
verifyThatRaftNodesAndReplicasWereStartedOnlyOnce();
}
+ @Test
+ void testRaftClientsUpdatesAfterRebalance() throws Exception {
+ Node node = getNode(0);
+
+ createZone(node, ZONE_NAME, 1, 1);
+
+ createTable(node, ZONE_NAME, TABLE_NAME);
+
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+
+ Set<Assignment> assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
+
+ Node newNodeForAssignment = nodes.stream().filter(n ->
Review Comment:
May I propose the following refactoring to make the code more readable:
```
String newNodeNameForAssignment = nodes.stream()
        .map(n -> Assignment.forPeer(n.clusterService.nodeName()))
        .filter(assignment -> !assignmentsBeforeRebalance.contains(assignment))
        .findFirst()
        .orElseThrow()
        .consistentId();
```
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -587,6 +587,59 @@ void testRebalanceWithTheSameNodes() throws Exception {
verifyThatRaftNodesAndReplicasWereStartedOnlyOnce();
}
+ @Test
+ void testRaftClientsUpdatesAfterRebalance() throws Exception {
+ Node node = getNode(0);
+
+ createZone(node, ZONE_NAME, 1, 1);
+
+ createTable(node, ZONE_NAME, TABLE_NAME);
+
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+
+ Set<Assignment> assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
+
+ Node newNodeForAssignment = nodes.stream().filter(n ->
+ !assignmentsBeforeRebalance.contains(Assignment.forPeer(n.clusterService.nodeName()))).findFirst().get();
+
+ Set<Assignment> newAssignment = Set.of(Assignment.forPeer(newNodeForAssignment.clusterService.nodeName()));
+
+ // Write the new assignments to metastore as a pending assignments.
+ {
Review Comment:
Why do you need these braces?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -482,6 +482,13 @@ public ClusterService clusterService() {
return cluster;
}
+ @Override
+ public void updateConfiguration(PeersAndLearners configuration) {
Review Comment:
This should probably be reflected in a comment and a TODO
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -587,6 +587,59 @@ void testRebalanceWithTheSameNodes() throws Exception {
verifyThatRaftNodesAndReplicasWereStartedOnlyOnce();
}
+ @Test
+ void testRaftClientsUpdatesAfterRebalance() throws Exception {
+ Node node = getNode(0);
+
+ createZone(node, ZONE_NAME, 1, 1);
+
+ createTable(node, ZONE_NAME, TABLE_NAME);
+
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+
+ Set<Assignment> assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
+
+ Node newNodeForAssignment = nodes.stream().filter(n ->
+ !assignmentsBeforeRebalance.contains(Assignment.forPeer(n.clusterService.nodeName()))).findFirst().get();
+
+ Set<Assignment> newAssignment = Set.of(Assignment.forPeer(newNodeForAssignment.clusterService.nodeName()));
+
+ // Write the new assignments to metastore as a pending assignments.
+ {
+ TablePartitionId partId = new TablePartitionId(getTableId(node, TABLE_NAME), 0);
+
+ ByteArray partAssignmentsPendingKey = pendingPartAssignmentsKey(partId);
+
+ Map<ByteArray, byte[]> msEntries = new HashMap<>();
+
+ byte[] bytesPendingAssignments = ByteUtils.toBytes(newAssignment);
+
+ msEntries.put(partAssignmentsPendingKey, bytesPendingAssignments);
Review Comment:
You can use `Map.of` here
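A rough sketch of what I mean, assuming the map only ever holds this one entry (the rest of the test is not visible in this hunk). `String` stands in for `ByteArray` so the snippet is self-contained, and `singleEntry` is a hypothetical helper name:

```java
import java.util.Map;

public class MapOfSketch {
    /** Builds the single-entry map without an intermediate HashMap. */
    static Map<String, byte[]> singleEntry(String key, byte[] value) {
        // Map.of (Java 9+) returns an unmodifiable map and rejects null keys and values.
        return Map.of(key, value);
    }
}
```

If more entries are added to `msEntries` later in the test, the `HashMap` is still needed.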
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2157,6 +2157,16 @@ protected CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent e
return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), stableAssignmentsWatchEvent.revision())
.thenComposeAsync(pendingAssignmentsEntry -> {
+ CompletableFuture<Void> raftClientUpdateFuture = completedFuture(null);
+
+ if (!stableAssignments.equals(ByteUtils.fromBytes(evt.entryEvent().oldEntry().value()))) {
Review Comment:
What is this condition for? How can new assignments be equal to previous
assignments?