[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2024-01-11 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805650#comment-17805650 ]

Maximilian Michels commented on FLINK-31977:


I think this is related to FLINK-33993. The name of the configuration option is 
a bit misleading, as effectiveness detection is always on but scalings are only 
blocked when the option is set to {{true}}.
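
For illustration only, here is a minimal standalone sketch of the behaviour just described (hypothetical class, method, and field names, not the operator's actual code): the ineffectiveness check and the notification always run, and the option only decides whether the scale-up actually gets blocked.

{code:java}
// Sketch with made-up names; "detectionEnabled" stands in for
// scaling.effectiveness.detection.enabled as currently interpreted.
public class EffectivenessGatingSketch {

    static boolean detectionEnabled = false;

    static boolean shouldBlockScaleUp(double expectedIncrease, double actualIncrease, double threshold) {
        boolean ineffective = (actualIncrease / expectedIncrease) < threshold;
        if (!ineffective) {
            return false;
        }
        // The notification side effect happens regardless of the option...
        System.out.println("IneffectiveScaling event");
        // ...only the blocking decision is gated by it.
        return detectionEnabled;
    }

    public static void main(String[] args) {
        // Expected +100, got +5, threshold 0.1 -> ineffective either way.
        System.out.println(shouldBlockScaleUp(100, 5, 0.1)); // false: event sent, scaling not blocked
        detectionEnabled = true;
        System.out.println(shouldBlockScaleUp(100, 5, 0.1)); // true: scaling would be blocked
    }
}
{code}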

> If scaling.effectiveness.detection.enabled is false, the call to the 
> detectIneffectiveScaleUp() function is unnecessary
> ---
>
> Key: FLINK-31977
> URL: https://issues.apache.org/jira/browse/FLINK-31977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 1.17.0
>Reporter: Tan Kim
>Priority: Minor
>
> The code below is the function that detects ineffective scale-ups.
> It only returns true when SCALING_EFFECTIVENESS_DETECTION_ENABLED
> (scaling.effectiveness.detection.enabled) is set to true, yet it performs all of the
> detection computations before checking that option, so the work is wasted whenever the option is false.
> {code:java}
> // JobVertexScaler.java #175
> private boolean detectIneffectiveScaleUp(
>         AbstractFlinkResource<?, ?> resource,
>         JobVertexID vertex,
>         Configuration conf,
>         Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
>         ScalingSummary lastSummary) {
>     double lastProcRate =
>             lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
>     double lastExpectedProcRate =
>             lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
>     var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
>
>     // To judge the effectiveness of the scale up operation we compute how much of the expected
>     // increase actually happened. For example if we expect a 100 increase in proc rate and only
>     // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
>     // below the threshold, we mark the scaling ineffective.
>     double expectedIncrease = lastExpectedProcRate - lastProcRate;
>     double actualIncrease = currentProcRate - lastProcRate;
>     boolean withinEffectiveThreshold =
>             (actualIncrease / expectedIncrease)
>                     >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
>     if (withinEffectiveThreshold) {
>         return false;
>     }
>
>     var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
>     eventRecorder.triggerEvent(
>             resource,
>             EventRecorder.Type.Normal,
>             EventRecorder.Reason.IneffectiveScaling,
>             EventRecorder.Component.Operator,
>             message);
>
>     if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>         LOG.info(message);
>         return true;
>     } else {
>         return false;
>     }
> } {code}
> At the call site of the detectIneffectiveScaleUp function, I would suggest
> checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.
> {code:java}
> // JobVertexScaler.java #150
> if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
>     if (scaledUp) {
>         if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>             return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
>         } else {
>             return true;
>         }
>     } else {
>         return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
>     }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2023-05-04 Thread Tan Kim (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719423#comment-17719423 ]

Tan Kim commented on FLINK-31977:
-

I understand what you're saying.
However, part of the code is a bit hard to follow because it returns from multiple
conditional branches.
I think it could be changed to something more intuitive like this; what do you think?

This is unrelated to the original suggestion in the Jira ticket to avoid the unnecessary
function call when SCALING_EFFECTIVENESS_DETECTION_ENABLED is disabled, but it does make
the code a little easier to understand.
{code:java}
private boolean detectIneffectiveScaleUp(
        AbstractFlinkResource<?, ?> resource,
        JobVertexID vertex,
        Configuration conf,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        ScalingSummary lastSummary) {

    double lastProcRate = lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
    double lastExpectedProcRate =
            lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
    var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();

    // To judge the effectiveness of the scale up operation we compute how much of the expected
    // increase actually happened. For example if we expect a 100 increase in proc rate and only
    // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
    // below the threshold, we mark the scaling ineffective.
    double expectedIncrease = lastExpectedProcRate - lastProcRate;
    double actualIncrease = currentProcRate - lastProcRate;

    boolean isIneffectiveScaleUp =
            (actualIncrease / expectedIncrease)
                    < conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);

    if (isIneffectiveScaleUp) {
        var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);

        eventRecorder.triggerEvent(
                resource,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.IneffectiveScaling,
                EventRecorder.Component.Operator,
                message);

        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            LOG.info(
                    "Ineffective scaling detected for {}, expected increase {}, actual {}",
                    vertex,
                    expectedIncrease,
                    actualIncrease);
            return true;
        }
    }
    return false;
} {code}


[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2023-05-04 Thread Gyula Fora (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719221#comment-17719221 ]

Gyula Fora commented on FLINK-31977:


The problem here is that you need to call the detection logic before you 
trigger the event. The event should only be sent if ineffective scaling is 
detected.
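
To make that ordering concrete, here is a small self-contained sketch (hypothetical names and a made-up current rate, not the actual JobVertexScaler code): the ratio check has to run first, the event is emitted only when the scale-up turns out to be ineffective, and only the returned blocking decision depends on the option, which is why the flag check cannot simply be hoisted in front of the call.

{code:java}
import java.util.function.Consumer;

// Sketch of the required ordering; names are illustrative only.
public class DetectionOrderSketch {

    static boolean detectIneffectiveScaleUpSketch(
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold,
            boolean detectionEnabled,
            Consumer<String> eventSink) {

        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;

        // 1) The detection math always runs first.
        if ((actualIncrease / expectedIncrease) >= threshold) {
            return false; // effective enough: no event, no blocking
        }
        // 2) The event is sent only when the scale-up was ineffective.
        eventSink.accept("Ineffective scale-up detected");
        // 3) Only the blocking decision depends on the option.
        return detectionEnabled;
    }

    public static void main(String[] args) {
        // Last and expected rates taken from the ticket's inline comments;
        // the current rate (23000.0) is invented for the example.
        boolean block = detectIneffectiveScaleUpSketch(
                22569.3, 37340.0, 23000.0, 0.1, false, System.out::println);
        System.out.println(block); // false: event was still emitted, scaling not blocked
    }
}
{code}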




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2023-05-01 Thread Tan Kim (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718215#comment-17718215 ]

Tan Kim commented on FLINK-31977:
-

If so, what do you think about moving the event trigger up to the caller as well?

 
{code:java}
if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
    if (scaledUp) {
        var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
        eventRecorder.triggerEvent(
                resource,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.IneffectiveScaling,
                EventRecorder.Component.Operator,
                message);
        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
        } else {
            return true;
        }
    } else {
        return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
    }
} {code}
 

 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2023-05-01 Thread Gyula Fora (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718194#comment-17718194 ]

Gyula Fora commented on FLINK-31977:


The logic here is that we trigger an Event even if effectiveness detection is disabled.
This will not prevent further scale-ups, but it still notifies the user.
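
As a rough standalone illustration of that behaviour (made-up names and values, not the operator's code): the notification is produced in both modes; only the decision that would block further scale-ups differs.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative only: shows that the notification is produced in both modes.
public class EventAlwaysFiresDemo {

    static boolean evaluate(boolean detectionEnabled, List<String> events) {
        double expectedIncrease = 100.0;
        double actualIncrease = 5.0; // far below a 0.1 effectiveness threshold
        boolean ineffective = (actualIncrease / expectedIncrease) < 0.1;
        if (ineffective) {
            events.add("IneffectiveScaling"); // user is notified either way
        }
        return ineffective && detectionEnabled; // blocking only when enabled
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        boolean blockedWhenDisabled = evaluate(false, events);
        boolean blockedWhenEnabled = evaluate(true, events);
        System.out.println(events);              // [IneffectiveScaling, IneffectiveScaling]
        System.out.println(blockedWhenDisabled); // false -> the scale-up still goes ahead
        System.out.println(blockedWhenEnabled);  // true  -> the scale-up would be blocked
    }
}
{code}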




--
This message was sent by Atlassian Jira
(v8.20.10#820010)