sanpwc commented on code in PR #2415:
URL: https://github.com/apache/ignite-3/pull/2415#discussion_r1305556986


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -621,174 +299,110 @@ public CompletableFuture<Set<String>> dataNodes(long 
causalityToken, int zoneId)
         return causalityDataNodesEngine.dataNodes(causalityToken, zoneId);
     }
 
-    /**
-     * Creates configuration listener for updates of scale up value.
-     *
-     * @return Configuration listener for updates of scale up value.
-     */
-    private ConfigurationListener<Integer> onUpdateScaleUp() {
-        return ctx -> {
-            if (ctx.oldValue() == null) {
-                // zone creation, already handled in a separate listener.
-                return completedFuture(null);
-            }
-
-            DistributionZoneView zoneView = 
ctx.newValue(DistributionZoneView.class);
+    private CompletableFuture<Void> 
onUpdateScaleUpBusy(AlterZoneEventParameters parameters) {
+        int zoneId = parameters.zoneDescriptor().id();
 
-            int zoneId = zoneView.zoneId();
+        int newScaleUp = 
parameters.zoneDescriptor().dataNodesAutoAdjustScaleUp();
 
-            int newScaleUp = ctx.newValue().intValue();
+        long causalityToken = parameters.causalityToken();
 
-            long revision = ctx.storageRevision();
+        // It is safe to zonesTimers.get(zoneId) in term of NPE because meta 
storage notifications are one-threaded
+        // and this map will be initialized on a manager start or with catalog 
notification
+        ZoneState zoneState = zonesState.get(zoneId);
 
-            // It is safe to zonesTimers.get(zoneId) in term of NPE because 
meta storage notifications are one-threaded
-            // and this map will be initialized on a manager start or with 
onCreate configuration notification
-            ZoneState zoneState = zonesState.get(zoneId);
-
-            if (newScaleUp != INFINITE_TIMER_VALUE) {
-                Optional<Long> highestRevision = 
zoneState.highestRevision(true);
+        if (newScaleUp != INFINITE_TIMER_VALUE) {
+            Optional<Long> highestRevision = zoneState.highestRevision(true);
 
-                assert highestRevision.isEmpty() || revision >= 
highestRevision.get() : "Expected revision that "
-                        + "is greater or equal to already seen meta storage 
events.";
+            assert highestRevision.isEmpty() || causalityToken >= 
highestRevision.get() : IgniteStringFormatter.format(
+                    "Expected causalityToken that is greater or equal to 
already seen meta storage events: highestRevision={}, "
+                            + "causalityToken={}",
+                    highestRevision.orElse(null), causalityToken
+            );
 
-                zoneState.rescheduleScaleUp(
-                        newScaleUp,
-                        () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
revision),
-                        zoneId
-                );
-            } else {
-                zoneState.stopScaleUp();
-            }
+            zoneState.rescheduleScaleUp(
+                    newScaleUp,
+                    () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
causalityToken),
+                    zoneId
+            );
+        } else {
+            zoneState.stopScaleUp();
+        }
 
-            causalityDataNodesEngine.causalityOnUpdateScaleUp(revision, 
zoneId, newScaleUp);
+        causalityDataNodesEngine.causalityOnUpdateScaleUp(causalityToken, 
zoneId, newScaleUp);
 
-            return completedFuture(null);
-        };
+        return completedFuture(null);
     }
 
