alievmirza commented on code in PR #3225:
URL: https://github.com/apache/ignite-3/pull/3225#discussion_r1507403088


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java:
##########
@@ -214,6 +242,85 @@ public void onError(Throwable e) {
         };
     }
 
+    /**
+     * Creates watch listener for tables from a zone. This counter is needed 
for tracking rebalances for a specified partition along all
+     * tables. Once all rebalances for specified partition number are finished 
(meaning that counter is equal to 0), we can perform stable
+     * switch for all stable keys from meta storage.
+     *
+     * @return Corresponding watch listener.
+     */
+    private WatchListener createPartitionsCounterListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+
+                    int counter = ((Set<Integer>) 
fromBytes(event.entryEvent().newEntry().value())).size();
+
+                    assert counter >= 0 : "Tables counter for rabalances 
cannot be negative.";
+
+                    if (counter > 0) {
+                        return nullCompletedFuture();
+                    }
+
+                    int zoneId = 
RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
+
+                    List<CatalogTableDescriptor> tables = 
findTablesByZoneId(zoneId, catalogService.latestCatalogVersion(), 
catalogService);
+
+                    rebalanceScheduler.schedule(() -> {
+                        if (!busyLock.enterBusy()) {
+                            return;
+                        }
+
+                        LOG.info("Started to update stable keys for tables 
from the zone [zoneId = {}, tables = [{}]]",
+                                zoneId,
+                                
tables.stream().map(CatalogObjectDescriptor::name).collect(Collectors.toSet())
+                        );
+
+                        try {
+                            Map<ByteArray, TablePartitionId> 
partitionTablesKeys = new HashMap<>();
+
+                            int partId = 
extractPartitionNumber(event.entryEvent().newEntry().key());
+
+                            for (CatalogTableDescriptor table : tables) {
+                                TablePartitionId replicaGrpId = new 
TablePartitionId(table.id(), partId);
+                                
partitionTablesKeys.put(raftConfigurationAppliedKey(replicaGrpId), 
replicaGrpId);
+                            }
+
+                            Map<ByteArray, Entry> entriesMap = 
metaStorageManager.getAll(partitionTablesKeys.keySet()).get();
+
+                            entriesMap.forEach((key, stable) -> {
+                                doOnNewPeersConfigurationApplied(

Review Comment:
   I've added check for revision and we skipp outdared events



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to