Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-19 Thread via GitHub


gyfora merged PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-18 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1570203474


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -129,8 +133,10 @@ public boolean scaleResource(
 scalingSummaries,
 autoScalerEventHandler);
 
-if (scalingWouldExceedClusterResources(
-configOverrides.newConfigWithOverrides(conf),
+var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+if (scalingWouldExceedMaxResources(
+memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
   I had forgotten it, but I've added one now.



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-15 Thread via GitHub


gyfora commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1565696510


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -129,8 +133,10 @@ public boolean scaleResource(
 scalingSummaries,
 autoScalerEventHandler);
 
-if (scalingWouldExceedClusterResources(
-configOverrides.newConfigWithOverrides(conf),
+var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+if (scalingWouldExceedMaxResources(
+memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
   Did you add a test for this @gaborgsomogyi ?



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-12 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562336481


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -129,8 +133,10 @@ public boolean scaleResource(
 scalingSummaries,
 autoScalerEventHandler);
 
-if (scalingWouldExceedClusterResources(
-configOverrides.newConfigWithOverrides(conf),
+var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+if (scalingWouldExceedMaxResources(
+memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
   @mxm, this is essentially a bug fix: the original code assumed that memory 
tuning is always enabled, which is not the case.
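A minimal sketch of the behaviour this fix aims for, not the operator's code: the configuration is modelled as a plain `Map<String, String>`, and `QuotaConfigSelection`, `configForResourceCheck` and the `memory.tuning.enabled` key are hypothetical stand-ins for `AutoScalerOptions.MEMORY_TUNING_ENABLED` and `configOverrides.newConfigWithOverrides(conf)`. The tuned overrides feed the resource check only when memory tuning is enabled; otherwise the unmodified configuration is used.

```java
import java.util.HashMap;
import java.util.Map;

public class QuotaConfigSelection {

    // Hypothetical key standing in for AutoScalerOptions.MEMORY_TUNING_ENABLED.
    static final String MEMORY_TUNING_ENABLED = "memory.tuning.enabled";

    /** Returns a copy of conf with the tuned overrides applied on top. */
    static Map<String, String> withOverrides(
            Map<String, String> conf, Map<String, String> overrides) {
        Map<String, String> merged = new HashMap<>(conf);
        merged.putAll(overrides);
        return merged;
    }

    /**
     * Picks the configuration used for the max-resource check: tuned overrides
     * apply only when memory tuning is enabled, otherwise conf is used as-is.
     */
    static Map<String, String> configForResourceCheck(
            Map<String, String> conf, Map<String, String> overrides) {
        boolean memoryTuningEnabled =
                Boolean.parseBoolean(conf.getOrDefault(MEMORY_TUNING_ENABLED, "false"));
        return memoryTuningEnabled ? withOverrides(conf, overrides) : conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("taskmanager.memory.process.size", "4g");
        Map<String, String> overrides = Map.of("taskmanager.memory.process.size", "2g");

        // Memory tuning disabled: the tuned value is ignored.
        System.out.println(
                configForResourceCheck(conf, overrides).get("taskmanager.memory.process.size")); // 4g

        // Memory tuning enabled: the tuned value is used for the check.
        conf.put(MEMORY_TUNING_ENABLED, "true");
        System.out.println(
                configForResourceCheck(conf, overrides).get("taskmanager.memory.process.size")); // 2g
    }
}
```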



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-12 Thread via GitHub


gaborgsomogyi commented on PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#issuecomment-2051448683

   @mxm, could you please take a second look? We've talked it through with @gyfora 
and the changes reflect that discussion.


Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-12 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562302534


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
 return true;
 }
 
+protected static boolean resourceQuotaReached(

Review Comment:
   I did a deep dive here, and the conclusion is to have a single function in 
which both checks are called. They look like they do similar things, but in fact 
the commonality is more or less just the following:
   ```
   var tmCpu = ctx.getTaskManagerCpu().orElse(0.);
   var tmMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
   var numSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
   ```
   All in all, I've wrapped them together, but in a separate function.
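One possible shape for the shared part, sketched under assumptions: `TmResources` and its methods are hypothetical names, memory is in bytes, and the number of TaskManagers is assumed to be the required slot count divided by `numSlotsPerTm`, rounded up. It only illustrates how per-TaskManager CPU, memory and slot count combine into totals that both checks could use.

```java
public final class TmResources {
    final double tmCpu;        // CPU cores per TaskManager (0 if unknown)
    final long tmMemoryBytes;  // total memory per TaskManager, in bytes
    final int numSlotsPerTm;   // task slots per TaskManager

    TmResources(double tmCpu, long tmMemoryBytes, int numSlotsPerTm) {
        if (numSlotsPerTm <= 0) {
            throw new IllegalArgumentException("numSlotsPerTm must be positive");
        }
        this.tmCpu = tmCpu;
        this.tmMemoryBytes = tmMemoryBytes;
        this.numSlotsPerTm = numSlotsPerTm;
    }

    /** TaskManagers needed to provide totalSlots slots (ceiling division). */
    int taskManagersFor(int totalSlots) {
        return (totalSlots + numSlotsPerTm - 1) / numSlotsPerTm;
    }

    /** Total CPU of the TaskManagers needed for totalSlots slots. */
    double totalCpuFor(int totalSlots) {
        return taskManagersFor(totalSlots) * tmCpu;
    }

    /** Total memory (bytes) of the TaskManagers needed for totalSlots slots. */
    long totalMemoryFor(int totalSlots) {
        return taskManagersFor(totalSlots) * tmMemoryBytes;
    }

    public static void main(String[] args) {
        var res = new TmResources(2.0, 4L << 30, 4); // 2 cores, 4 GiB, 4 slots per TM
        System.out.println(res.taskManagersFor(10)); // 3
        System.out.println(res.totalCpuFor(10));     // 6.0
        System.out.println(res.totalMemoryFor(10));  // 12884901888
    }
}
```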



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-12 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562195810


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -115,6 +119,17 @@ public boolean scaleResource(
 return false;
 }
 
+if (resourceQuotaReached(conf, evaluatedMetrics, scalingSummaries, context)) {
+autoScalerEventHandler.handleEvent(
+context,
+AutoScalerEventHandler.Type.Warning,
+"ResourceQuotaReached",
+RESOURCE_QUOTA_REACHED_MESSAGE,
+null,
+conf.get(SCALING_EVENT_INTERVAL));
+return false;
+}

Review Comment:
   Makes sense, moved.



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-12 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562107810


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##
@@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) {
 return get(jobVertexID).getInputs().isEmpty();
 }
 
-public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
-get(vertexID).updateMaxParallelism(maxParallelism);

Review Comment:
   This was removed accidentally; I've added it back.



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-12 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562107436


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol
 "Updating source {} max parallelism based on available partitions to {}",
 sourceVertex,
 numPartitions);
-topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+topology.get(sourceVertex).setMaxParallelism((int) numPartitions);

Review Comment:
   Nice catch, fixed.



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-11 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1560631310


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
 return true;
 }
 
+protected static boolean resourceQuotaReached(
+Configuration conf,
+EvaluatedMetrics evaluatedMetrics,
+Map scalingSummaries,
+JobAutoScalerContext ctx) {
+
+if (evaluatedMetrics.getJobTopology() == null
+|| evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+return false;
+}
+
+var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+var tmMemory = ctx.getTaskManagerMemory();
+var tmCpu = ctx.getTaskManagerCpu();
+
+if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+var currentSlotSharingGroupMaxParallelisms = new HashMap();
+var newSlotSharingGroupMaxParallelisms = new HashMap();
+for (var e :
+evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
   In such a case `jobTopology.getSlotSharingGroupMapping().isEmpty()` is true, so 
it's handled here: 
https://github.com/apache/flink-kubernetes-operator/pull/789/files#diff-7fdb929157b6a94cc180c67b8dddb6722d26a7b8ea8259eb7cb9ef84b9a418a3R224-R227
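A simplified, hypothetical sketch of the quota check and the empty-topology guard discussed here. It assumes that each slot sharing group needs as many slots as its most parallel vertex, that the groups' slot needs add up, and that a quota is reached when the projected total strictly exceeds it. `ResourceQuotaCheck`, its parameters and the string vertex/group IDs are illustrative only, and unlike the PR it looks only at the new parallelisms.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ResourceQuotaCheck {

    /** Slots needed: per slot sharing group, the max vertex parallelism; summed over groups. */
    static int requiredSlots(
            Map<String, List<String>> slotSharingGroups, Map<String, Integer> parallelism) {
        int total = 0;
        for (List<String> vertices : slotSharingGroups.values()) {
            int groupMax = 0;
            for (String v : vertices) {
                groupMax = Math.max(groupMax, parallelism.getOrDefault(v, 0));
            }
            total += groupMax;
        }
        return total;
    }

    static boolean resourceQuotaReached(
            Map<String, List<String>> slotSharingGroups,
            Map<String, Integer> newParallelism,
            int numSlotsPerTm,
            double tmCpu,
            long tmMemoryBytes,
            Optional<Double> cpuQuota,
            Optional<Long> memoryQuotaBytes) {
        // Guard: without slot sharing group information the check is skipped.
        if (slotSharingGroups == null || slotSharingGroups.isEmpty()) {
            return false;
        }
        int slots = requiredSlots(slotSharingGroups, newParallelism);
        int tms = (slots + numSlotsPerTm - 1) / numSlotsPerTm; // ceiling division
        boolean cpuExceeded = cpuQuota.map(q -> tms * tmCpu > q).orElse(false);
        boolean memoryExceeded =
                memoryQuotaBytes.map(q -> (long) tms * tmMemoryBytes > q).orElse(false);
        return cpuExceeded || memoryExceeded;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = Map.of(
                "default", List.of("source", "map"),
                "sink-group", List.of("sink"));
        Map<String, Integer> parallelism = Map.of("source", 4, "map", 6, "sink", 2);

        // 6 + 2 = 8 slots -> 2 TaskManagers with 4 slots each -> 4.0 CPU in total.
        System.out.println(requiredSlots(groups, parallelism)); // 8
        System.out.println(resourceQuotaReached(
                groups, parallelism, 4, 2.0, 4L << 30, Optional.of(3.0), Optional.empty())); // true
        System.out.println(resourceQuotaReached(
                groups, parallelism, 4, 2.0, 4L << 30, Optional.of(4.0), Optional.empty())); // false
    }
}
```

Returning `false` for an empty mapping means an unknown topology never blocks scaling, in line with the early return in the quoted hunk.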



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-11 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1560668425


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java:
##
@@ -30,6 +31,7 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class EvaluatedMetrics {
+private JobTopology jobTopology;

Review Comment:
   Nice catch, removed.



Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-11 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1560631310


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
 return true;
 }
 
+protected static boolean resourceQuotaReached(
+Configuration conf,
+EvaluatedMetrics evaluatedMetrics,
+Map scalingSummaries,
+JobAutoScalerContext ctx) {
+
+if (evaluatedMetrics.getJobTopology() == null
+|| evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+return false;
+}
+
+var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+var tmMemory = ctx.getTaskManagerMemory();
+var tmCpu = ctx.getTaskManagerCpu();
+
+if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+var currentSlotSharingGroupMaxParallelisms = new HashMap();
+var newSlotSharingGroupMaxParallelisms = new HashMap();
+for (var e :
+evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
   In such a case the job topology is null, so it's handled here: 
https://github.com/apache/flink-kubernetes-operator/pull/789/files#diff-7fdb929157b6a94cc180c67b8dddb6722d26a7b8ea8259eb7cb9ef84b9a418a3R224-R227