-    /**
-     * Creates configuration listener for updates of scale down value.
-     *
-     * @return Configuration listener for updates of scale down value.
-     */
-    private ConfigurationListener<Integer> onUpdateScaleDown() {
-        return ctx -> {
-            if (ctx.oldValue() == null) {
-                // zone creation, already handled in a separate listener.
-                return completedFuture(null);
-            }
-
-            int zoneId = ctx.newValue(DistributionZoneView.class).zoneId();
+    private CompletableFuture<Void> 
onUpdateScaleDownBusy(AlterZoneEventParameters parameters) {
+        int zoneId = parameters.zoneDescriptor().id();
 
-            int newScaleDown = ctx.newValue().intValue();
+        int newScaleDown = 
parameters.zoneDescriptor().dataNodesAutoAdjustScaleDown();
 
-            long revision = ctx.storageRevision();
+        long causalityToken = parameters.causalityToken();
 
-            // It is safe to zonesTimers.get(zoneId) in term of NPE because 
meta storage notifications are one-threaded
-            // and this map will be initialized on a manager start or with 
onCreate configuration notification
-            ZoneState zoneState = zonesState.get(zoneId);
+        // It is safe to zonesTimers.get(zoneId) in term of NPE because meta 
storage notifications are one-threaded
+        // and this map will be initialized on a manager start or with catalog 
notification
+        ZoneState zoneState = zonesState.get(zoneId);
 
-            if (newScaleDown != INFINITE_TIMER_VALUE) {
-                Optional<Long> highestRevision = 
zoneState.highestRevision(false);
+        if (newScaleDown != INFINITE_TIMER_VALUE) {
+            Optional<Long> highestRevision = zoneState.highestRevision(false);
 
-                assert highestRevision.isEmpty() || revision >= 
highestRevision.get() : "Expected revision that "
-                        + "is greater or equal to already seen meta storage 
events.";
+            assert highestRevision.isEmpty() || causalityToken >= 
highestRevision.get() : IgniteStringFormatter.format(
+                    "Expected causalityToken that is greater or equal to 
already seen meta storage events: highestRevision={}, "
+                            + "causalityToken={}",
+                    highestRevision.orElse(null), causalityToken
+            );
 
-                zoneState.rescheduleScaleDown(
-                        newScaleDown,
-                        () -> saveDataNodesToMetaStorageOnScaleDown(zoneId, 
revision),
-                        zoneId
-                );
-            } else {
-                zoneState.stopScaleDown();
-            }
+            zoneState.rescheduleScaleDown(
+                    newScaleDown,
+                    () -> saveDataNodesToMetaStorageOnScaleDown(zoneId, 
causalityToken),
+                    zoneId
+            );
+        } else {
+            zoneState.stopScaleDown();
+        }
 
-            causalityDataNodesEngine.causalityOnUpdateScaleDown(revision, 
zoneId, newScaleDown);
+        causalityDataNodesEngine.causalityOnUpdateScaleDown(causalityToken, 
zoneId, newScaleDown);
 
-            return completedFuture(null);
-        };
+        return completedFuture(null);
     }
 
-    /**
-     * Creates configuration listener for updates of zone's filter value.
-     *
-     * @return Configuration listener for updates of zone's filter value.
-     */
-    private ConfigurationListener<String> onUpdateFilter() {
-        return ctx -> {
-            if (ctx.oldValue() == null) {
-                // zone creation, already handled in a separate listener.
-                return completedFuture(null);
-            }
-
-            DistributionZoneView zoneView = 
ctx.newValue(DistributionZoneView.class);
+    private CompletableFuture<Void> onUpdateFilter(AlterZoneEventParameters 
parameters) {
+        int zoneId = parameters.zoneDescriptor().id();
 
-            int zoneId = zoneView.zoneId();
+        String newFilter = parameters.zoneDescriptor().filter();
 
-            String filter = zoneView.filter();
+        long causalityToken = parameters.causalityToken();
 
-            long revision = ctx.storageRevision();
+        VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
 
-            VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
-
-            if (filterUpdateRevision != null) {
-                // This means that we have already handled event with this 
revision.
-                // It is possible when node was restarted after this listener 
completed,
-                // but applied revision didn't have time to be propagated to 
the Vault.
-                if (bytesToLong(filterUpdateRevision.value()) >= revision) {
-                    return completedFuture(null);
-                }
+        if (filterUpdateRevision != null) {
+            // This means that we have already handled event with this 
causalityToken.
+            // It is possible when node was restarted after this listener 
completed,
+            // but applied causalityToken didn't have time to be propagated to 
the Vault.
+            if (bytesToLong(filterUpdateRevision.value()) >= causalityToken) {
+                return completedFuture(null);
             }
-
-            vaultMgr.put(zonesFilterUpdateRevision(), 
longToBytes(revision)).join();
-
-            saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
-
-            causalityDataNodesEngine.onUpdateFilter(revision, zoneId, filter);
-
-            return completedFuture(null);
-        };
-    }
-
-    private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
-        @Override
-        public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            DistributionZoneView zone = ctx.newValue();
-
-            createOrRestoreZoneState(zone, ctx.storageRevision());
-
-            return completedFuture(null);
         }
 
-        @Override
-        public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            int zoneId = ctx.oldValue().zoneId();
-
-            long revision = ctx.storageRevision();
-
-            ZoneState zoneState = zonesState.get(zoneId);
+        vaultMgr.put(zonesFilterUpdateRevision(), 
longToBytes(causalityToken)).join();
 
-            zoneState.stopTimers();
+        saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken);

Review Comment:
   Well, this isn't part of your change; however, in order to prevent the 
meta storage (ms) invocations from being reordered, we should return the future 
produced by saveDataNodesToMetaStorageOnScaleUp as the onAnyCallback result. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to