alievmirza commented on code in PR #1631:
URL: https://github.com/apache/ignite-3/pull/1631#discussion_r1100182149
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1034,17 +1079,97 @@ CompletableFuture<Void>
saveDataNodesToMetaStorageOnScaleUp(int zoneId, long rev
return completedFuture(null);
}
- Set<String> deltaToAdd =
zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision,
scaleDownTriggerRevision, revision);
+ List<String> deltaToAdd =
zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, revision);
+
+ Map<String, Integer> newDataNodes = new
HashMap<>(dataNodesFromMetaStorage);
+
+ deltaToAdd.forEach(n -> newDataNodes.merge(n, 1,
Integer::sum));
Review Comment:
Fixed
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long
revision) {
* @param addition Indicates whether we should accumulate nodes that
should be added to data nodes, or removed.
* @return Accumulated nodes.
*/
- private Set<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
+ private List<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
return topologyAugmentationMap.subMap(fromKey, true, toKey,
true).values()
.stream()
.filter(a -> a.addition == addition)
.flatMap(a -> a.nodeNames.stream())
- .collect(toSet());
+ .collect(toList());
}
+ /**
+ * Cleans {@code topologyAugmentationMap} to the key, to which is safe
to delete.
+ * Safe means that this key was handled both by scale up and scale
down schedulers.
+ *
+ * @param toKey Key in map to which is safe to delete data from {@code
topologyAugmentationMap}.
+ */
private void cleanUp(long toKey) {
- //TODO: IGNITE-18132 Create scale down scheduler with
dataNodesAutoAdjustScaleDown timer.
+ long firstKey = topologyAugmentationMap.firstKey();
Review Comment:
done
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long
revision) {
* @param addition Indicates whether we should accumulate nodes that
should be added to data nodes, or removed.
* @return Accumulated nodes.
*/
- private Set<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
+ private List<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
return topologyAugmentationMap.subMap(fromKey, true, toKey,
true).values()
.stream()
.filter(a -> a.addition == addition)
.flatMap(a -> a.nodeNames.stream())
- .collect(toSet());
+ .collect(toList());
}
+ /**
+ * Cleans {@code topologyAugmentationMap} to the key, to which is safe
to delete.
+ * Safe means that this key was handled both by scale up and scale
down schedulers.
+ *
+ * @param toKey Key in map to which is safe to delete data from {@code
topologyAugmentationMap}.
+ */
private void cleanUp(long toKey) {
- //TODO: IGNITE-18132 Create scale down scheduler with
dataNodesAutoAdjustScaleDown timer.
+ long firstKey = topologyAugmentationMap.firstKey();
+
+ Long lastKeyToRemove = topologyAugmentationMap.floorKey(toKey);
+
+ if (lastKeyToRemove == null) {
+ return;
+ }
+
+ topologyAugmentationMap.subMap(firstKey, true, lastKeyToRemove,
true).clear();
+ }
+
+ /**
+ * Returns the highest revision which is presented in the {@link
ZoneState#topologyAugmentationMap()} taking into account
+ * the {@code addition} flag.
+ *
+ * @param addition Flag indicating the type of the nodes for which we
want to find the highest revision.
+ * @return The highest revision which is presented in the {@link
ZoneState#topologyAugmentationMap()} taking into account
+ * the {@code addition} flag.
+ */
+ Optional<Long> highestRevision(boolean addition) {
+ return topologyAugmentationMap().entrySet()
+ .stream()
+ .filter(e -> e.getValue().addition == addition)
+ .max(Map.Entry.comparingByKey())
+ .map(Map.Entry::getKey);
+ }
+
+ @TestOnly
+ ScheduledFuture<?> scaleUpTask() {
+ return scaleUpTask;
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org