kgusakov commented on code in PR #2867:
URL: https://github.com/apache/ignite-3/pull/2867#discussion_r1406865705


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -517,23 +516,26 @@ static CompletableFuture<List<Set<Assignment>>> 
tableAssignments(
     /**
      * Returns table assignments for all table partitions from vault.

Review Comment:
   Same as in the previous comment: "from vault" is no longer accurate here.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -466,17 +465,17 @@ public static CompletableFuture<Set<Assignment>> 
partitionAssignments(
     /**
      * Returns partition assignments from vault.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param partitionNumber Partition number.
+     * @param revision Revision.
      * @return Returns partition assignments from vault or {@code null} if 
assignments is absent.

Review Comment:
   "from vault" is no longer true.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -629,201 +630,222 @@ private CompletableFuture<?> 
createTablePartitionsLocally(
 
             CompletableFuture<?>[] futures = new 
CompletableFuture<?>[partitions];
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 
Process assignments and set partitions only for assigned partitions.
-            PartitionSet parts = new BitSetPartitionSet();
+            PartitionSet partitionSet = new BitSetPartitionSet();
 
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 
Process assignments and set partitions only for assigned partitions.
             for (int i = 0; i < futures.length; i++) {
                 futures[i] = new CompletableFuture<>();
 
-                parts.set(i);
+                partitionSet.set(i);
             }
 
-            String localMemberName = localNode().name();
-
             for (int i = 0; i < partitions; i++) {
                 int partId = i;
 
                 Set<Assignment> newPartAssignment = newAssignments.get(partId);
 
-                InternalTable internalTbl = table.internalTable();
+                // TODO https://issues.apache.org/jira/browse/IGNITE-19170 
#handleChangePendingAssignmentEvent should be called on
+                // TODO actual event, the method #createTablePartitionsLocally 
should be removed.
+                handleChangePendingAssignmentEvent(
+                                new TablePartitionId(tableId, partId),
+                                table,
+                                assignmentEntry(new TablePartitionId(tableId, 
partId), newPartAssignment, causalityToken),
+                                assignmentEntry(new TablePartitionId(tableId, 
partId), Set.of(), causalityToken),
+                                completedFuture(Map.of(tableId, partitionSet)),
+                                false
+                        )
+                        .whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                LOG.warn("Unable to update raft groups on the 
node [tableId={}, partitionId={}]", ex, tableId, partId);
 
-                Assignment localMemberAssignment = newPartAssignment.stream()
-                        .filter(a -> a.consistentId().equals(localMemberName))
-                        .findAny()
-                        .orElse(null);
+                                futures[partId].completeExceptionally(ex);
+                            } else {
+                                futures[partId].complete(null);
+                            }
+                        });
+            }
 
-                PeersAndLearners newConfiguration = 
configurationFromAssignments(newPartAssignment);
+            return allOf(futures);
+        });
+    }
 
-                TablePartitionId replicaGrpId = new TablePartitionId(tableId, 
partId);
+    private Entry assignmentEntry(TablePartitionId tblPartId, Set<Assignment> 
assignments, long revision) {
+        return new Entry() {
+            @Override
+            public byte[] key() {
+                return pendingPartAssignmentsKey(tblPartId).bytes();
+            }
 
-                transactionStateResolver.updateAssignment(replicaGrpId, 
newConfiguration.peers().stream().map(Peer::consistentId)
-                        .collect(toList()));
+            @Override
+            public byte @Nullable [] value() {
+                return ByteUtils.toBytes(assignments);
+            }
 
-                var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, Void>(
-                        new HybridTimestamp(1, 0)
-                );
-                var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
+            @Override
+            public long revision() {
+                return 1;

Review Comment:
   I would prefer to throw `java.lang.UnsupportedOperationException` here and in 
other unused methods, to simplify the debugging of any related issues in the 
future.



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -690,11 +690,19 @@ private void 
directUpdateMetastoreRebalanceAssignmentKeys() throws Exception {
     }
 
     private void verifyThatRaftNodesAndReplicasWereStartedOnlyOnce() throws 
Exception {
-        for (int i = 0; i < NODE_COUNT; i++) {
-            verify(getNode(i).raftManager, times(1))
-                    .startRaftGroupNode(any(), any(), any(), any(), 
any(RaftGroupOptions.class));
-            verify(getNode(i).replicaManager, times(1)).startReplica(any(), 
any(), any(), any(), any());
-        }
+        waitForCondition(() -> {
+            try {
+                for (int i = 0; i < NODE_COUNT; i++) {
+                    verify(getNode(i).raftManager, times(1))
+                            .startRaftGroupNode(any(), any(), any(), any(), 
any(RaftGroupOptions.class));

Review Comment:
   I would prefer checking the 
`Mockito.mockingDetails(getNode(i).raftManager).getInvocations().size()` value 
rather than using exception-based logic here. It looks like `verify` does not 
express the right intention here.



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -493,7 +493,7 @@ void testDestroyPartitionStoragesOnEvictNode() throws 
Exception {
     @Test
     @UseTestTxStateStorage
     @UseRocksMetaStorage
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20210";)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21463";)

Review Comment:
   No Jira issue with this number exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to