sergeyuttsel commented on code in PR #2899:
URL: https://github.com/apache/ignite-3/pull/2899#discussion_r1410657932


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -915,8 +883,8 @@ private CompletableFuture<Void> scheduleTimers(
      */
     private CompletableFuture<Void> scheduleTimers(

Review Comment:
   This method is called only from the other `scheduleTimers` overload, so we can merge
the two into a single method.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -395,132 +389,106 @@ private CompletableFuture<Void> 
onUpdateFilter(AlterZoneEventParameters paramete
 
         long causalityToken = parameters.causalityToken();
 
-        VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
-
-        if (filterUpdateRevision != null) {
-            // This means that we have already handled event with this 
causalityToken.
-            // It is possible when node was restarted after this listener 
completed,
-            // but applied causalityToken didn't have time to be propagated to 
the Vault.
-            if (bytesToLong(filterUpdateRevision.value()) >= causalityToken) {
-                return completedFuture(null);
-            }
-        }
-
-        vaultMgr.put(zonesFilterUpdateRevision(), 
longToBytes(causalityToken)).join();
-
         causalityDataNodesEngine.onUpdateFilter(causalityToken, zoneId, 
newFilter);
 
         return saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken);
     }
 
     /**
-     * Creates or restores zone's state depending on the {@link 
ZoneState#topologyAugmentationMap()} existence in the Vault.
-     * We save {@link ZoneState#topologyAugmentationMap()} in the Vault every 
time we receive logical topology changes from the metastore.
+     * Restores zones' states.
      *
      * @param zone Zone descriptor.
      * @param causalityToken Causality token.
      * @return Future reflecting the completion of creation or restoring a 
zone.
      */
-    private CompletableFuture<Void> 
createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
+    private CompletableFuture<Void> restoreZoneStateBusy(CatalogZoneDescriptor 
zone, long causalityToken) {
         int zoneId = zone.id();
 
-        Entry topologyAugmentationMapLocalMetaStorage =
-                
metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken);
+        Entry zoneDataNodesLocalMetaStorage = 
metaStorageManager.getLocally(zoneDataNodesKey(zoneId), causalityToken);
 
-        // First creation of a zone, or first call on the manager start for 
the default zone.
-        if (topologyAugmentationMapLocalMetaStorage.value() == null) {
-            ZoneState zoneState = new ZoneState(executor);
-
-            ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, 
zoneState);
-
-            assert prevZoneState == null : "Zone's state was created twice 
[zoneId = " + zoneId + ']';
-
-            Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+        if (zoneDataNodesLocalMetaStorage.value() == null) {
+            // In this case, creation of a zone was interrupted during restart.
+            return onCreateZone(zone, causalityToken);
+        } else {
+            Entry topologyAugmentationMapLocalMetaStorage = 
metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken);
 
-            
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone);
+            ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap;
 
-            return initDataNodesAndTriggerKeysInMetaStorage(zoneId, 
causalityToken, dataNodes);
-        } else {
-            // Restart case, when topologyAugmentationMap has already been 
saved during a cluster work.
-            ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap 
= fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+            if (topologyAugmentationMapLocalMetaStorage.value() == null) {
+                // This case means that there won't any logical topology 
updates before restart.
+                topologyAugmentationMap = new ConcurrentSkipListMap<>();
+            } else {
+                topologyAugmentationMap = 
fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+            }
 
             ZoneState zoneState = new ZoneState(executor, 
topologyAugmentationMap);
 
             ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, 
zoneState);
 
             assert prevZoneState == null : "Zone's state was created twice 
[zoneId = " + zoneId + ']';
+        }
 
-            Optional<Long> maxScaleUpRevision = 
zoneState.highestRevision(true);
+        causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, 
zone);
 
-            Optional<Long> maxScaleDownRevision = 
zoneState.highestRevision(false);
+        return completedFuture(null);
+    }
 
