alievmirza commented on code in PR #1631:
URL: https://github.com/apache/ignite-3/pull/1631#discussion_r1100182149


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1034,17 +1079,97 @@ CompletableFuture<Void> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long rev
                     return completedFuture(null);
                 }
 
-                Set<String> deltaToAdd = zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, scaleDownTriggerRevision, revision);
+                List<String> deltaToAdd = zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, revision);
+
+                Map<String, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
+
+                deltaToAdd.forEach(n -> newDataNodes.merge(n, 1, Integer::sum));

Review Comment:
   Fixed



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long revision) {
          * @param addition Indicates whether we should accumulate nodes that should be added to data nodes, or removed.
          * @return Accumulated nodes.
          */
-        private Set<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
+        private List<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
             return topologyAugmentationMap.subMap(fromKey, true, toKey, true).values()
                     .stream()
                     .filter(a -> a.addition == addition)
                     .flatMap(a -> a.nodeNames.stream())
-                    .collect(toSet());
+                    .collect(toList());
         }
 
+        /**
+         * Cleans {@code topologyAugmentationMap} to the key, to which is safe to delete.
+         * Safe means that this key was handled both by scale up and scale down schedulers.
+         *
+         * @param toKey Key in map to which is safe to delete data from {@code topologyAugmentationMap}.
+         */
         private void cleanUp(long toKey) {
-            //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+            long firstKey = topologyAugmentationMap.firstKey();

Review Comment:
   done



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long revision) {
          * @param addition Indicates whether we should accumulate nodes that should be added to data nodes, or removed.
          * @return Accumulated nodes.
          */
-        private Set<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
+        private List<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
             return topologyAugmentationMap.subMap(fromKey, true, toKey, true).values()
                     .stream()
                     .filter(a -> a.addition == addition)
                     .flatMap(a -> a.nodeNames.stream())
-                    .collect(toSet());
+                    .collect(toList());
         }
 
+        /**
+         * Cleans {@code topologyAugmentationMap} to the key, to which is safe to delete.
+         * Safe means that this key was handled both by scale up and scale down schedulers.
+         *
+         * @param toKey Key in map to which is safe to delete data from {@code topologyAugmentationMap}.
+         */
         private void cleanUp(long toKey) {
-            //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+            long firstKey = topologyAugmentationMap.firstKey();
+
+            Long lastKeyToRemove = topologyAugmentationMap.floorKey(toKey);
+
+            if (lastKeyToRemove == null) {
+                return;
+            }
+
+            topologyAugmentationMap.subMap(firstKey, true, lastKeyToRemove, true).clear();
+        }
+
+        /**
+         * Returns the highest revision which is presented in the {@link ZoneState#topologyAugmentationMap()} taking into account
+         * the {@code addition} flag.
+         *
+         * @param addition Flag indicating the type of the nodes for which we want to find the highest revision.
+         * @return The highest revision which is presented in the {@link ZoneState#topologyAugmentationMap()} taking into account
+         *         the {@code addition} flag.
+         */
+        Optional<Long> highestRevision(boolean addition) {
+            return topologyAugmentationMap().entrySet()
+                    .stream()
+                    .filter(e -> e.getValue().addition == addition)
+                    .max(Map.Entry.comparingByKey())
+                    .map(Map.Entry::getKey);
+        }
+
+        @TestOnly
+        ScheduledFuture<?> scaleUpTask() {
+            return scaleUpTask;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to