sanpwc commented on code in PR #1631:
URL: https://github.com/apache/ignite-3/pull/1631#discussion_r1098949926
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<Distribution
int oldScaleUp;
+ int oldScaleDown;
+
// ctx.oldValue() could be null for the default zone on a first
start.
if (ctx.oldValue() == null) {
oldScaleUp = Integer.MAX_VALUE;
+
+ oldScaleDown = Integer.MAX_VALUE;
} else {
oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+ oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
}
int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
- if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
- // It is safe to zonesTimers.get(zoneId) in term of NPE
because meta storage notifications are one-threaded
- zonesState.get(zoneId).rescheduleScaleUp(
- newScaleUp,
- () -> CompletableFuture.supplyAsync(
- () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
- Runnable::run
- )
- );
+ int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+ // It is safe to zonesTimers.get(zoneId) in term of NPE because
meta storage notifications are one-threaded
+ // and this map will me initialized on a manager start or with
onCreate configuration notification
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (oldScaleUp != newScaleUp) {
+ if (newScaleUp != Integer.MAX_VALUE) {
+ Optional<Long> highestRevision =
zoneState.highestRevision(true);
Review Comment:
Why do we need highestRevision? Is it a way to detect that there's an active
timer in a best-effort manner in order not to schedule one if there are no
pending topology changes, e.g. in case of scale<>AutoadjustTimeout updates?
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<Distribution
int oldScaleUp;
+ int oldScaleDown;
+
// ctx.oldValue() could be null for the default zone on a first
start.
if (ctx.oldValue() == null) {
oldScaleUp = Integer.MAX_VALUE;
+
+ oldScaleDown = Integer.MAX_VALUE;
} else {
oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+ oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
}
int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
- if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
- // It is safe to zonesTimers.get(zoneId) in term of NPE
because meta storage notifications are one-threaded
- zonesState.get(zoneId).rescheduleScaleUp(
- newScaleUp,
- () -> CompletableFuture.supplyAsync(
- () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
- Runnable::run
- )
- );
+ int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+ // It is safe to zonesTimers.get(zoneId) in term of NPE because
meta storage notifications are one-threaded
+ // and this map will me initialized on a manager start or with
onCreate configuration notification
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (oldScaleUp != newScaleUp) {
+ if (newScaleUp != Integer.MAX_VALUE) {
+ Optional<Long> highestRevision =
zoneState.highestRevision(true);
+
+ highestRevision.ifPresent(rev ->
zoneState.rescheduleScaleUp(
+ newScaleUp,
+ () -> CompletableFuture.supplyAsync(
+ () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, rev),
Review Comment:
Why do you use rev and not ctx.storageRevision() here? Please note that
scenarios with altering the scale<>AutoadjustTimeout are possible.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,21 +984,27 @@ void scheduleTimers(
throw new UnsupportedOperationException("Data nodes auto adjust is
not supported.");
} else {
if (!addedNodes.isEmpty() && autoAdjustScaleUp !=
Integer.MAX_VALUE) {
- //TODO: IGNITE-18121 Create scale up scheduler with
dataNodesAutoAdjustScaleUp timer.
zonesState.get(zoneId).addNodesToDataNodes(addedNodes,
revision);
Review Comment:
addNodesToDataNodes and especially removeNodesFromDataNodes are a bit
confusing from the naming point of view.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1034,17 +1079,97 @@ CompletableFuture<Void>
saveDataNodesToMetaStorageOnScaleUp(int zoneId, long rev
return completedFuture(null);
}
- Set<String> deltaToAdd =
zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision,
scaleDownTriggerRevision, revision);
+ List<String> deltaToAdd =
zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, revision);
+
+ Map<String, Integer> newDataNodes = new
HashMap<>(dataNodesFromMetaStorage);
+
+ deltaToAdd.forEach(n -> newDataNodes.merge(n, 1,
Integer::sum));
Review Comment:
So you will write A -> 0, right? If true that will populate ms.DataNodes
with some garbage forever.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long
revision) {
* @param addition Indicates whether we should accumulate nodes that
should be added to data nodes, or removed.
* @return Accumulated nodes.
*/
- private Set<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
+ private List<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
return topologyAugmentationMap.subMap(fromKey, true, toKey,
true).values()
.stream()
.filter(a -> a.addition == addition)
.flatMap(a -> a.nodeNames.stream())
- .collect(toSet());
+ .collect(toList());
}
+ /**
+ * Cleans {@code topologyAugmentationMap} to the key, to which is safe
to delete.
+ * Safe means that this key was handled both by scale up and scale
down schedulers.
+ *
+ * @param toKey Key in map to which is safe to delete data from {@code
topologyAugmentationMap}.
+ */
private void cleanUp(long toKey) {
- //TODO: IGNITE-18132 Create scale down scheduler with
dataNodesAutoAdjustScaleDown timer.
+ long firstKey = topologyAugmentationMap.firstKey();
+
+ Long lastKeyToRemove = topologyAugmentationMap.floorKey(toKey);
+
+ if (lastKeyToRemove == null) {
+ return;
+ }
+
+ topologyAugmentationMap.subMap(firstKey, true, lastKeyToRemove,
true).clear();
+ }
+
+ /**
+ * Returns the highest revision which is presented in the {@link
ZoneState#topologyAugmentationMap()} taking into account
+ * the {@code addition} flag.
+ *
+ * @param addition Flag indicating the type of the nodes for which we
want to find the highest revision.
+ * @return The highest revision which is presented in the {@link
ZoneState#topologyAugmentationMap()} taking into account
+ * the {@code addition} flag.
+ */
+ Optional<Long> highestRevision(boolean addition) {
+ return topologyAugmentationMap().entrySet()
+ .stream()
+ .filter(e -> e.getValue().addition == addition)
+ .max(Map.Entry.comparingByKey())
+ .map(Map.Entry::getKey);
+ }
+
+ @TestOnly
+ ScheduledFuture<?> scaleUpTask() {
+ return scaleUpTask;
Review Comment:
Not thread-safe; the same applies to scaleDownTask.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<Distribution
int oldScaleUp;
+ int oldScaleDown;
Review Comment:
As discussed in person, I'd rather add separate listeners for the scaleUp and
scaleDown cfg entities. That will allow us to skip the change-type-detection
logic.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<Distribution
int oldScaleUp;
+ int oldScaleDown;
+
// ctx.oldValue() could be null for the default zone on a first
start.
if (ctx.oldValue() == null) {
oldScaleUp = Integer.MAX_VALUE;
+
+ oldScaleDown = Integer.MAX_VALUE;
} else {
oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+ oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
}
int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
- if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
- // It is safe to zonesTimers.get(zoneId) in term of NPE
because meta storage notifications are one-threaded
- zonesState.get(zoneId).rescheduleScaleUp(
- newScaleUp,
- () -> CompletableFuture.supplyAsync(
- () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
- Runnable::run
- )
- );
+ int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+ // It is safe to zonesTimers.get(zoneId) in term of NPE because
meta storage notifications are one-threaded
+ // and this map will me initialized on a manager start or with
onCreate configuration notification
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (oldScaleUp != newScaleUp) {
+ if (newScaleUp != Integer.MAX_VALUE) {
+ Optional<Long> highestRevision =
zoneState.highestRevision(true);
+
+ highestRevision.ifPresent(rev ->
zoneState.rescheduleScaleUp(
+ newScaleUp,
+ () -> CompletableFuture.supplyAsync(
+ () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, rev),
+ Runnable::run
+ )
+ ));
+ } else {
Review Comment:
I'd move this logic inside rescheduleScaleUp/Down(). Minor though.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -557,24 +561,57 @@ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<Distribution
int oldScaleUp;
+ int oldScaleDown;
+
// ctx.oldValue() could be null for the default zone on a first
start.
if (ctx.oldValue() == null) {
oldScaleUp = Integer.MAX_VALUE;
+
+ oldScaleDown = Integer.MAX_VALUE;
} else {
oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+
+ oldScaleDown = ctx.oldValue().dataNodesAutoAdjustScaleDown();
}
int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
- if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
- // It is safe to zonesTimers.get(zoneId) in term of NPE
because meta storage notifications are one-threaded
- zonesState.get(zoneId).rescheduleScaleUp(
- newScaleUp,
- () -> CompletableFuture.supplyAsync(
- () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
- Runnable::run
- )
- );
+ int newScaleDown = ctx.newValue().dataNodesAutoAdjustScaleDown();
+
+ // It is safe to zonesTimers.get(zoneId) in term of NPE because
meta storage notifications are one-threaded
+ // and this map will me initialized on a manager start or with
onCreate configuration notification
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (oldScaleUp != newScaleUp) {
+ if (newScaleUp != Integer.MAX_VALUE) {
+ Optional<Long> highestRevision =
zoneState.highestRevision(true);
+
+ highestRevision.ifPresent(rev ->
zoneState.rescheduleScaleUp(
+ newScaleUp,
+ () -> CompletableFuture.supplyAsync(
+ () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, rev),
+ Runnable::run
Review Comment:
Why do we need all that `CompletableFuture.supplyAsync(` and `Runnable::run`?
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1166,29 +1304,38 @@ synchronized void stopTimers() {
* Returns a set of nodes that should be added to zone's data nodes.
*
* @param scaleUpRevision Last revision of the scale up event.
- * @param scaleDownRevision Last revision of the scale down event.
* @param revision Revision of the event for which this data nodes is
needed.
- * @return Set of nodes that should be added to zone's data nodes.
+ * @return List of nodes that should be added to zone's data nodes.
*/
- Set<String> nodesToAddToDataNodes(long scaleUpRevision, long
scaleDownRevision, long revision) {
+ List<String> nodesToAddToDataNodes(long scaleUpRevision, long
revision) {
Long toKey = topologyAugmentationMap.floorKey(revision);
+ Long fromKey = topologyAugmentationMap.ceilingKey(scaleUpRevision);
Review Comment:
ceilingKey is GT|EQ and we need only GT. It's possible to use
scaleUpRevision + 1.
However I don't think that we need it at all, because there's
`topologyAugmentationMap.subMap(fromKey, true, toKey, true).` inside
accumulateNodes(), basically it may be
`topologyAugmentationMap.subMap(scaleUpRevision, false, revision, true)`
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long
revision) {
* @param addition Indicates whether we should accumulate nodes that
should be added to data nodes, or removed.
* @return Accumulated nodes.
*/
- private Set<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
+ private List<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
return topologyAugmentationMap.subMap(fromKey, true, toKey,
true).values()
.stream()
.filter(a -> a.addition == addition)
.flatMap(a -> a.nodeNames.stream())
- .collect(toSet());
+ .collect(toList());
Review Comment:
Is there a test that will fail if we substitute toList() with toSet()?
I mean that toList() is correct, but we need a test for this.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1221,16 +1368,56 @@ void removeNodesFromDataNodes(Set<String> nodes, long
revision) {
* @param addition Indicates whether we should accumulate nodes that
should be added to data nodes, or removed.
* @return Accumulated nodes.
*/
- private Set<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
+ private List<String> accumulateNodes(long fromKey, long toKey, boolean
addition) {
return topologyAugmentationMap.subMap(fromKey, true, toKey,
true).values()
.stream()
.filter(a -> a.addition == addition)
.flatMap(a -> a.nodeNames.stream())
- .collect(toSet());
+ .collect(toList());
}
+ /**
+ * Cleans {@code topologyAugmentationMap} to the key, to which is safe
to delete.
+ * Safe means that this key was handled both by scale up and scale
down schedulers.
+ *
+ * @param toKey Key in map to which is safe to delete data from {@code
topologyAugmentationMap}.
+ */
private void cleanUp(long toKey) {
- //TODO: IGNITE-18132 Create scale down scheduler with
dataNodesAutoAdjustScaleDown timer.
+ long firstKey = topologyAugmentationMap.firstKey();
Review Comment:
Or, probably, `topologyAugmentationMap.headMap(toKey, true).clear();` will
also do the job.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1153,10 +1277,24 @@ synchronized void rescheduleScaleDown(long delay,
Runnable runnable) {
* Cancels task for scale up and scale down.
*/
synchronized void stopTimers() {
+ stopScaleUp();
+
+ stopScaleDown();
+ }
+
+ /**
+ * Cancels task for scale up.
+ */
+ synchronized void stopScaleUp() {
Review Comment:
Why do we need synchronized here? Not a big deal though. However, as
mentioned below, in case of using synchronized instead of volatile, you should
also read tasks from within a synchronized block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]