sanpwc commented on code in PR #1631:
URL: https://github.com/apache/ignite-3/pull/1631#discussion_r1098949926


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<Distribution
 
             int oldScaleUp;
 
+            int oldScaleDown;
+
             // ctx.oldValue() could be null for the default zone on a first 
start.
             if (ctx.oldValue() == null) {
                 oldScaleUp = Integer.MAX_VALUE;
+
+                oldScaleDown = Integer.MAX_VALUE;
             } else {
                 oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+                oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
             }
 
             int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
 
-            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
-                // It is safe to zonesTimers.get(zoneId) in term of NPE 
because meta storage notifications are one-threaded
-                zonesState.get(zoneId).rescheduleScaleUp(
-                        newScaleUp,
-                        () -> CompletableFuture.supplyAsync(
-                                () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
-                                Runnable::run
-                        )
-                );
+            int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+            // It is safe to zonesTimers.get(zoneId) in term of NPE because 
meta storage notifications are one-threaded
+            // and this map will me initialized on a manager start or with 
onCreate configuration notification
+            ZoneState zoneState = zonesState.get(zoneId);
+
+            if (oldScaleUp != newScaleUp) {
+                if (newScaleUp != Integer.MAX_VALUE) {
+                    Optional<Long> highestRevision = 
zoneState.highestRevision(true);

Review Comment:
   Why do we need highestRevision? Is it a best-effort way to detect that 
there's an active timer, so that we don't schedule one when there are no 
pending topology changes, e.g. in case of scale<>AutoadjustTimeout updates?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<Distribution
 
             int oldScaleUp;
 
+            int oldScaleDown;
+
             // ctx.oldValue() could be null for the default zone on a first 
start.
             if (ctx.oldValue() == null) {
                 oldScaleUp = Integer.MAX_VALUE;
+
+                oldScaleDown = Integer.MAX_VALUE;
             } else {
                 oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+                oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
             }
 
             int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
 
-            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
-                // It is safe to zonesTimers.get(zoneId) in term of NPE 
because meta storage notifications are one-threaded
-                zonesState.get(zoneId).rescheduleScaleUp(
-                        newScaleUp,
-                        () -> CompletableFuture.supplyAsync(
-                                () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
-                                Runnable::run
-                        )
-                );
+            int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+            // It is safe to zonesTimers.get(zoneId) in term of NPE because 
meta storage notifications are one-threaded
+            // and this map will me initialized on a manager start or with 
onCreate configuration notification
+            ZoneState zoneState = zonesState.get(zoneId);
+
+            if (oldScaleUp != newScaleUp) {
+                if (newScaleUp != Integer.MAX_VALUE) {
+                    Optional<Long> highestRevision = 
zoneState.highestRevision(true);
+
+                    highestRevision.ifPresent(rev -> 
zoneState.rescheduleScaleUp(
+                            newScaleUp,
+                            () -> CompletableFuture.supplyAsync(
+                                    () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, rev),

Review Comment:
   Why do you use rev and not ctx.storageRevision() here? Please note that 
scenarios that alter the scale<>AutoadjustTimeout are possible.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,21 +984,27 @@ void scheduleTimers(
             throw new UnsupportedOperationException("Data nodes auto adjust is 
not supported.");
         } else {
             if (!addedNodes.isEmpty() && autoAdjustScaleUp != 
Integer.MAX_VALUE) {
-                //TODO: IGNITE-18121 Create scale up scheduler with 
dataNodesAutoAdjustScaleUp timer.
                 zonesState.get(zoneId).addNodesToDataNodes(addedNodes, 
revision);

Review Comment:
   The names addNodesToDataNodes and especially removeNodesFromDataNodes are 
a bit confusing.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1034,17 +1079,97 @@ CompletableFuture<Void> 
saveDataNodesToMetaStorageOnScaleUp(int zoneId, long rev
                     return completedFuture(null);
                 }
 
-                Set<String> deltaToAdd = 
zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, 
scaleDownTriggerRevision, revision);
+                List<String> deltaToAdd = 
zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, revision);
+
+                Map<String, Integer> newDataNodes = new 
HashMap<>(dataNodesFromMetaStorage);
+
+                deltaToAdd.forEach(n -> newDataNodes.merge(n, 1, 
Integer::sum));

Review Comment:
   So you will write A -> 0, right? If so, that will populate ms.DataNodes 
with garbage entries that stay there forever.
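
   A minimal sketch of one way to avoid leaving zero-valued entries behind, 
assuming data nodes are kept as a node-name-to-counter map (as the 
`Map<String, Integer> newDataNodes` in the diff suggests). The class and 
method names are hypothetical. It relies on the documented behaviour of 
`Map.computeIfPresent`: when the remapping function returns null, the mapping 
is removed.

```java
import java.util.HashMap;
import java.util.Map;

class DataNodesCounterSketch {
    /** Increments the counter for a node, creating the entry if it is absent. */
    static void increment(Map<String, Integer> dataNodes, String node) {
        dataNodes.merge(node, 1, Integer::sum);
    }

    /**
     * Decrements the counter for a node. Returning null from the remapping
     * function removes the entry, so "A -> 0" is never stored.
     * Unknown nodes are ignored.
     */
    static void decrement(Map<String, Integer> dataNodes, String node) {
        dataNodes.computeIfPresent(node, (name, count) -> count > 1 ? count - 1 : null);
    }

    public static void main(String[] args) {
        Map<String, Integer> dataNodes = new HashMap<>();

        increment(dataNodes, "A");
        decrement(dataNodes, "A");

        // The map is empty again instead of holding A -> 0.
        System.out.println(dataNodes);
    }
}
```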



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long 
revision) {
          * @param addition Indicates whether we should accumulate nodes that 
should be added to data nodes, or removed.
          * @return Accumulated nodes.
          */
-        private Set<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
+        private List<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
             return topologyAugmentationMap.subMap(fromKey, true, toKey, 
true).values()
                     .stream()
                     .filter(a -> a.addition == addition)
                     .flatMap(a -> a.nodeNames.stream())
-                    .collect(toSet());
+                    .collect(toList());
         }
 
+        /**
+         * Cleans {@code topologyAugmentationMap} to the key, to which is safe 
to delete.
+         * Safe means that this key was handled both by scale up and scale 
down schedulers.
+         *
+         * @param toKey Key in map to which is safe to delete data from {@code 
topologyAugmentationMap}.
+         */
         private void cleanUp(long toKey) {
-            //TODO: IGNITE-18132 Create scale down scheduler with 
dataNodesAutoAdjustScaleDown timer.
+            long firstKey = topologyAugmentationMap.firstKey();
+
+            Long lastKeyToRemove = topologyAugmentationMap.floorKey(toKey);
+
+            if (lastKeyToRemove == null) {
+                return;
+            }
+
+            topologyAugmentationMap.subMap(firstKey, true, lastKeyToRemove, 
true).clear();
+        }
+
+        /**
+         * Returns the highest revision which is presented in the {@link 
ZoneState#topologyAugmentationMap()} taking into account
+         * the {@code addition} flag.
+         *
+         * @param addition Flag indicating the type of the nodes for which we 
want to find the highest revision.
+         * @return The highest revision which is presented in the {@link 
ZoneState#topologyAugmentationMap()} taking into account
+         *         the {@code addition} flag.
+         */
+        Optional<Long> highestRevision(boolean addition) {
+            return topologyAugmentationMap().entrySet()
+                    .stream()
+                    .filter(e -> e.getValue().addition == addition)
+                    .max(Map.Entry.comparingByKey())
+                    .map(Map.Entry::getKey);
+        }
+
+        @TestOnly
+        ScheduledFuture<?> scaleUpTask() {
+            return scaleUpTask;

Review Comment:
   Not thread-safe; the same applies to scaleDownTask.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<Distribution
 
             int oldScaleUp;
 
+            int oldScaleDown;

Review Comment:
   As discussed in person, I'd rather add separate listeners for the scaleUp 
and scaleDown cfg entities. That would allow us to skip the 
change-type-detection logic.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<Distribution
 
             int oldScaleUp;
 
+            int oldScaleDown;
+
             // ctx.oldValue() could be null for the default zone on a first 
start.
             if (ctx.oldValue() == null) {
                 oldScaleUp = Integer.MAX_VALUE;
+
+                oldScaleDown = Integer.MAX_VALUE;
             } else {
                 oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+                oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
             }
 
             int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
 
-            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
-                // It is safe to zonesTimers.get(zoneId) in term of NPE 
because meta storage notifications are one-threaded
-                zonesState.get(zoneId).rescheduleScaleUp(
-                        newScaleUp,
-                        () -> CompletableFuture.supplyAsync(
-                                () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
-                                Runnable::run
-                        )
-                );
+            int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+            // It is safe to zonesTimers.get(zoneId) in term of NPE because 
meta storage notifications are one-threaded
+            // and this map will me initialized on a manager start or with 
onCreate configuration notification
+            ZoneState zoneState = zonesState.get(zoneId);
+
+            if (oldScaleUp != newScaleUp) {
+                if (newScaleUp != Integer.MAX_VALUE) {
+                    Optional<Long> highestRevision = 
zoneState.highestRevision(true);
+
+                    highestRevision.ifPresent(rev -> 
zoneState.rescheduleScaleUp(
+                            newScaleUp,
+                            () -> CompletableFuture.supplyAsync(
+                                    () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, rev),
+                                    Runnable::run
+                            )
+                    ));
+                } else {

Review Comment:
   I'd move this logic inside rescheduleScaleUp/Down(). Minor though.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<Distribution
 
             int oldScaleUp;
 
+            int oldScaleDown;
+
             // ctx.oldValue() could be null for the default zone on a first 
start.
             if (ctx.oldValue() == null) {
                 oldScaleUp = Integer.MAX_VALUE;
+
+                oldScaleDown = Integer.MAX_VALUE;
             } else {
                 oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+                oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
             }
 
             int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
 
-            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
-                // It is safe to zonesTimers.get(zoneId) in term of NPE 
because meta storage notifications are one-threaded
-                zonesState.get(zoneId).rescheduleScaleUp(
-                        newScaleUp,
-                        () -> CompletableFuture.supplyAsync(
-                                () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
-                                Runnable::run
-                        )
-                );
+            int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+            // It is safe to zonesTimers.get(zoneId) in term of NPE because 
meta storage notifications are one-threaded
+            // and this map will me initialized on a manager start or with 
onCreate configuration notification
+            ZoneState zoneState = zonesState.get(zoneId);
+
+            if (oldScaleUp != newScaleUp) {
+                if (newScaleUp != Integer.MAX_VALUE) {
+                    Optional<Long> highestRevision = 
zoneState.highestRevision(true);
+
+                    highestRevision.ifPresent(rev -> 
zoneState.rescheduleScaleUp(
+                            newScaleUp,
+                            () -> CompletableFuture.supplyAsync(
+                                    () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, rev),
+                                    Runnable::run

Review Comment:
   Why do we need all that `CompletableFuture.supplyAsync(` and `Runnable::run`?
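
   For context, a small sketch of what that combination does in plain JDK 
terms. `Runnable::run` used as an `Executor` runs the task on the calling 
thread, so `supplyAsync` is effectively synchronous here, and wrapping a 
method that already returns a future produces a nested future. The `save` 
method is a hypothetical stand-in for saveDataNodesToMetaStorageOnScaleUp.

```java
import java.util.concurrent.CompletableFuture;

class SameThreadSupplyAsyncSketch {
    /** Hypothetical stand-in for a method that already returns a future. */
    static CompletableFuture<Void> save(long revision) {
        return CompletableFuture.completedFuture(null);
    }

    /** Returns the name of the thread that executed the supplier. */
    static String threadUsedBySupplier() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName(),
                Runnable::run
        );

        return future.join();
    }

    public static void main(String[] args) {
        // The supplier runs on the caller's thread, not on a pool thread.
        System.out.println(threadUsedBySupplier().equals(Thread.currentThread().getName()));

        // The result type is a future of a future.
        CompletableFuture<CompletableFuture<Void>> nested =
                CompletableFuture.supplyAsync(() -> save(1L), Runnable::run);

        // The outer future is already complete when supplyAsync returns.
        System.out.println(nested.isDone());
    }
}
```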



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1166,29 +1304,38 @@ synchronized void stopTimers() {
          * Returns a set of nodes that should be added to zone's data nodes.
          *
          * @param scaleUpRevision Last revision of the scale up event.
-         * @param scaleDownRevision Last revision of the scale down event.
          * @param revision Revision of the event for which this data nodes is 
needed.
-         * @return Set of nodes that should be added to zone's data nodes.
+         * @return List of nodes that should be added to zone's data nodes.
          */
-        Set<String> nodesToAddToDataNodes(long scaleUpRevision, long 
scaleDownRevision, long revision) {
+        List<String> nodesToAddToDataNodes(long scaleUpRevision, long 
revision) {
             Long toKey = topologyAugmentationMap.floorKey(revision);
 
+            Long fromKey = topologyAugmentationMap.ceilingKey(scaleUpRevision);

Review Comment:
   ceilingKey is GT|EQ and we need only GT. It's possible to use 
scaleUpRevision + 1.
   However, I don't think we need it at all, because there's 
`topologyAugmentationMap.subMap(fromKey, true, toKey, true)` inside 
accumulateNodes(); basically it could be 
   `topologyAugmentationMap.subMap(scaleUpRevision, false, revision, true)`
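
   A minimal sketch of the difference, using a plain `TreeMap` as a stand-in 
for topologyAugmentationMap (the real map type and its value type are 
assumptions here; the class and method names are hypothetical). Note that 
`subMap` throws `IllegalArgumentException` when the lower bound is greater 
than the upper bound, so callers still need to guarantee the ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class ExclusiveLowerBoundSketch {
    /** Collects the values for revisions in the range (fromExclusive, toInclusive]. */
    static List<String> valuesAfter(NavigableMap<Long, String> map, long fromExclusive, long toInclusive) {
        return new ArrayList<>(map.subMap(fromExclusive, false, toInclusive, true).values());
    }

    public static void main(String[] args) {
        TreeMap<Long, String> augmentations = new TreeMap<>();
        augmentations.put(3L, "A");
        augmentations.put(5L, "B");
        augmentations.put(9L, "C");

        // ceilingKey is >=, so a trigger revision of 3 maps back to key 3.
        System.out.println(augmentations.ceilingKey(3L));

        // The exclusive lower bound skips revision 3 without any key lookup.
        System.out.println(valuesAfter(augmentations, 3L, 7L));
    }
}
```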



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long 
revision) {
          * @param addition Indicates whether we should accumulate nodes that 
should be added to data nodes, or removed.
          * @return Accumulated nodes.
          */
-        private Set<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
+        private List<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
             return topologyAugmentationMap.subMap(fromKey, true, toKey, 
true).values()
                     .stream()
                     .filter(a -> a.addition == addition)
                     .flatMap(a -> a.nodeNames.stream())
-                    .collect(toSet());
+                    .collect(toList());

Review Comment:
   Is there a test that will fail if we substitute toList() with toSet()? 
I mean that toList() is correct, but we need a test for this.
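
   A rough sketch of the scenario such a test could cover, assuming the delta 
is applied with one increment per occurrence, as in the 
`deltaToAdd.forEach(n -> newDataNodes.merge(n, 1, Integer::sum))` line of this 
PR. The class and method names are hypothetical. When the same node shows up 
in two augmentations, a list keeps both occurrences while a set collapses 
them, so the resulting counters differ.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class DuplicateNodesSketch {
    /** Applies a delta with one increment per occurrence of a node name. */
    static Map<String, Integer> applyDelta(Collection<String> deltaToAdd) {
        Map<String, Integer> counters = new HashMap<>();

        deltaToAdd.forEach(n -> counters.merge(n, 1, Integer::sum));

        return counters;
    }

    public static void main(String[] args) {
        // Node "A" was added at two different revisions.
        List<List<String>> augmentations = List.of(List.of("A"), List.of("A", "B"));

        List<String> asList = augmentations.stream().flatMap(List::stream).collect(Collectors.toList());
        Set<String> asSet = augmentations.stream().flatMap(List::stream).collect(Collectors.toSet());

        // With a list both occurrences of "A" are counted; with a set only one is.
        System.out.println(applyDelta(asList).get("A"));
        System.out.println(applyDelta(asSet).get("A"));
    }
}
```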



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long 
revision) {
          * @param addition Indicates whether we should accumulate nodes that 
should be added to data nodes, or removed.
          * @return Accumulated nodes.
          */
-        private Set<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
+        private List<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
             return topologyAugmentationMap.subMap(fromKey, true, toKey, 
true).values()
                     .stream()
                     .filter(a -> a.addition == addition)
                     .flatMap(a -> a.nodeNames.stream())
-                    .collect(toSet());
+                    .collect(toList());
         }
 
+        /**
+         * Cleans {@code topologyAugmentationMap} to the key, to which is safe 
to delete.
+         * Safe means that this key was handled both by scale up and scale 
down schedulers.
+         *
+         * @param toKey Key in map to which is safe to delete data from {@code 
topologyAugmentationMap}.
+         */
         private void cleanUp(long toKey) {
-            //TODO: IGNITE-18132 Create scale down scheduler with 
dataNodesAutoAdjustScaleDown timer.
+            long firstKey = topologyAugmentationMap.firstKey();

Review Comment:
   Or, probably, `topologyAugmentationMap.headMap(toKey, true).clear();` will 
also do the job.
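
   A minimal sketch of that variant, again with a `TreeMap` standing in for 
topologyAugmentationMap (an assumption; the class name is hypothetical). One 
side effect worth noting: `firstKey()` throws `NoSuchElementException` on an 
empty map, whereas `headMap(...).clear()` is simply a no-op in that case.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class HeadMapCleanUpSketch {
    /** Removes every entry whose key is less than or equal to toKey. */
    static void cleanUp(NavigableMap<Long, String> map, long toKey) {
        // headMap returns a view, so clearing it removes entries from the backing map.
        map.headMap(toKey, true).clear();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> augmentations = new TreeMap<>();
        augmentations.put(3L, "A");
        augmentations.put(5L, "B");
        augmentations.put(9L, "C");

        cleanUp(augmentations, 6L);

        // Only the entry for revision 9 is left.
        System.out.println(augmentations);

        // No exception on an empty map.
        cleanUp(new TreeMap<>(), 6L);
    }
}
```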



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1153,10 +1277,24 @@ synchronized void rescheduleScaleDown(long delay, 
Runnable runnable) {
          * Cancels task for scale up and scale down.
          */
         synchronized void stopTimers() {
+            stopScaleUp();
+
+            stopScaleDown();
+        }
+
+        /**
+         * Cancels task for scale up.
+         */
+        synchronized void stopScaleUp() {

Review Comment:
   Why do we need synchronized here? Not a big deal though. However, as 
mentioned below, if you use synchronized instead of volatile, you should 
also read the tasks from within a synchronized block.
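
   A simplified sketch of the synchronized-only option, where the field is 
both written and read under the same monitor. The class is a hypothetical 
reduction of ZoneState, and the use of `cancel(false)` is an assumption rather 
than what the PR does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimerStateSketch {
    private final ScheduledExecutorService executor;

    /** Guarded by this: every read and write happens in a synchronized method. */
    private ScheduledFuture<?> scaleUpTask;

    TimerStateSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /** Cancels the previous task, if any, and schedules a new one. */
    synchronized void rescheduleScaleUp(long delaySeconds, Runnable task) {
        stopScaleUp();

        scaleUpTask = executor.schedule(task, delaySeconds, TimeUnit.SECONDS);
    }

    /** Cancels the scale up task. Safe to call from rescheduleScaleUp because monitors are reentrant. */
    synchronized void stopScaleUp() {
        if (scaleUpTask != null) {
            scaleUpTask.cancel(false);
        }
    }

    /** Reads the task under the same lock, so the latest write is visible. */
    synchronized ScheduledFuture<?> scaleUpTask() {
        return scaleUpTask;
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        try {
            TimerStateSketch state = new TimerStateSketch(executor);

            state.rescheduleScaleUp(60, () -> { });
            state.stopScaleUp();

            System.out.println(state.scaleUpTask().isCancelled());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   With a volatile field instead, the getter would not need the lock, but 
the cancel-then-schedule sequence in rescheduleScaleUp would still need to be 
atomic.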


