kgusakov commented on code in PR #3225:
URL: https://github.com/apache/ignite-3/pull/3225#discussion_r1502949601


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java:
##########
@@ -214,6 +242,85 @@ public void onError(Throwable e) {
         };
     }
 
+    /**
+     * Creates watch listener for tables from a zone. This counter is needed for tracking rebalances for a specified partition along all
+     * tables. Once all rebalances for specified partition number are finished (meaning that counter is equal to 0), we can perform stable
+     * switch for all stable keys from meta storage.
+     *
+     * @return Corresponding watch listener.
+     */
+    private WatchListener createPartitionsCounterListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+
+                    int counter = ((Set<Integer>) fromBytes(event.entryEvent().newEntry().value())).size();
+
+                    assert counter >= 0 : "Tables counter for rabalances cannot be negative.";
+
+                    if (counter > 0) {
+                        return nullCompletedFuture();
+                    }
+
+                    int zoneId = RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
+
+                    List<CatalogTableDescriptor> tables = findTablesByZoneId(zoneId, catalogService.latestCatalogVersion(), catalogService);

Review Comment:
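   Why `catalogService.latestCatalogVersion()`? The counter update being processed may be stale, so the latest catalog version may not describe the same set of tables that the counter was created for. Will this work as expected for stale counter updates?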
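   A minimal sketch of the alternative the question hints at: resolve the catalog version that was active at the event's timestamp instead of always taking the latest one. It assumes that each catalog version has an activation timestamp and that the watch event carries a timestamp; `CatalogVersionAtEvent`, `register`, `versionAt` and `latestVersion` are hypothetical names, and the `TreeMap` only stands in for the real catalog.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical stand-in for the catalog's version history, queried by the timestamp
 * of the watch event being processed instead of "now".
 */
final class CatalogVersionAtEvent {
    // Activation timestamp -> catalog version.
    private final NavigableMap<Long, Integer> versionsByActivationTs = new TreeMap<>();

    void register(long activationTs, int version) {
        versionsByActivationTs.put(activationTs, version);
    }

    /** Returns the version active at {@code eventTs}: the latest one activated at or before it. */
    int versionAt(long eventTs) {
        Map.Entry<Long, Integer> entry = versionsByActivationTs.floorEntry(eventTs);

        if (entry == null) {
            throw new IllegalStateException("No catalog version is active at " + eventTs);
        }

        return entry.getValue();
    }

    int latestVersion() {
        return versionsByActivationTs.lastEntry().getValue();
    }

    public static void main(String[] args) {
        CatalogVersionAtEvent catalog = new CatalogVersionAtEvent();
        catalog.register(100L, 1); // Zone created.
        catalog.register(200L, 2); // Table A created.
        catalog.register(300L, 3); // Table B created after the counter update was written.

        long staleEventTs = 250L;
        System.out.println(catalog.versionAt(staleEventTs)); // Prints 2: the tables known to the event.
        System.out.println(catalog.latestVersion()); // Prints 3: would also include table B.
    }
}
```

   With the latest version, a stale zero-counter event would also pick up table B, which the counter never tracked.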



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java:
##########
@@ -214,6 +242,85 @@ public void onError(Throwable e) {
         };
     }
 
+    /**
+     * Creates watch listener for tables from a zone. This counter is needed for tracking rebalances for a specified partition along all
+     * tables. Once all rebalances for specified partition number are finished (meaning that counter is equal to 0), we can perform stable
+     * switch for all stable keys from meta storage.
+     *
+     * @return Corresponding watch listener.
+     */
+    private WatchListener createPartitionsCounterListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+
+                    int counter = ((Set<Integer>) fromBytes(event.entryEvent().newEntry().value())).size();
+
+                    assert counter >= 0 : "Tables counter for rabalances cannot be negative.";
+
+                    if (counter > 0) {
+                        return nullCompletedFuture();
+                    }
+
+                    int zoneId = RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
+
+                    List<CatalogTableDescriptor> tables = findTablesByZoneId(zoneId, catalogService.latestCatalogVersion(), catalogService);
+
+                    rebalanceScheduler.schedule(() -> {
+                        if (!busyLock.enterBusy()) {
+                            return;
+                        }
+
+                        LOG.info("Started to update stable keys for tables from the zone [zoneId = {}, tables = [{}]]",

Review Comment:
   The list of tables can be huge in production and in some benchmark tests, which will produce a huge log record. Maybe move the table list to the debug level, at least?
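   One possible split, as a minimal sketch: the INFO record carries only the zone id and the number of tables, and the full table list is built only for a DEBUG record. `StableSwitchLogMessages`, `infoMessage` and `debugMessage` are hypothetical helpers; in the real code the debug record would be guarded by a debug-level check analogous to the `LOG.isInfoEnabled()` guard already used in `TableManager`.

```java
import java.util.List;
import java.util.TreeSet;

/** Hypothetical helpers that keep the INFO record small regardless of the number of tables. */
final class StableSwitchLogMessages {
    private StableSwitchLogMessages() {
    }

    /** Constant-size summary suitable for INFO level. */
    static String infoMessage(int zoneId, List<String> tableNames) {
        return "Started to update stable keys for tables from the zone [zoneId=" + zoneId
                + ", tablesCount=" + tableNames.size() + ']';
    }

    /** Full, sorted table list; intended to be built only when DEBUG is enabled. */
    static String debugMessage(int zoneId, List<String> tableNames) {
        return "Tables to update stable keys for [zoneId=" + zoneId
                + ", tables=" + new TreeSet<>(tableNames) + ']';
    }
}
```

   Sorting the names keeps the debug record deterministic, which helps when comparing logs from different nodes.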



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java:
##########
@@ -214,6 +242,85 @@ public void onError(Throwable e) {
         };
     }
 
+    /**
+     * Creates watch listener for tables from a zone. This counter is needed for tracking rebalances for a specified partition along all
+     * tables. Once all rebalances for specified partition number are finished (meaning that counter is equal to 0), we can perform stable
+     * switch for all stable keys from meta storage.
+     *
+     * @return Corresponding watch listener.
+     */
+    private WatchListener createPartitionsCounterListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+
+                    int counter = ((Set<Integer>) fromBytes(event.entryEvent().newEntry().value())).size();
+
+                    assert counter >= 0 : "Tables counter for rabalances cannot be negative.";
+
+                    if (counter > 0) {
+                        return nullCompletedFuture();
+                    }
+
+                    int zoneId = RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
+
+                    List<CatalogTableDescriptor> tables = findTablesByZoneId(zoneId, catalogService.latestCatalogVersion(), catalogService);
+
+                    rebalanceScheduler.schedule(() -> {
+                        if (!busyLock.enterBusy()) {
+                            return;
+                        }
+
+                        LOG.info("Started to update stable keys for tables from the zone [zoneId = {}, tables = [{}]]",
+                                zoneId,
+                                tables.stream().map(CatalogObjectDescriptor::name).collect(Collectors.toSet())
+                        );
+
+                        try {
+                            Map<ByteArray, TablePartitionId> partitionTablesKeys = new HashMap<>();
+
+                            int partId = extractPartitionNumber(event.entryEvent().newEntry().key());
+
+                            for (CatalogTableDescriptor table : tables) {
+                                TablePartitionId replicaGrpId = new TablePartitionId(table.id(), partId);
+                                partitionTablesKeys.put(raftConfigurationAppliedKey(replicaGrpId), replicaGrpId);
+                            }
+
+                            Map<ByteArray, Entry> entriesMap = metaStorageManager.getAll(partitionTablesKeys.keySet()).get();
+
+                            entriesMap.forEach((key, stable) -> {
+                                doOnNewPeersConfigurationApplied(

Review Comment:
   Could you check the following case: we have N nodes that have successfully handled some rebalances, but one node has not processed any counter updates yet.
   
   When this node starts processing the counter update events, it may trigger the stable key update (potentially with the same stable keys) and finish the current rebalance (moving planned -> pending -> stable, etc.) because of the zero-valued counter left over from an old, stale rebalance.
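   One hypothetical way to rule this case out, sketched under the assumption that the counter could also store the revision of the rebalance it was created for (the current value is only a `Set<Integer>` of table ids): a zero counter may trigger the stable switch only if that revision still matches the rebalance currently in progress for the partition. `StaleCounterGuard`, `shouldSwitchStable` and both revision parameters are illustrative names, not existing code.

```java
import java.util.Set;

/** Hypothetical guard deciding whether a counter event may finish the current rebalance. */
final class StaleCounterGuard {
    private StaleCounterGuard() {
    }

    /**
     * Decides whether the stable switch may be performed for a partition.
     *
     * @param remainingTableIds Tables of the zone whose rebalance for this partition has not finished yet.
     * @param counterRevision Revision of the rebalance the counter was created for (hypothetical field).
     * @param currentRebalanceRevision Revision of the rebalance currently in progress for the partition.
     * @return {@code true} if the stable switch may be performed.
     */
    static boolean shouldSwitchStable(Set<Integer> remainingTableIds, long counterRevision, long currentRebalanceRevision) {
        if (!remainingTableIds.isEmpty()) {
            // Some tables of the zone are still rebalancing this partition.
            return false;
        }

        // A zero counter left over from an older rebalance must not finish the current one.
        return counterRevision == currentRebalanceRevision;
    }
}
```

   In the scenario above, the lagging node would see `counterRevision < currentRebalanceRevision` for the old counter and skip the stable switch.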



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -432,7 +440,8 @@ public TableManager(
             HybridTimestampTracker observableTimestampTracker,
             PlacementDriver placementDriver,
             Supplier<IgniteSql> sql,
-            FailureProcessor failureProcessor
+            FailureProcessor failureProcessor,
+            ScheduledExecutorService rebalanceScheduler

Review Comment:
   The Javadoc must be updated to describe the new `rebalanceScheduler` parameter.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1778,9 +1765,7 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
                         // Table can be null only recovery, because we use a revision from the future. See comment inside
                         // performRebalanceOnRecovery.
                         if (table == null) {
-                            if (LOG.isInfoEnabled()) {
-                                LOG.info("Skipping Pending Assignments update, because table {} does not exist", tblId);
-                            }
+                            assert false : "Table cannot be null.";

Review Comment:
   Why? There is a possible race between table deletion and the scheduling of a new rebalance.
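   A minimal sketch of the tolerant behaviour the removed code had: if the table was dropped concurrently, the handler skips the update and completes normally instead of failing an assertion. `PendingAssignmentsHandlerSketch`, its table map and `handleChangePendingAssignment` are hypothetical stand-ins for `TableManager` state and `handleChangePendingAssignmentEvent`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical handler that tolerates a table being dropped while a rebalance is scheduled. */
final class PendingAssignmentsHandlerSketch {
    // Stand-in for the table registry: table id -> table name.
    private final Map<Integer, String> tables = new ConcurrentHashMap<>();

    void createTable(int tableId, String name) {
        tables.put(tableId, name);
    }

    void dropTable(int tableId) {
        tables.remove(tableId);
    }

    /** Completes with {@code true} if the update was applied and {@code false} if it was skipped. */
    CompletableFuture<Boolean> handleChangePendingAssignment(int tableId) {
        String table = tables.get(tableId);

        if (table == null) {
            // The table may have been dropped after the rebalance was scheduled: skip instead of failing.
            return CompletableFuture.completedFuture(false);
        }

        // ... apply the pending assignments for the table's partition here ...
        return CompletableFuture.completedFuture(true);
    }
}
```

   Completing normally keeps the watch processing chain alive, whereas `assert false` would only fire with assertions enabled and would hide the race in production.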



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