-            VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
+    private CompletableFuture<Void> onCreateZone(CatalogZoneDescriptor zone, 
long causalityToken) {
+        int zoneId = zone.id();
 
-            restoreTimers(zone, zoneState, maxScaleUpRevision, 
maxScaleDownRevision, filterUpdateRevision);
-        }
+        ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap = 
new ConcurrentSkipListMap<>();
+
+        ZoneState zoneState = new ZoneState(executor, topologyAugmentationMap);
+
+        ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, zoneState);
+
+        assert prevZoneState == null : "Zone's state was created twice [zoneId 
= " + zoneId + ']';
+
+        Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
 
         causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, 
zone);
 
-        return completedFuture(null);
+        return initDataNodesAndTriggerKeysInMetaStorage(zoneId, 
causalityToken, dataNodes);
     }
 
     /**
-     * Restores timers that were scheduled before a node's restart.
-     * Take the highest revision from the {@link 
ZoneState#topologyAugmentationMap()}, compare it with the revision
-     * of the last update of the zone's filter and schedule scale up/scale 
down timers. Filter revision is taken into account because
-     * any filter update triggers immediate scale up.
+     * Restores timers that were scheduled before a node's restart. Take the 
highest revision from the
+     * {@link ZoneState#topologyAugmentationMap()}, schedule scale up/scale 
down timers.
      *
-     * @param zone Zone descriptor.
-     * @param zoneState Zone's state from Distribution Zone Manager
-     * @param maxScaleUpRevisionOptional Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node joins.
-     * @param maxScaleDownRevisionOptional Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node removals.
-     * @param filterUpdateRevisionVaultEntry Revision of the last update of 
the zone's filter.
+     * @param catalogVersion Catalog version.
+     * @return Future that represents the pending completion of the operation.
+     *         For the immediate timers it will be completed when data nodes 
will be updated in Meta Storage.
      */
-    private void restoreTimers(
-            CatalogZoneDescriptor zone,
-            ZoneState zoneState,
-            Optional<Long> maxScaleUpRevisionOptional,
-            Optional<Long> maxScaleDownRevisionOptional,
-            VaultEntry filterUpdateRevisionVaultEntry
-    ) {
-        int zoneId = zone.id();
+    private CompletableFuture<Void> restoreTimers(int catalogVersion) {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        maxScaleUpRevisionOptional.ifPresent(
-                maxScaleUpRevision -> {
-                    if (filterUpdateRevisionVaultEntry != null) {
-                        long filterUpdateRevision = 
bytesToLong(filterUpdateRevisionVaultEntry.value());
+        for (CatalogZoneDescriptor zone : 
catalogManager.zones(catalogVersion)) {
+            ZoneState zoneState = zonesState.get(zone.id());
 
-                        // Immediately trigger scale up, that was planned to 
be invoked before restart.
-                        // If this invoke was successful before restart, then 
current call just will be skipped.
-                        saveDataNodesToMetaStorageOnScaleUp(zoneId, 
filterUpdateRevision);
+            // Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node joins.
+            Optional<Long> maxScaleUpRevisionOptional = 
zoneState.highestRevision(true);
 
-                        if (maxScaleUpRevision < filterUpdateRevision) {
-                            // Don't need to trigger additional scale up for 
the scenario, when filter update event happened after the last
-                            // node join event.
-                            return;
-                        }
+            //Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node removals.
+            Optional<Long> maxScaleDownRevisionOptional = 
zoneState.highestRevision(false);
+
+            maxScaleUpRevisionOptional.ifPresent(
+                    maxScaleUpRevision -> {
+                        // Take the highest revision from the 
topologyAugmentationMap and schedule scale up/scale down,
+                        // meaning that all augmentations of nodes will be 
taken into account in newly created timers.
+                        // If augmentations have already been proposed to data 
nodes in the metastorage before restart,
+                        // that means we have updated corresponding trigger 
key and it's value will be greater than

Review Comment:
   It seems this should say "greater than or equal to" rather than "greater than".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to