sanpwc commented on code in PR #2867:
URL: https://github.com/apache/ignite-3/pull/2867#discussion_r1411698172


##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -451,6 +451,14 @@ Publisher<BinaryRow> lookup(
      */
     RaftGroupService partitionRaftGroupService(int partition);
 
+    /**
+     * Returns raft group client for corresponding partition.
+     *
+     * @param partition Partition number.
+     * @return Whether raft group client for corresponding partition is started.
+     */
+    boolean partitionRaftGroupServiceStarted(int partition);

Review Comment:
   From my point of view, this method breaks encapsulation. Why do we need it?
   A minor point: the javadoc is incorrect. It says
   `Returns raft group client for corresponding partition.`, but this method
   doesn't return the raft group client.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -465,19 +464,19 @@ public static CompletableFuture<Set<Assignment>> partitionAssignments(
     }
 
     /**
-     * Returns partition assignments from vault.
+     * Returns partition assignments from meta storage locally.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param partitionNumber Partition number.
-     * @return Returns partition assignments from vault or {@code null} if assignments is absent.
+     * @param revision Revision.
+     * @return Returns partition assignments from meta storage locally or {@code null} if assignments is absent.
      */
     public static Set<Assignment> partitionAssignments(
-            VaultManager vaultManager, int tableId, int partitionNumber) {
-        VaultEntry entry =
-                vaultManager.get(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber))).join();
+            MetaStorageManager metaStorageManager, int tableId, int partitionNumber, long revision) {

Review Comment:
   I'm not sure whether it's a matter of taste or whether we have such a rule
   in our guidelines, but with 4 parameters I'd rather put one per line.
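
For illustration only, the one-parameter-per-line layout could look like the sketch below. `MetaStorageManager` and `Assignment` are empty stand-ins so the snippet compiles on its own, and the method body is a placeholder, not the PR's implementation.

```java
import java.util.Set;

final class SignatureLayoutSketch {
    /** Empty stand-in (assumption) for the meta storage manager type named in the diff. */
    interface MetaStorageManager {}

    /** Empty stand-in (assumption) for the assignment type named in the diff. */
    static final class Assignment {}

    /** Same four parameters as in the diff, laid out one per line. */
    static Set<Assignment> partitionAssignments(
            MetaStorageManager metaStorageManager,
            int tableId,
            int partitionNumber,
            long revision
    ) {
        // Placeholder body: the real lookup is out of scope for this layout example.
        return Set.of();
    }
}
```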



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -523,38 +520,57 @@ public void start() {
     }
 
     private void performRebalanceOnRecovery(long recoveryRevision) {
-        CompletableFuture<Void> pendingAssignmentsRecoveryFuture;
-
-        var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+        var pendingAssignmentsPrefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);

Review Comment:
   Why did you change performRebalanceOnRecovery? Rebalance is about handling
   pending keys. I expect it to be properly implemented in IGNITE-20344. Did you
   find bugs there?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -465,19 +464,19 @@ public static CompletableFuture<Set<Assignment>> partitionAssignments(
     }
 
     /**
-     * Returns partition assignments from vault.
+     * Returns partition assignments from meta storage locally.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param partitionNumber Partition number.
-     * @return Returns partition assignments from vault or {@code null} if assignments is absent.
+     * @param revision Revision.
+     * @return Returns partition assignments from meta storage locally or {@code null} if assignments is absent.
      */
     public static Set<Assignment> partitionAssignments(
-            VaultManager vaultManager, int tableId, int partitionNumber) {
-        VaultEntry entry =
-                vaultManager.get(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber))).join();
+            MetaStorageManager metaStorageManager, int tableId, int partitionNumber, long revision) {
+        Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber)), revision);
 
-        return (entry == null) ? null : ByteUtils.fromBytes(entry.value());
+        return (entry == null || entry.empty() || entry.tombstone()) ? null : ByteUtils.fromBytes(entry.value());

Review Comment:
   The method isn't annotated with @Nullable, but it may return null.
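
A minimal sketch of the annotated form, under stated assumptions: the `Nullable` annotation here is a local stand-in for whichever `@Nullable` annotation the project uses (for example `org.jetbrains.annotations.Nullable`), a plain `Map` stands in for meta storage, and `partitionAssignmentsOrNull` is a hypothetical name.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.Set;

final class NullableReturnSketch {
    /** Local stand-in (assumption) for the project's real @Nullable annotation. */
    @Documented
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.METHOD, ElementType.PARAMETER})
    @interface Nullable {}

    /** Hypothetical simplified lookup: the map plays the role of meta storage. */
    static @Nullable Set<String> partitionAssignmentsOrNull(
            Map<Integer, Set<String>> storage,
            int partitionNumber
    ) {
        // Map.get returns null for a missing key, which is the case the annotation documents.
        return storage.get(partitionNumber);
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> storage = Map.of(0, Set.of("node-1"));

        System.out.println(partitionAssignmentsOrNull(storage, 0)); // prints [node-1]
        System.out.println(partitionAssignmentsOrNull(storage, 1)); // prints null
    }
}
```

With the annotation in place, static analysis and callers can see that a null check is required before using the result.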



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -516,25 +515,28 @@ static CompletableFuture<List<Set<Assignment>>> tableAssignments(
     }
 
     /**
-     * Returns table assignments for all table partitions from vault.
+     * Returns table assignments for all table partitions from meta storage locally.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param numberOfPartitions Number of partitions.
+     * @param revision Revision.
      * @return Future with table assignments as a value.
      */
     public static List<Set<Assignment>> tableAssignments(

Review Comment:
   Same method naming concerns as for partitionAssignments.



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -500,7 +500,7 @@ void testDestroyPartitionStoragesOnEvictNode() throws Exception {
     @Test
     @UseTestTxStateStorage
     @UseRocksMetaStorage
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20210")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19170")
     void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo) throws Exception {

Review Comment:
   Should we write some tests to ensure that the proposed implementation is
   correct?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -523,38 +520,57 @@ public void start() {
     }
 
     private void performRebalanceOnRecovery(long recoveryRevision) {
-        CompletableFuture<Void> pendingAssignmentsRecoveryFuture;
-
-        var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+        var pendingAssignmentsPrefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+        var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+
+        startVv.update(recoveryRevision, (v, e) -> handleAssignmentsEventsOnRecovery(
+                stableAssignmentsPrefix,
+                recoveryRevision,
+                (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, true),
+                "stable"
+        ));
+        startVv.update(recoveryRevision, (v, e) -> handleAssignmentsEventsOnRecovery(
+                pendingAssignmentsPrefix,
+                recoveryRevision,
+                (entry, rev) -> handleChangePendingAssignmentEvent(entry, localPartsByTableIdVv.get(recoveryRevision), rev, true),
+                "pending"
+        ));
+    }
 
-        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, recoveryRevision)) {
+    private CompletableFuture<Void> handleAssignmentsEventsOnRecovery(

Review Comment:
   What do you mean? On recovery there are no events, right? We just scan the
   catalog, retrieve the corresponding stable and pending assignments, etc.
   What kind of events do you handle on recovery?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -465,19 +464,19 @@ public static CompletableFuture<Set<Assignment>> partitionAssignments(
     }
 
     /**
-     * Returns partition assignments from vault.
+     * Returns partition assignments from meta storage locally.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param partitionNumber Partition number.
-     * @return Returns partition assignments from vault or {@code null} if assignments is absent.
+     * @param revision Revision.
+     * @return Returns partition assignments from meta storage locally or {@code null} if assignments is absent.
      */
     public static Set<Assignment> partitionAssignments(

Review Comment:
   The naming is a bit confusing. Currently we have two
   RebalanceUtil#partitionAssignments() methods: one retrieves assignments
   locally, the other from the meta storage leader. I'd rather update the
   method name to clarify that.
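
One possible shape of such a rename, as a sketch only: both method names below are hypothetical, and an in-memory map stands in for the local copy as well as for the leader's view. The point is that the name itself tells the caller where the data comes from and whether the call is synchronous.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class AssignmentsNamingSketch {
    // Hypothetical in-memory stand-in for meta storage contents.
    private final Map<Integer, Set<String>> storage;

    AssignmentsNamingSketch(Map<Integer, Set<String>> storage) {
        this.storage = storage;
    }

    /** Synchronous read from the local copy; may return null if nothing is stored. */
    Set<String> partitionAssignmentsGetLocally(int partitionNumber) {
        return storage.get(partitionNumber);
    }

    /** Asynchronous read that, in a real system, would go to the meta storage leader. */
    CompletableFuture<Set<String>> partitionAssignmentsFromLeader(int partitionNumber) {
        // The sketch completes immediately; a real call would involve a network round trip.
        return CompletableFuture.completedFuture(storage.get(partitionNumber));
    }
}
```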



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1272,6 +1346,8 @@ private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableD
 
             CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions];

Review Comment:
   The proposed changes seem too complicated. Let's step back for a while and
   discuss (and design) the expected flow.
   
   - Should we treat initial table creation as a rebalance from null
     assignments to pending-initial ones? Or should we consider it direct
     assignments.stable management?
   - Do we handle assignments.stable differently depending on whether it's the
     initial set or assignments.switch? Is it reasonable to split that handling
     into two methods, or should we merge them into one?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -516,25 +515,28 @@ static CompletableFuture<List<Set<Assignment>>> tableAssignments(
     }
 
     /**
-     * Returns table assignments for all table partitions from vault.
+     * Returns table assignments for all table partitions from meta storage locally.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param numberOfPartitions Number of partitions.
+     * @param revision Revision.
      * @return Future with table assignments as a value.
      */
     public static List<Set<Assignment>> tableAssignments(
-            VaultManager vaultManager,
+            MetaStorageManager metaStorageManager,
             int tableId,
-            int numberOfPartitions
+            int numberOfPartitions,
+            long revision
     ) {
         return IntStream.range(0, numberOfPartitions)
-                .mapToObj(i ->
-                        (Set<Assignment>) ByteUtils.fromBytes(
-                                vaultManager.get(
-                                        stablePartAssignmentsKey(new TablePartitionId(tableId, i))
-                                ).join().value())
-                )
+                .mapToObj(p -> {
+                    Entry e = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, p)), revision);
+
+                    assert !e.empty() && !e.tombstone() : e;

Review Comment:
   1. Within partitionAssignments() we check whether the entry is null; here we
      don't. Why?
   2. Within partitionAssignments() we return null for empty/tombstone
      assignments; here we assert that this never happens. Why?
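
To make the inconsistency concrete, here is a sketch in which both lookups share one definition of "absent". `Entry` is a simplified stand-in exposing only the three methods visible in the diff, an `IntFunction` stands in for the local meta storage read, and all method names are hypothetical. Whether the table-level method should return null elements or fail instead is exactly the open question; the sketch picks the first option only to show a single shared rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

final class ConsistentEntryHandlingSketch {
    /** Simplified stand-in (assumption) for the meta storage entry type. */
    static final class Entry {
        private final byte[] value;
        private final boolean tombstone;

        Entry(byte[] value, boolean tombstone) {
            this.value = value;
            this.tombstone = tombstone;
        }

        boolean empty() {
            return value == null && !tombstone;
        }

        boolean tombstone() {
            return tombstone;
        }

        byte[] value() {
            return value;
        }
    }

    /** The single place that decides what counts as an absent value. */
    static byte[] valueOrNull(Entry entry) {
        return (entry == null || entry.empty() || entry.tombstone()) ? null : entry.value();
    }

    /** Partition-level lookup: delegates to the shared rule. */
    static byte[] partitionValue(IntFunction<Entry> storage, int partition) {
        return valueOrNull(storage.apply(partition));
    }

    /** Table-level lookup: reuses the partition-level lookup, so both behave the same. */
    static List<byte[]> tableValues(IntFunction<Entry> storage, int partitions) {
        List<byte[]> result = new ArrayList<>(partitions);

        for (int p = 0; p < partitions; p++) {
            result.add(partitionValue(storage, p)); // may add null for an absent partition
        }

        return result;
    }
}
```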



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -630,201 +646,222 @@ private CompletableFuture<?> createTablePartitionsLocally(
 
             CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions.
-            PartitionSet parts = new BitSetPartitionSet();
+            PartitionSet partitionSet = new BitSetPartitionSet();
 
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions.
             for (int i = 0; i < futures.length; i++) {
                 futures[i] = new CompletableFuture<>();
 
-                parts.set(i);
+                partitionSet.set(i);
             }
 
-            String localMemberName = localNode().name();
-
             for (int i = 0; i < partitions; i++) {
                 int partId = i;
 
                 Set<Assignment> newPartAssignment = newAssignments.get(partId);
 
-                InternalTable internalTbl = table.internalTable();
+                // TODO https://issues.apache.org/jira/browse/IGNITE-19170 #handleChangePendingAssignmentEvent should be called on
+                // TODO actual event, the method #createTablePartitionsLocally should be removed.
+                handleChangePendingAssignmentEvent(

Review Comment:
   What's the point of treating initial assignments as pending ones? Generally
   it sounds reasonable, but only if you took into account that
   assignments.stable might be empty, in which case there's no sense in calling
   changePeersAsync. Did you?


